简介

动态时间规整:(Dynamic Time Warping,DTW)

定义:用于比较不同长度的两个数组或时间序列之间的相似性计算两者间的距离。

例1:a =[1,2,3],b=[3,2,2]

例2:a=[1,2,3],b=[2,2,2,3,4]

例1好计算,但对于例2,如何计算呢?即所谓的规整或扭曲。

比较不同长度的数组的思想是构建一对多多对一匹配,以便使两者之间的总距离最小化

DTW是计算给定两个序列最优匹配的一种方法,满足以下规则:

  • 第一个序列的每个索引必须与另一序列的一个或多个索引匹配,反之亦然

  • 第一个序列的第一个索引必须匹配另一个序列的第一个索引(也可匹配多个索引)

  • 第一个序列索引对另一个序列索引的映射必须是单调递增的,反之亦然。如果第一个序列中索引 ,另一个序列中索引 ,则 匹配 匹配 ( 两者连续且相差不大于1)。如下图所示:

最优匹配是指满足上述条件代价最小的匹配表示,代价计算方法是计算每个匹配索引对的值之间的绝对差的和

总结一下,头和尾必须在位置上匹配,不能交叉匹配,也不能遗漏。

DTW中warping对应于在时间维度上的非线性序列,即所谓的“warping path”。通过时间对齐解决。常用于基因序列和音频同步。

用一个 矩阵 表示序列 和序列 各个点之间的距离, 等于 的第 个点和 的第 个点之间的距离,即:

传统欧几里得距离里对应的点:

  • A(1)-B(1)

  • A(2)-B(2)

  • A(3)-B(3)

  • A(4)-B(4)

  • A(5)-B(5)

  • A(6)-B(6)

它们正好构成了对角线,对角线上元素和为6

时间规整的方法里,对应的点为:

  • A(1)-B(1)

  • A(2)-B(1)

  • A(3)B(2)

  • A(4)-B(2)

  • A(5)-B(4)

  • A(5)-B(4)

  • A(6)-B(5)

  • A(6)-B(6)

这些点构成了从左上角到右下角的另一条路径,路径上的元素和为0

因此,DTW算法的步骤为:

  1. 计算两个序列各个点之间的距离矩阵。

  2. 寻找一条从矩阵左上角到右下角的路径,使得路径上的元素和最小。

我们称路径上的元素和为路径长度。那么如何寻找长度最小的路径呢?

矩阵从左上角到右下角的路径长度有以下性质:

  1. 当前路径长度 = 前一步的路径长度 + 当前元素的大小

  2. 路径上的某个元素 ,它的前一个元素只可能为以下三者之一:

a) 左边的相邻元素 (i, j-1) 

b) 上面的相邻元素 (i-1, j) 

c) 左上方的相邻元素 (i-1, j-1)

假设矩阵为M,从矩阵左上角 到任一点 的最短路径长度为 。那么可以用递归算法求最短路径长度:

起始条件:

递推规则:

递推规则这样写的原因是因为当前元素的最短路径必然是从前一个元素的最短路径的长度加上当前元素的值。前一个元素有三个可能,我们取三个可能之中路径最短的那个即可。

python实现

import numpy as np


def dtw(s, t):
    """
    :param s: 第一个序列
    :param t: 第二个序列
    :return:
    """
    n, m = len(s), len(t)

    # 构建n行m列矩阵
    dtw_matrix = np.zeros((n + 1, m + 1))
    dtw_matrix.fill(np.inf)
    dtw_matrix[0, 0] = 0  # 路径左上-->右下

    # dtw_matirx[i, j] is the distance between s[1:i] and t[1:j] with the best alignment.

    for i in range(1, n + 1):
        for j in range(1, m + 1):
            cost = np.abs(s[i - 1] - t[j - 1])  # 计算绝对距离
            # 找到当前索引方块的top,left,top_left方向的累积损失的最小值
            last_min = np.min([dtw_matrix[i, j - 1], dtw_matrix[i - 1, j], dtw_matrix[i - 1, j - 1]])
            dtw_matrix[i, j] = cost + last_min
    return dtw_matrix


a = [1, 2, 3]
b = [2, 2, 2, 3, 4]

dtw(a, b)

结果如下:

array([[ 0., inf, inf, inf, inf, inf],
       [inf,  1.,  2.,  3.,  5.,  8.],
       [inf,  1.,  1.,  1.,  2.,  4.],
       [inf,  2.,  2.,  2.,  1.,  2.]])

