import math
import numpy as np
import typing as T
import seaborn as sns
import matplotlib.pyplot as plt
import sklearn
import sklearn.manifold
import tensorflow as tf
import numpy.typing as npt
from tensorflow import keras
from tensorflow.python.types.core import TensorLike

Tensor = T.Union[tf.Tensor, npt.NDArray]
OptTensor = T.Optional[Tensor]

EPS = 1e-18
class TSFeatureScaler:
    """Global time series scaler: scales all features to [0, 1], then normalizes to [-1, 1]."""

    def __init__(self) -> None:
        self.min_val = None
        self.max_val = None

    def fit(self, X: TensorLike) -> "TSFeatureScaler":
        """
        Fit scaler to data.

        Args:
            X: Input tensor of shape [N, T, D]
               (N: samples, T: timesteps, D: features)
        """
        # Compute the global min and max over the entire dataset
        self.min_val = np.min(X)
        self.max_val = np.max(X)
        return self
    def transform(self, X: TensorLike) -> TensorLike:
        """
        Transform data in two steps:
        1. Scale to [0, 1] using min-max scaling
        2. Normalize to [-1, 1]
        """
        if self.min_val is None or self.max_val is None:
            raise ValueError("Scaler must be fitted before transform")
        # 1. Scale to [0, 1]
        X_scaled = (X - self.min_val) / (self.max_val - self.min_val + EPS)
        # 2. Normalize to [-1, 1]
        X_normalized = 2.0 * X_scaled - 1.0
        return X_normalized
    def inverse_transform(self, X: TensorLike) -> TensorLike:
        """
        Inverse transform data:
        1. From [-1, 1] back to [0, 1]
        2. From [0, 1] back to the original range
        """
        if self.min_val is None or self.max_val is None:
            raise ValueError("Scaler must be fitted before inverse_transform")
        # 1. From [-1, 1] back to [0, 1]
        X_scaled = (X + 1.0) / 2.0
        # 2. From [0, 1] back to the original range
        X_original = X_scaled * (self.max_val - self.min_val + EPS) + self.min_val
        return X_original
    def fit_transform(self, X: TensorLike) -> TensorLike:
        """Fit to data, then transform it."""
        return self.fit(X).transform(X)

    def get_range(self) -> T.Tuple[float, float]:
        """Return the (min, max) range of the original data."""
        if self.min_val is None or self.max_val is None:
            raise ValueError("Scaler must be fitted first")
        return (self.min_val, self.max_val)
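
# A minimal usage sketch for TSFeatureScaler. The demo function below is an
# illustrative addition, not part of the original module:
def _demo_ts_feature_scaler() -> None:
    X = np.random.uniform(0, 100, size=(32, 24, 3))  # [N, T, D]
    scaler = TSFeatureScaler()
    X_scaled = scaler.fit_transform(X)  # all values now lie in [-1, 1]
    # The round trip recovers the original data up to floating-point error
    assert np.allclose(scaler.inverse_transform(X_scaled), X, atol=1e-6)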
class TSFeatureWiseScaler:
    """Per-feature min-max scaler for time series of shape [N, T, D]."""

    def __init__(self, feature_range: T.Tuple[float, float] = (0, 1)) -> None:
        assert len(feature_range) == 2
        self._min_v, self._max_v = feature_range
        self.mins = None
        self.maxs = None

    def fit(self, X: TensorLike) -> "TSFeatureWiseScaler":
        # X: N x T x D; each feature gets its own min/max
        D = X.shape[2]
        self.mins = np.zeros(D)
        self.maxs = np.zeros(D)
        for i in range(D):
            self.mins[i] = np.min(X[:, :, i])
            self.maxs[i] = np.max(X[:, :, i])
        return self

    def transform(self, X: TensorLike) -> TensorLike:
        if self.mins is None or self.maxs is None:
            raise ValueError("Scaler must be fitted before transform")
        return ((X - self.mins) / (self.maxs - self.mins + EPS)) * (self._max_v - self._min_v) + self._min_v

    def inverse_transform(self, X: TensorLike) -> TensorLike:
        if self.mins is None or self.maxs is None:
            raise ValueError("Scaler must be fitted before inverse_transform")
        # Work on a copy so the caller's array is not mutated in place
        X = np.array(X, dtype=float)
        X -= self._min_v
        X /= self._max_v - self._min_v
        X *= self.maxs - self.mins + EPS
        X += self.mins
        return X

    def fit_transform(self, X: TensorLike) -> TensorLike:
        self.fit(X)
        return self.transform(X)
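
# A minimal usage sketch for TSFeatureWiseScaler (an illustrative addition,
# not part of the original module). Each feature is scaled with its own
# min/max, so features with very different ranges become comparable:
def _demo_ts_featurewise_scaler() -> None:
    X = np.stack(
        [np.random.uniform(0, 1, (16, 24)), np.random.uniform(-500, 500, (16, 24))],
        axis=-1,
    )  # [N, T, D] with two features of very different scales
    scaler = TSFeatureWiseScaler(feature_range=(-1, 1))
    X_scaled = scaler.fit_transform(X)  # both features now span roughly [-1, 1]
    assert np.allclose(scaler.inverse_transform(X_scaled), X, atol=1e-4)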
def linear_beta_schedule(timesteps, beta_start=1e-4, beta_end=0.99):
    """Linearly spaced noise schedule from beta_start to beta_end."""
    betas = np.linspace(beta_start, beta_end, timesteps, dtype=np.float32)
    return betas
def cosine_beta_schedule(timesteps, s=0.008):
    """Cosine noise schedule, as proposed by Nichol & Dhariwal (2021)."""
    steps = timesteps + 1
    x = np.linspace(0, timesteps, steps, dtype=np.float64)
    alphas_cumprod = np.cos(((x / timesteps) + s) / (1 + s) * math.pi * 0.5) ** 2
    alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
    betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
    betas = np.clip(betas, 0, 0.999)
    return betas
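
# A sketch of the standard DDPM bookkeeping these schedules feed into. The
# demo function and variable names below are illustrative additions, not part
# of the original module:
def _demo_beta_schedule(timesteps: int = 1000) -> npt.NDArray:
    betas = cosine_beta_schedule(timesteps)
    alphas = 1.0 - betas
    alphas_cumprod = np.cumprod(alphas)  # \bar{alpha}_t, used for closed-form noising
    # Forward process at step t: x_t = sqrt(abar_t) * x_0 + sqrt(1 - abar_t) * noise
    return alphas_cumprod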
def reconstruction_loss_by_axis(original: tf.Tensor, reconstructed: tf.Tensor, axis: int = 0) -> tf.Tensor:
    """
    Calculate the reconstruction loss along a specified axis.

    This function computes the reconstruction loss between the original data
    and the reconstructed data. The loss is computed in one of two ways
    depending on the chosen axis:

    - When `axis` is 0, it computes the loss as the sum of squared differences
      between the original and reconstructed data over all elements.
    - When `axis` is 1 or 2, it first averages each tensor along that axis and
      then computes the mean squared error (MSE) between the resulting means.
      For data of shape [N, T, D], `axis=1` compares per-feature means taken
      over time, and `axis=2` compares per-timestep means taken over features.

    Parameters
    ----------
    original : tf.Tensor
        The original data tensor.
    reconstructed : tf.Tensor
        The reconstructed data tensor, typically produced by an autoencoder.
    axis : int, optional (default=0)
        The axis along which to compute the reconstruction loss:
        - 0: all elements (sum of squared differences).
        - 1: means over time steps (MSE).
        - 2: means over features (MSE).

    Returns
    -------
    tf.Tensor
        The computed reconstruction loss as a TensorFlow tensor.

    Notes
    -----
    - This function is commonly used with autoencoders and other
      reconstruction-based models to assess reconstruction quality.
    - The choice of `axis` determines how the loss is calculated and should
      align with the data's structure.
    """
    if axis == 0:
        return tf.reduce_sum(tf.math.squared_difference(original, reconstructed))
    else:
        return tf.losses.mean_squared_error(
            tf.reduce_mean(original, axis=axis), tf.reduce_mean(reconstructed, axis=axis)
        )
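
# A minimal usage sketch for reconstruction_loss_by_axis (illustrative
# tensors; the demo function is an addition, not part of the original module):
def _demo_reconstruction_loss() -> None:
    original = tf.random.normal((8, 24, 3))  # [N, T, D]
    reconstructed = original + tf.random.normal((8, 24, 3), stddev=0.1)
    loss_total = reconstruction_loss_by_axis(original, reconstructed)         # scalar sum
    loss_time = reconstruction_loss_by_axis(original, reconstructed, axis=1)  # MSE of per-feature means, shape [N]
    loss_feat = reconstruction_loss_by_axis(original, reconstructed, axis=2)  # MSE of per-timestep means, shape [N]
    print(float(loss_total), loss_time.shape, loss_feat.shape)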
def gen_sine_dataset(N: int, T: int, D: int, max_value: int = 10) -> npt.NDArray:
    """Generate N sine-wave time series of length T with D features each."""
    result = []
    for i in range(N):
        result.append([])
        # Random amplitude and phase shift shared by all features of a sample
        a = np.random.random() * max_value
        shift = np.random.random() * max_value + 1
        ts = np.arange(0, T, 1)
        for d in range(1, D + 1):
            result[-1].append(a * np.sin((d + 3) * ts / 25. + shift))
    return np.transpose(np.array(result), [0, 2, 1])
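
# Quick smoke test for the generator (an illustrative addition; shapes follow
# the [N, T, D] convention used throughout this module):
def _demo_gen_sine_dataset() -> None:
    X = gen_sine_dataset(N=100, T=64, D=3)
    print(X.shape)  # (100, 64, 3)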
def gen_sine_vs_const_dataset(N: int, T: int, D: int, max_value: int = 10, const: int = 0) -> T.Tuple[TensorLike, TensorLike]:
    """Generate a binary dataset: sine-wave samples (label 0) vs. constant samples (label 1)."""
    result_X, result_y = [], []
    for i in range(N):
        scales = np.random.random(D) * max_value
        consts = np.random.random(D) * const
        shifts = np.random.random(D) * 2
        alpha = np.random.random()
        if np.random.random() < 0.5:
            times = np.repeat(np.arange(0, T, 1)[:, None], D, axis=1) / 10
            result_X.append(np.sin(alpha * times + shifts) * scales)
            result_y.append(0)
        else:
            result_X.append(np.tile(consts, (T, 1)))
            result_y.append(1)
    return np.array(result_X), np.array(result_y)
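
# Sketch of generating a labeled binary dataset and scaling it per feature
# (an illustrative addition, not part of the original module):
def _demo_gen_labeled_dataset() -> T.Tuple[npt.NDArray, npt.NDArray]:
    X, y = gen_sine_vs_const_dataset(N=200, T=64, D=2, max_value=10, const=5)
    X_scaled = TSFeatureWiseScaler(feature_range=(-1, 1)).fit_transform(X)
    print(X_scaled.shape, y.shape)  # (200, 64, 2) (200,)
    return X_scaled, y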
def visualize_ts_lineplot(
    ts: Tensor,
    ys: OptTensor = None,
    num: int = 5,
    unite_features: bool = True,
) -> None:
    """Plot `num` randomly chosen samples from a [N, T, D] tensor as line plots."""
    assert len(ts.shape) == 3
    fig, axs = plt.subplots(num, 1, figsize=(14, 10))
    if num == 1:
        axs = [axs]
    ids = np.random.choice(ts.shape[0], size=num, replace=False)
    for i, sample_id in enumerate(ids):
        if not unite_features:
            # Plot a single, randomly chosen feature
            feature_id = np.random.randint(ts.shape[2])
            sns.lineplot(
                x=range(ts.shape[1]),
                y=ts[sample_id, :, feature_id],
                ax=axs[i],
                label=rf"feature \#{feature_id}",
            )
        else:
            # Overlay all features on the same axes
            for feat_id in range(ts.shape[2]):
                sns.lineplot(
                    x=range(ts.shape[1]), y=ts[sample_id, :, feat_id], ax=axs[i]
                )
        if ys is not None:
            if len(ys.shape) == 1:
                axs[i].set_title(ys[sample_id])
            elif len(ys.shape) == 2:
                sns.lineplot(
                    x=range(ts.shape[1]),
                    y=ys[sample_id],
                    ax=axs[i].twinx(),
                    color="g",
                    label="Target variable",
                )
            else:
                raise ValueError("ys contains too many dimensions")
    # plt.show()
def visualize_tsne(
    X: Tensor,
    y: Tensor,
    X_gen: Tensor,
    y_gen: Tensor,
    path: str = "/tmp/tsne_embeddings.pdf",
    feature_averaging: bool = False,
    perplexity: float = 30.0,
) -> None:
    """
    Visualizes t-SNE embeddings of real and synthetic data.

    This function generates a scatter plot of t-SNE embeddings for real and
    synthetic data. Each data point is represented by a marker on the plot,
    and the marker colors correspond to the class labels of the data points.

    :param X: The original real data tensor of shape (num_samples, num_timesteps, num_features).
    :type X: Tensor
    :param y: One-hot labels of the real data, of shape (num_samples, num_classes).
    :type y: Tensor
    :param X_gen: The generated synthetic data tensor of shape (num_samples, num_timesteps, num_features).
    :type X_gen: Tensor
    :param y_gen: One-hot labels of the synthetic data, of shape (num_samples, num_classes).
    :type y_gen: Tensor
    :param path: The path to save the visualization as a PDF file. Defaults to "/tmp/tsne_embeddings.pdf".
    :type path: str, optional
    :param feature_averaging: Whether to average over the feature axis before embedding. Defaults to False.
    :type feature_averaging: bool, optional
    :param perplexity: The t-SNE perplexity parameter. Defaults to 30.0.
    :type perplexity: float, optional
    """
    tsne = sklearn.manifold.TSNE(n_components=2, perplexity=perplexity, learning_rate="auto", init="random")
    if feature_averaging:
        # Collapse the feature axis, leaving one mean value per timestep
        X_all = np.concatenate((np.mean(X, axis=2), np.mean(X_gen, axis=2)))
        X_emb = tsne.fit_transform(X_all)
    else:
        # Flatten each sample to a single vector of length T * D
        X_all = np.concatenate((X, X_gen))
        X_emb = tsne.fit_transform(X_all.reshape(X_all.shape[0], -1))

    y_all = np.concatenate((y, y_gen))
    c = np.argmax(y_all, axis=1)
    colors = {0: "class 0", 1: "class 1"}
    c = [colors[el] for el in c]
    point_styles = ["hist"] * X.shape[0] + ["gen"] * X_gen.shape[0]

    plt.figure(figsize=(8, 6), dpi=80)
    sns.scatterplot(
        x=X_emb[:, 0],
        y=X_emb[:, 1],
        hue=c,
        style=point_styles,
        markers={"hist": "<", "gen": "H"},
        alpha=0.7,
    )
    plt.legend()
    plt.box(False)
    plt.axis("off")
    plt.savefig(path)
    plt.show()
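
# End-to-end sketch tying the pieces together (an illustrative addition;
# `X_gen` below is just noisy real data standing in for a generative model's
# output, and the one-hot encoding matches what visualize_tsne expects):
def _demo_visualizations() -> None:
    X, y = gen_sine_vs_const_dataset(N=200, T=64, D=2, max_value=10, const=5)
    visualize_ts_lineplot(X, ys=y, num=3)
    X_gen = X + np.random.normal(scale=0.1, size=X.shape)
    y_onehot = np.eye(2)[y]  # visualize_tsne argmaxes labels along axis=1
    visualize_tsne(X, y_onehot, X_gen, y_onehot, path="/tmp/tsne_embeddings.pdf")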