连续小波变化基本的积分公式如下:
W
(
a
,
b
)
=
∫
−
∞
+
∞
f
(
t
)
⋅
ψ
(
t
−
b
a
)
d
t
\Large W(a, b) = \int_{-\infin}^{+\infin}f(t)\cdot \psi(\frac{t-b}{a})\mathrm{d}t
W(a,b)=∫−∞+∞f(t)⋅ψ(at−b)dt
其中,
f
(
t
)
f(t)
f(t)为原始信号,
ψ
(
t
)
\psi(t)
ψ(t)为小波函数,这里以morl小波函数为示例,其具体的表达式和函数图像如下:
ψ
m
o
r
l
(
t
)
=
e
−
t
2
2
⋅
c
o
s
(
5
t
)
\Large \psi_{\mathrm{morl}}(t)=e^{-\frac{t^2}{2}}\cdot \mathrm{cos}(5t)
ψmorl(t)=e−2t2⋅cos(5t)
小波变换的基本含义是,以小波作为一个度量的工具,将小波函数平移拉伸,再与原始信号相乘,从表达式不难看出,当小波信号与原始信号的位置频率吻合较好时,即波峰对应波峰,波谷也对应波谷,这样积分值是比较大的,所以得到的积分值可以作为一个原始信号和小波信号相关性的度量值。
现在要在计算机中编程实现该小波变换,首先要考虑的是,计算机中不能处理无限的数据,所以上述积分必须在一个有限的积分区间上进行,其次,计算机中的数据都是离散表示的,所以上述积分需要离散化,要用数值积分代替解析积分。
考虑一段实际信号
f
(
t
)
f(t)
f(t)定义在一段时间上,即
t
∈
[
0
,
T
]
t\in[0, T]
t∈[0,T],则积分也再改区间上进行。因为小波有着固定的表达式,为了计算积分方便可考虑一步换元,
t
−
b
a
=
x
\frac{t-b}{a}=x
at−b=x,这样小波变换表达式如下:
W
(
a
,
b
)
=
a
∫
−
b
a
T
−
b
a
f
(
a
x
+
b
)
⋅
ψ
(
x
)
d
x
\Large W(a, b) = a\int_{-\frac{b}{a}}^{\frac{T-b}{a}}f(ax+b)\cdot \psi(x)\mathrm{d}x
W(a,b)=a∫−abaT−bf(ax+b)⋅ψ(x)dx
复化梯形积分公式如下,他是一种数值积分方法,需要将积分区间均等分为
n
n
n份,下式中步长
h
=
(
x
n
−
x
0
)
/
n
h=(x_n - x_0)/n
h=(xn−x0)/n
∫
x
0
x
n
f
(
x
)
d
x
=
h
⋅
{
∑
i
=
0
n
f
(
x
i
)
−
1
2
[
f
(
x
0
)
+
f
(
x
n
)
]
}
\int_{x_0}^{x_n}f(x)\mathrm{d}x=h\cdot\{\sum_{i=0}^{n}f(x_i)-\frac{1}{2}[f(x_0)+f(x_n)]\}
∫x0xnf(x)dx=h⋅{i=0∑nf(xi)−21[f(x0)+f(xn)]}
绘图表示出相应的积分节点,如下图所示
红色虚线为原始信号,蓝色带点标记为小波信号,注意到小波信号在一段范围之外始终是0,对应相乘的积分值也是0,可以忽略不计算,同时上面积分节点的离散方式是全域离散,即积分区间始终是 [ 0 , T ] [0, T] [0,T],是将该区间均等划分后积分的,随着尺度参数变化,小波被拉长缩短,分布在小波有波动段的节点数也不同,这样积分的精度都无法控制了。
为了解决上述问题,首先就是要在积分区间的某个子区间上积分,也就是所谓的“加窗”,只在窗内计算积分。这里我们仅做简单的加窗处理,注意到小波的原始表达式中,在
[
−
5
,
5
]
[-5,5]
[−5,5]这个区间外基本都是0,所以可以只离散这个区间,即
x
i
∈
[
−
5
,
5
]
x_i\in[-5,5]
xi∈[−5,5],原积分的计算公式就为
W
(
a
,
b
)
=
a
⋅
h
⋅
{
∑
i
=
0
n
[
f
(
a
x
i
+
b
)
⋅
ψ
(
x
i
)
]
−
1
2
[
f
(
a
x
0
+
b
)
ψ
(
x
0
)
+
f
(
a
x
n
+
b
)
ψ
(
x
n
)
]
}
\Large W(a, b) = a\cdot h\cdot\{\sum_{i=0}^{n}[f(ax_i+b)\cdot\psi(x_i)]-\frac{1}{2}[f(ax_0+b)\psi(x_0)+f(ax_n+b)\psi(x_n)]\}
W(a,b)=a⋅h⋅{i=0∑n[f(axi+b)⋅ψ(xi)]−21[f(ax0+b)ψ(x0)+f(axn+b)ψ(xn)]}
之后对应加窗后的计算图像如下图,红色数据点为实际需要计算的内容,红色虚线为小波函数。
需要注意的是 f ( a x i + b ) f(ax_i+b) f(axi+b)可能是未定义的,因为自变量可能超出定义域,也可能不在 f ( t ) f(t) f(t)的数据点上,所以需要对原始数据的未知未知进行插值,定义域外作外插或者填充。最简单的填充就是未定义的数据都为0,或者直接不考虑,只考虑小波和原始信号的公共区间。之后给定相应的尺度参数 a a a和平移参数 b b b就可以得到对应的相关系数了。
针对一系列的尺度参数和平移参数,可以表示为一幅图像,如下图所示。
同参数变换下,我的算法(左图)和pywavelet小波变换库下的pywt.cwt()
函数(右图)做一个对比,如下图所示
看起来似乎我的算法生成的图质量要更高一些,但作为库函数要考虑的点多,包括输入输出的健壮性,算法的速度等,做的优化比较足,可能是在计算精度上有所取舍了,后续测试了一下,库函数计算的效率是比我这自己的算法快了有好几百倍。
不过我的算法里用的是至少50个积分节点的复化梯形积分(低频的小波可能会更多),自然算得慢一些。
源代码如下,大概加了一些注释,不同输入数据的scale尺度参数可能需要好好调一调,调参的时候sampling_period采样间隔最好设置为1,不同于pywt库函数里的连续小波变换函数cw(),我这里的采样间隔会对变换结果产生影响,需要设置为两个数据点之间真实的物理时间。当然你可以假设这个时间就是1秒,只是我感觉直接把采样间隔传递给变换函数比较方便,这样直接返回真实的物理频率了,不用像库函数那样还要再换算一遍。
当你这里采样间隔sampling_period设置为1秒的时候,这适合用于寻找合适的scales参数时,那就跟pywt库函数是一个逻辑了,返回的频率就不是真实的物理频率,你还要再换算一次。此时小波函数固定是10个长度的数组(就是psi_T的值)。scale_mode设置为’fix_freq’时,尺度参数为1代表物理频率恒定为0.8125Hz的morl小波;scale_mode设置为’same_with_data’时,尺度参数为1代表morl小波数组的长度与输入信号的长度一致,这在寻找合适的scale参数时很有用。
Display注释后面的代码可以随时取消注释,在合适的ide下你可以看到小波函数与原函数相乘的每一帧,以便你发现调试错误。
# -*- coding: utf-8 -*-
"""
Created on Sat Oct 5 17:19:45 2024
@author: YunHao Liu
"""
import pywt
import math
import matplotlib.pyplot as plt
import numpy as np
# import function
def getExampleSignal(num=50):
t = np.linspace(0, 1, num)
y = t * np.exp(-10 * t) * np.sin(50 * t) * 100.0
return t, y
def _interp(y, x):
output = 0.0
low = math.floor(x)
up = math.ceil(x)
if x > 0.0 and x < len(y) - 1.0 :
if low == up:
output = y[low]
else:
output = y[low] * (up-x) + y[up] * (x-low)
return output
def cwt(y, scales, sampling_period=1.0, scale_mode='fix_freq', take_abs=True):
global psi_T # period of the wavelet
global calcu_num_at_least # amounts of intergration points
if type(scales) is not np.ndarray:
scales = np.array(scales, dtype='float')
# Define output variable
target_size = (len(scales), y.shape[-1])
freq_size = (len(scales), )
coef = np.empty(target_size, dtype='float')
freqs = np.empty(freq_size, dtype='float')
# Define morl wavelet
psi = lambda t: np.exp(-0.5 * (t ** 2)) * np.cos(5.0 * t)
if scale_mode == 'fix_freq':
# psi_T * phy_scales = psi_T * sampling_period * seq_scales = real physical wavelet period
phy_scales = scales.copy()
seq_scales = phy_scales / sampling_period
freqs[:] = 0.8125 / phy_scales
if scale_mode == 'same_with_data':
# psi_T * phy_scales = psi_T * sampling_period * seq_scales = real physical wavelet period
seq_scales = scales.copy() * (len(y) / psi_T)
phy_scales = sampling_period * seq_scales
freqs[:] = 0.8125 / phy_scales
# loop in scales
for i_sa, seq_scale in enumerate(seq_scales):
# intergration points
calcu_num = max(min(int(psi_T*seq_scale), len(y)), calcu_num_at_least)
# print(f'intergration points amounts is {calcu_num} while scale {scale}')
# loop in bias
for bias in range(len(y)):
# generate the calculation points
calcu_points = np.linspace(-0.5*psi_T, 0.5*psi_T, calcu_num)
global_time = calcu_points * seq_scale + bias
psi_calcu = psi(calcu_points)
y_calcu = np.array([_interp(y, x) for x in global_time])
# range cut
index_upper = global_time >= 0.0
index_lower = global_time <= len(y)
range_cut = index_upper & index_lower
global_time = global_time[range_cut]
psi_calcu = psi_calcu[range_cut]
y_calcu = y_calcu[range_cut]
intergration = np.sum(y_calcu * psi_calcu) - 0.5 * y_calcu[0] * psi_calcu[0] - 0.5 * y_calcu[-1] * psi_calcu[-1]
intergration *= seq_scale * (10.0/calcu_num)
if take_abs:
coef[i_sa, bias] = abs(intergration)
else:
coef[i_sa, bias] = intergration
# # Display # Cancel Comments while Debuging
# plt.figure()
# plt.plot(np.arange(len(y))*sampling_period, y, 'b-')
# plt.plot(global_time*sampling_period, y_calcu, 'ro', markersize=2)
# plt.plot(global_time*sampling_period, psi_calcu, 'r--')
# plt.xlim([0.0, len(y)*sampling_period])
# plt.show()
# break
# break
return coef, freqs
if __name__ == "__main__":
psi_T = 10.0 # period of the wavelet
calcu_num_at_least = 50 # amounts of intergration points
t, y = getExampleSignal(num=100)
coef, freq = cwt(y, 0.8125/np.linspace(5, 20, 100), sampling_period=0.01, scale_mode='fix_freq')
cwtImg, cwtFreq = pywt.cwt(y, 0.8125/np.linspace(5, 20, 100)*100, wavelet='morl', sampling_period=0.01)
# print(f'frequency is {freq} Hz')
print(f'max coef is {coef.max()}')
print(f'most similar frequency: {freq[coef.sum(axis=-1).argmax()]:2.5f} Hz')
fig, axs = plt.subplots(1, 2, figsize=(6, 3), dpi=300)
ax1, ax2 = axs
ax1.imshow(coef)
ax1.set_title('my cwt')
ax2.imshow(np.abs(cwtImg))
ax2.set_title('pywt cwt')
plt.show()