Spaces:
Runtime error
Runtime error
from numpy import array, zeros, full, argmin, inf, ndim | |
from scipy.spatial.distance import cdist | |
from math import isinf | |
def dtw(x, y, dist, warp=1, w=inf, s=1.0): | |
""" | |
Computes Dynamic Time Warping (DTW) of two sequences. | |
:param array x: N1*M array | |
:param array y: N2*M array | |
:param func dist: distance used as cost measure | |
:param int warp: how many shifts are computed. | |
:param int w: window size limiting the maximal distance between indices of matched entries |i,j|. | |
:param float s: weight applied on off-diagonal moves of the path. As s gets larger, the warping path is increasingly biased towards the diagonal | |
Returns the minimum distance, the cost matrix, the accumulated cost matrix, and the wrap path. | |
""" | |
assert len(x) | |
assert len(y) | |
assert isinf(w) or (w >= abs(len(x) - len(y))) | |
assert s > 0 | |
r, c = len(x), len(y) | |
if not isinf(w): | |
D0 = full((r + 1, c + 1), inf) | |
for i in range(1, r + 1): | |
D0[i, max(1, i - w):min(c + 1, i + w + 1)] = 0 | |
D0[0, 0] = 0 | |
else: | |
D0 = zeros((r + 1, c + 1)) | |
D0[0, 1:] = inf | |
D0[1:, 0] = inf | |
D1 = D0[1:, 1:] # view | |
for i in range(r): | |
for j in range(c): | |
if (isinf(w) or (max(0, i - w) <= j <= min(c, i + w))): | |
D1[i, j] = dist(x[i], y[j]) | |
C = D1.copy() | |
jrange = range(c) | |
for i in range(r): | |
if not isinf(w): | |
jrange = range(max(0, i - w), min(c, i + w + 1)) | |
for j in jrange: | |
min_list = [D0[i, j]] | |
for k in range(1, warp + 1): | |
i_k = min(i + k, r) | |
j_k = min(j + k, c) | |
min_list += [D0[i_k, j] * s, D0[i, j_k] * s] | |
D1[i, j] += min(min_list) | |
if len(x) == 1: | |
path = zeros(len(y)), range(len(y)) | |
elif len(y) == 1: | |
path = range(len(x)), zeros(len(x)) | |
else: | |
path = _traceback(D0) | |
return D1[-1, -1], C, D1, path | |
def accelerated_dtw(x, y, dist, warp=1): | |
""" | |
Computes Dynamic Time Warping (DTW) of two sequences in a faster way. | |
Instead of iterating through each element and calculating each distance, | |
this uses the cdist function from scipy (https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.cdist.html) | |
:param array x: N1*M array | |
:param array y: N2*M array | |
:param string or func dist: distance parameter for cdist. When string is given, cdist uses optimized functions for the distance metrics. | |
If a string is passed, the distance function can be 'braycurtis', 'canberra', 'chebyshev', 'cityblock', 'correlation', 'cosine', 'dice', 'euclidean', 'hamming', 'jaccard', 'kulsinski', 'mahalanobis', 'matching', 'minkowski', 'rogerstanimoto', 'russellrao', 'seuclidean', 'sokalmichener', 'sokalsneath', 'sqeuclidean', 'wminkowski', 'yule'. | |
:param int warp: how many shifts are computed. | |
Returns the minimum distance, the cost matrix, the accumulated cost matrix, and the wrap path. | |
""" | |
assert len(x) | |
assert len(y) | |
if ndim(x) == 1: | |
x = x.reshape(-1, 1) | |
if ndim(y) == 1: | |
y = y.reshape(-1, 1) | |
r, c = len(x), len(y) | |
D0 = zeros((r + 1, c + 1)) | |
D0[0, 1:] = inf | |
D0[1:, 0] = inf | |
D1 = D0[1:, 1:] | |
D0[1:, 1:] = cdist(x, y, dist) | |
C = D1.copy() | |
for i in range(r): | |
for j in range(c): | |
min_list = [D0[i, j]] | |
for k in range(1, warp + 1): | |
min_list += [D0[min(i + k, r), j], | |
D0[i, min(j + k, c)]] | |
D1[i, j] += min(min_list) | |
if len(x) == 1: | |
path = zeros(len(y)), range(len(y)) | |
elif len(y) == 1: | |
path = range(len(x)), zeros(len(x)) | |
else: | |
path = _traceback(D0) | |
return D1[-1, -1], C, D1, path | |
def _traceback(D): | |
i, j = array(D.shape) - 2 | |
p, q = [i], [j] | |
while (i > 0) or (j > 0): | |
tb = argmin((D[i, j], D[i, j + 1], D[i + 1, j])) | |
if tb == 0: | |
i -= 1 | |
j -= 1 | |
elif tb == 1: | |
i -= 1 | |
else: # (tb == 2): | |
j -= 1 | |
p.insert(0, i) | |
q.insert(0, j) | |
return array(p), array(q) | |
if __name__ == '__main__': | |
w = inf | |
s = 1.0 | |
if 1: # 1-D numeric | |
from sklearn.metrics.pairwise import manhattan_distances | |
x = [0, 0, 1, 1, 2, 4, 2, 1, 2, 0] | |
y = [1, 1, 1, 2, 2, 2, 2, 3, 2, 0] | |
dist_fun = manhattan_distances | |
w = 1 | |
# s = 1.2 | |
elif 0: # 2-D numeric | |
from sklearn.metrics.pairwise import euclidean_distances | |
x = [[0, 0], [0, 1], [1, 1], [1, 2], [2, 2], [4, 3], [2, 3], [1, 1], [2, 2], [0, 1]] | |
y = [[1, 0], [1, 1], [1, 1], [2, 1], [4, 3], [4, 3], [2, 3], [3, 1], [1, 2], [1, 0]] | |
dist_fun = euclidean_distances | |
else: # 1-D list of strings | |
from nltk.metrics.distance import edit_distance | |
# x = ['we', 'shelled', 'clams', 'for', 'the', 'chowder'] | |
# y = ['class', 'too'] | |
x = ['i', 'soon', 'found', 'myself', 'muttering', 'to', 'the', 'walls'] | |
y = ['see', 'drown', 'himself'] | |
# x = 'we talked about the situation'.split() | |
# y = 'we talked about the situation'.split() | |
dist_fun = edit_distance | |
dist, cost, acc, path = dtw(x, y, dist_fun, w=w, s=s) | |
# Vizualize | |
from matplotlib import pyplot as plt | |
plt.imshow(cost.T, origin='lower', cmap=plt.cm.Reds, interpolation='nearest') | |
plt.plot(path[0], path[1], '-o') # relation | |
plt.xticks(range(len(x)), x) | |
plt.yticks(range(len(y)), y) | |
plt.xlabel('x') | |
plt.ylabel('y') | |
plt.axis('tight') | |
if isinf(w): | |
plt.title('Minimum distance: {}, slope weight: {}'.format(dist, s)) | |
else: | |
plt.title('Minimum distance: {}, window widht: {}, slope weight: {}'.format(dist, w, s)) | |
plt.show() | |