连续小波变换Python编程实现

连续小波变化基本的积分公式如下:
W ( a , b ) = ∫ − ∞ + ∞ f ( t ) ⋅ ψ ( t − b a ) d t \Large W(a, b) = \int_{-\infin}^{+\infin}f(t)\cdot \psi(\frac{t-b}{a})\mathrm{d}t W(a,b)=+f(t)ψ(atb)dt
其中,

f ( t ) f(t) f(t)为原始信号, ψ ( t ) \psi(t) ψ(t)为小波函数,这里以morl小波函数为示例,其具体的表达式和函数图像如下:
ψ m o r l ( t ) = e − t 2 2 ⋅ c o s ( 5 t ) \Large \psi_{\mathrm{morl}}(t)=e^{-\frac{t^2}{2}}\cdot \mathrm{cos}(5t) ψmorl(t)=e2t2cos(5t)

在这里插入图片描述

小波变换的基本含义是,以小波作为一个度量的工具,将小波函数平移拉伸,再与原始信号相乘,从表达式不难看出,当小波信号与原始信号的位置频率吻合较好时,即波峰对应波峰,波谷也对应波谷,这样积分值是比较大的,所以得到的积分值可以作为一个原始信号和小波信号相关性的度量值。

现在要在计算机中编程实现该小波变换,首先要考虑的是,计算机中不能处理无限的数据,所以上述积分必须在一个有限的积分区间上进行,其次,计算机中的数据都是离散表示的,所以上述积分需要离散化,要用数值积分代替解析积分。

考虑一段实际信号 f ( t ) f(t) f(t)定义在一段时间上,即 t ∈ [ 0 , T ] t\in[0, T] t[0,T],则积分也再改区间上进行。因为小波有着固定的表达式,为了计算积分方便可考虑一步换元, t − b a = x \frac{t-b}{a}=x atb=x,这样小波变换表达式如下:
W ( a , b ) = a ∫ − b a T − b a f ( a x + b ) ⋅ ψ ( x ) d x \Large W(a, b) = a\int_{-\frac{b}{a}}^{\frac{T-b}{a}}f(ax+b)\cdot \psi(x)\mathrm{d}x W(a,b)=aabaTbf(ax+b)ψ(x)dx
复化梯形积分公式如下,他是一种数值积分方法,需要将积分区间均等分为 n n n份,下式中步长 h = ( x n − x 0 ) / n h=(x_n - x_0)/n h=(xnx0)/n
∫ x 0 x n f ( x ) d x = h ⋅ { ∑ i = 0 n f ( x i ) − 1 2 [ f ( x 0 ) + f ( x n ) ] } \int_{x_0}^{x_n}f(x)\mathrm{d}x=h\cdot\{\sum_{i=0}^{n}f(x_i)-\frac{1}{2}[f(x_0)+f(x_n)]\} x0xnf(x)dx=h{i=0nf(xi)21[f(x0)+f(xn)]}
绘图表示出相应的积分节点,如下图所示

在这里插入图片描述

红色虚线为原始信号,蓝色带点标记为小波信号,注意到小波信号在一段范围之外始终是0,对应相乘的积分值也是0,可以忽略不计算,同时上面积分节点的离散方式是全域离散,即积分区间始终是 [ 0 , T ] [0, T] [0,T],是将该区间均等划分后积分的,随着尺度参数变化,小波被拉长缩短,分布在小波有波动段的节点数也不同,这样积分的精度都无法控制了。