a和b之间的距离是矩阵的最后一个元素,也就是2。

添加窗口约束

上述算法的一个问题是:我们允许一个数组中的一个元素与另一个数组中的无限个元素匹配(只要尾部最终能够匹配),这将导致映射大量弯曲,例如,下面的数组

a = [1, 2, 3]
b = [1, 2, 2, 2, 2, 2, 2, 2, ..., 5]

为了最小化距离,数组a中的元素2将匹配数组b中的所有2,这导致数组b严重弯曲。

为了避免这种情况,我们可以添加一个窗口约束来限制可以匹配的元素的数量。

关键的区别是,现在每个元素都被限制在匹配范围 中的元素。保证所有索引都可以匹配:

def dtw(s, t, window):
    n, m = len(s), len(t)
    w = np.min([window, abs(n - m)])
    dtw_matrix = np.full((n + 1, m + 1), np.inf)
    dtw_matrix[0, 0] = 0

    for i in range(1, n + 1):
        for j in range(np.max([1, i - w]), np.min([i + w, m]) + 1):
            # 当前损失
            cost = np.abs(s[i - 1] - t[j - 1])

            # 上一个最小值
            last_min = np.min([dtw_matrix[i, j - 1], dtw_matrix[i - 1, j], dtw_matrix[i - 1, j - 1]])

            # 当前累积损失
            dtw_matrix[i, j] = last_min + cost
    return dtw_matrix


a = [1, 2, 3, 3, 5]
b = [1, 2, 2, 2, 2, 2, 2, 4]

dtw(a, b, window=3)

结果如下:

array([[ 0., inf, inf, inf, inf, inf, inf, inf, inf],
       [inf,  0.,  1.,  2.,  3., inf, inf, inf, inf],
       [inf,  1.,  0.,  0.,  0.,  0., inf, inf, inf],
       [inf,  3.,  1.,  1.,  1.,  1.,  1., inf, inf],
       [inf,  5.,  2.,  2.,  2.,  2.,  2.,  2., inf],
       [inf, inf,  5.,  5.,  5.,  5.,  5.,  5.,  3.]])

fastdtw 包

from fastdtw import fastdtw
from scipy.spatial.distance import euclidean

x = np.array([1, 2, 3, 3, 7])
y = np.array([1, 2, 2, 2, 2, 2, 2, 4])

distance, path = fastdtw(x, y, dist=euclidean)

print(distance)
print(path)

结果如下:

5.0
[(0, 0), (1, 1), (1, 2), (1, 3), (1, 4), (2, 5), (3, 6), (4, 7)]

应用于多元变量

并不要求两个时间序列具有相同的长度,但它们必须具有相同大小的维度。

def get_squared_dist(x, y):
    """计算欧式距离"""
    dist = 0.
    for di in range(x.shape[0]):
        diff = (x[di] - y[di])
        dist += diff * diff
    return dist


def accumulated_matrix(s1, s2):
    l1 = s1.shape[0]
    l2 = s2.shape[0]
    cum_sum = np.full((l1 + 1, l2 + 1), np.inf)
    cum_sum[0, 0] = 0.

    for i in range(l1):
        for j in range(l2):
            cum_sum[i + 1, j + 1] = get_squared_dist(s1[i], s2[j])
            cum_sum[i + 1, j + 1] += min(cum_sum[i, j + 1],
                                         cum_sum[i + 1, j],
                                         cum_sum[i, j])
    return cum_sum[1:, 1:]


s1 = np.array([[1, 2], [3, 4]])
s2 = np.array([[1, 5], [6, 7]])

accumulated_matrix(s1, s2)

结果如下:

array([[ 9., 59.],
       [14., 27.]])

计算方法如下:

参考:

[1] https://en.wikipedia.org/wiki/Dynamic_time_warping

[2] https://towardsdatascience.com/dynamic-time-warping-3933f25fcdd

[3] https://zhuanlan.zhihu.com/p/43247215

建议阅读:

高考失利之后,属于我的大学本科四年

【资源分享】对于时间序列,你所能做的一切.

【时空序列预测第一篇】什么是时空序列问题?这类问题主要应用了哪些模型?主要应用在哪些领域?

【AI蜗牛车出品】手把手AI项目、时空序列、时间序列、白话机器学习、pytorch修炼

公众号:AI蜗牛车

保持谦逊、保持自律、保持进步

个人微信

备注:昵称+学校/公司+方向

如果没有备注不拉群!

拉你进AI蜗牛车交流群

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