为了解决上述问题,首先就是要在积分区间的某个子区间上积分,也就是所谓的“加窗”,只在窗内计算积分。这里我们仅做简单的加窗处理,注意到小波的原始表达式中,在 [ − 5 , 5 ] [-5,5] [5,5]这个区间外基本都是0,所以可以只离散这个区间,即 x i ∈ [ − 5 , 5 ] x_i\in[-5,5] xi[5,5],原积分的计算公式就为
W ( a , b ) = a ⋅ h ⋅ { ∑ i = 0 n [ f ( a x i + b ) ⋅ ψ ( x i ) ] − 1 2 [ f ( a x 0 + b ) ψ ( x 0 ) + f ( a x n + b ) ψ ( x n ) ] } \Large W(a, b) = a\cdot h\cdot\{\sum_{i=0}^{n}[f(ax_i+b)\cdot\psi(x_i)]-\frac{1}{2}[f(ax_0+b)\psi(x_0)+f(ax_n+b)\psi(x_n)]\} W(a,b)=ah{i=0n[f(axi+b)ψ(xi)]21[f(ax0+b)ψ(x0)+f(axn+b)ψ(xn)]}
之后对应加窗后的计算图像如下图,红色数据点为实际需要计算的内容,红色虚线为小波函数。
在这里插入图片描述

需要注意的是 f ( a x i + b ) f(ax_i+b) f(axi+b)可能是未定义的,因为自变量可能超出定义域,也可能不在 f ( t ) f(t) f(t)的数据点上,所以需要对原始数据的未知未知进行插值,定义域外作外插或者填充。最简单的填充就是未定义的数据都为0,或者直接不考虑,只考虑小波和原始信号的公共区间。之后给定相应的尺度参数 a a a和平移参数 b b b就可以得到对应的相关系数了。

针对一系列的尺度参数和平移参数,可以表示为一幅图像,如下图所示。

在这里插入图片描述

同参数变换下,我的算法(左图)和pywavelet小波变换库下的pywt.cwt()函数(右图)做一个对比,如下图所示

在这里插入图片描述

看起来似乎我的算法生成的图质量要更高一些,但作为库函数要考虑的点多,包括输入输出的健壮性,算法的速度等,做的优化比较足,可能是在计算精度上有所取舍了,后续测试了一下,库函数计算的效率是比我这自己的算法快了有好几百倍。

在这里插入图片描述

不过我的算法里用的是至少50个积分节点的复化梯形积分(低频的小波可能会更多),自然算得慢一些。

源代码如下,大概加了一些注释,不同输入数据的scale尺度参数可能需要好好调一调,调参的时候sampling_period采样间隔最好设置为1,不同于pywt库函数里的连续小波变换函数cw(),我这里的采样间隔会对变换结果产生影响,需要设置为两个数据点之间真实的物理时间。当然你可以假设这个时间就是1秒,只是我感觉直接把采样间隔传递给变换函数比较方便,这样直接返回真实的物理频率了,不用像库函数那样还要再换算一遍。
当你这里采样间隔sampling_period设置为1秒的时候,这适合用于寻找合适的scales参数时,那就跟pywt库函数是一个逻辑了,返回的频率就不是真实的物理频率,你还要再换算一次。此时小波函数固定是10个长度的数组(就是psi_T的值)。scale_mode设置为’fix_freq’时,尺度参数为1代表物理频率恒定为0.8125Hz的morl小波;scale_mode设置为’same_with_data’时,尺度参数为1代表morl小波数组的长度与输入信号的长度一致,这在寻找合适的scale参数时很有用。
Display注释后面的代码可以随时取消注释,在合适的ide下你可以看到小波函数与原函数相乘的每一帧,以便你发现调试错误。

# -*- coding: utf-8 -*-
"""
Created on Sat Oct  5 17:19:45 2024

@author: YunHao Liu
"""
import pywt
import math
import matplotlib.pyplot as plt
import numpy as np
# import function


def getExampleSignal(num=50):
    t = np.linspace(0, 1, num)
    y = t * np.exp(-10 * t) * np.sin(50 * t) * 100.0

    return t, y


def _interp(y, x):
    output = 0.0
    low = math.floor(x)
    up = math.ceil(x)
    if x > 0.0 and x < len(y) - 1.0 :
        if low == up:
            output = y[low]
        else:
            output = y[low] * (up-x) + y[up] * (x-low)
            
    return output


def cwt(y, scales, sampling_period=1.0, scale_mode='fix_freq', take_abs=True):
    global psi_T  # period of the wavelet
    global calcu_num_at_least  # amounts of intergration points
    
    if type(scales) is not np.ndarray:
        scales = np.array(scales, dtype='float')
    # Define output variable
    target_size = (len(scales), y.shape[-1])
    freq_size = (len(scales), )
    coef = np.empty(target_size, dtype='float')
    freqs = np.empty(freq_size, dtype='float')
    # Define morl wavelet
    psi = lambda t: np.exp(-0.5 * (t ** 2)) * np.cos(5.0 * t)
    
    if scale_mode == 'fix_freq':
        # psi_T * phy_scales = psi_T * sampling_period * seq_scales = real physical wavelet period
        phy_scales = scales.copy()
        seq_scales = phy_scales / sampling_period
        freqs[:] = 0.8125 / phy_scales
    if scale_mode == 'same_with_data':
        # psi_T * phy_scales = psi_T * sampling_period * seq_scales = real physical wavelet period
        seq_scales = scales.copy() * (len(y) / psi_T)
        phy_scales = sampling_period * seq_scales
        freqs[:] = 0.8125 / phy_scales
    # loop in scales
    for i_sa, seq_scale in enumerate(seq_scales):
        # intergration points
        calcu_num = max(min(int(psi_T*seq_scale), len(y)), calcu_num_at_least)
        # print(f'intergration points amounts is {calcu_num} while scale {scale}')
        # loop in bias
        for bias in range(len(y)):
            # generate the calculation points
            calcu_points = np.linspace(-0.5*psi_T, 0.5*psi_T, calcu_num)
            global_time = calcu_points * seq_scale + bias
            
            psi_calcu = psi(calcu_points)
            y_calcu = np.array([_interp(y, x) for x in global_time])
            
            # range cut
            index_upper = global_time >= 0.0
            index_lower = global_time <= len(y)
            range_cut = index_upper & index_lower
            global_time = global_time[range_cut]
            psi_calcu = psi_calcu[range_cut]
            y_calcu = y_calcu[range_cut]
            
            intergration = np.sum(y_calcu * psi_calcu) - 0.5 * y_calcu[0] * psi_calcu[0] - 0.5 * y_calcu[-1] * psi_calcu[-1]
            intergration *= seq_scale * (10.0/calcu_num)
            
            if take_abs:
                coef[i_sa, bias] = abs(intergration)
            else:
                coef[i_sa, bias] = intergration

            # # Display  # Cancel Comments while Debuging
            # plt.figure()
            # plt.plot(np.arange(len(y))*sampling_period, y, 'b-')
            # plt.plot(global_time*sampling_period, y_calcu, 'ro', markersize=2)
            # plt.plot(global_time*sampling_period, psi_calcu, 'r--')
            # plt.xlim([0.0, len(y)*sampling_period])
            # plt.show()
            
        #     break
        # break
            
    return coef, freqs


        
if __name__ == "__main__":
    psi_T = 10.0  # period of the wavelet
    calcu_num_at_least = 50  # amounts of intergration points
    
    t, y = getExampleSignal(num=100)
    
    coef, freq = cwt(y, 0.8125/np.linspace(5, 20, 100), sampling_period=0.01, scale_mode='fix_freq')
    
    cwtImg, cwtFreq = pywt.cwt(y, 0.8125/np.linspace(5, 20, 100)*100, wavelet='morl', sampling_period=0.01)
    
    # print(f'frequency is {freq} Hz')
    print(f'max coef is {coef.max()}')
    print(f'most similar frequency: {freq[coef.sum(axis=-1).argmax()]:2.5f} Hz')
    
    fig, axs = plt.subplots(1, 2, figsize=(6, 3), dpi=300)
    ax1, ax2 = axs
    
    ax1.imshow(coef)
    ax1.set_title('my cwt')

    ax2.imshow(np.abs(cwtImg))
    ax2.set_title('pywt cwt')
    
    plt.show()
    
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值