# synls/GAN/utils.py
import math
import numpy as np
import typing as T
import seaborn as sns
import matplotlib.pyplot as plt
import sklearn.manifold
import tensorflow as tf
import numpy.typing as npt
from tensorflow import keras
from tensorflow.python.types.core import TensorLike
Tensor = T.Union[tf.Tensor, npt.NDArray]
OptTensor = T.Optional[Tensor]
EPS = 1e-18
class TSFeatureScaler:
"""Global time series scaler that scales all features to [0,1] then normalizes to [-1,1]"""
def __init__(self) -> None:
self.min_val = None
self.max_val = None
def fit(self, X: TensorLike) -> "TSFeatureScaler":
"""
Fit scaler to data
Args:
X: Input tensor of shape [N, T, D]
(N: samples, T: timesteps, D: features)
"""
        # Compute the global min and max over the entire dataset
self.min_val = np.min(X)
self.max_val = np.max(X)
return self
def transform(self, X: TensorLike) -> TensorLike:
"""
Transform data in two steps:
1. Scale to [0,1] using min-max scaling
2. Normalize to [-1,1]
"""
if self.min_val is None or self.max_val is None:
raise ValueError("Scaler must be fitted before transform")
        # 1. Scale to [0, 1]
        X_scaled = (X - self.min_val) / (self.max_val - self.min_val + EPS)
        # 2. Normalize to [-1, 1]
        X_normalized = 2.0 * X_scaled - 1.0
return X_normalized
def inverse_transform(self, X: TensorLike) -> TensorLike:
"""
Inverse transform data:
1. From [-1,1] back to [0,1]
2. From [0,1] back to original range
"""
if self.min_val is None or self.max_val is None:
raise ValueError("Scaler must be fitted before inverse_transform")
        # 1. Map from [-1, 1] back to [0, 1]
        X_scaled = (X + 1.0) / 2.0
        # 2. Map from [0, 1] back to the original range
        X_original = X_scaled * (self.max_val - self.min_val + EPS) + self.min_val
return X_original
def fit_transform(self, X: TensorLike) -> TensorLike:
"""Fit to data, then transform it"""
return self.fit(X).transform(X)
def get_range(self) -> T.Tuple[float, float]:
"""获取原始数据的范围"""
if self.min_val is None or self.max_val is None:
raise ValueError("Scaler must be fitted first")
return (self.min_val, self.max_val)
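

# The following usage sketch is not part of the original file; it illustrates the
# expected [N, T, D] input layout and the [-1, 1] round trip (shapes are assumptions).
def _example_ts_feature_scaler() -> None:
    """Illustrative sketch: fit, transform, and invert on random data."""
    rng = np.random.default_rng(0)
    X = rng.normal(size=(16, 24, 3))  # N=16 samples, T=24 timesteps, D=3 features
    scaler = TSFeatureScaler()
    X_norm = scaler.fit_transform(X)
    assert X_norm.min() >= -1.0 and X_norm.max() <= 1.0  # values land in [-1, 1]
    X_back = scaler.inverse_transform(X_norm)
    assert np.allclose(X, X_back, atol=1e-6)  # round trip recovers the input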
class TSFeatureWiseScaler:
    """Min-max scaler that scales each feature (last axis) independently to `feature_range`."""

    def __init__(self, feature_range: T.Tuple[float, float] = (0, 1)) -> None:
        assert len(feature_range) == 2
        self._min_v, self._max_v = feature_range
    def fit(self, X: TensorLike) -> "TSFeatureWiseScaler":
        """Record per-feature minima and maxima of X, shaped [N, T, D]."""
D = X.shape[2]
self.mins = np.zeros(D)
self.maxs = np.zeros(D)
for i in range(D):
self.mins[i] = np.min(X[:, :, i])
self.maxs[i] = np.max(X[:, :, i])
return self
def transform(self, X: TensorLike) -> TensorLike:
return ((X - self.mins) / (self.maxs - self.mins + EPS)) * (self._max_v - self._min_v) + self._min_v
    def inverse_transform(self, X: TensorLike) -> TensorLike:
        # Compute the inverse out of place to avoid mutating the caller's array
        X_scaled = (X - self._min_v) / (self._max_v - self._min_v)
        return X_scaled * (self.maxs - self.mins + EPS) + self.mins
def fit_transform(self, X: TensorLike) -> TensorLike:
self.fit(X)
return self.transform(X)
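

# Usage sketch (not part of the original file): per-feature scaling, where each of
# the D channels gets its own min/max; the data values here are assumptions.
def _example_ts_feature_wise_scaler() -> None:
    """Illustrative sketch: channels with very different scales map to one range."""
    rng = np.random.default_rng(0)
    # Three features with very different locations and spreads
    X = rng.normal(loc=(0.0, 5.0, -3.0), scale=(1.0, 10.0, 0.1), size=(16, 24, 3))
    scaler = TSFeatureWiseScaler(feature_range=(-1.0, 1.0))
    X_scaled = scaler.fit_transform(X)
    assert X_scaled.min() >= -1.0 and X_scaled.max() <= 1.0
    assert np.allclose(scaler.inverse_transform(X_scaled), X, atol=1e-6)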
def linear_beta_schedule(timesteps, beta_start=1e-4, beta_end=0.99):
    """Linearly spaced diffusion noise schedule from `beta_start` to `beta_end`."""
betas = np.linspace(beta_start, beta_end, timesteps, dtype=np.float32)
return betas
def cosine_beta_schedule(timesteps, s=0.008):
    """Cosine diffusion noise schedule with offset `s` (Nichol & Dhariwal, 2021)."""
steps = timesteps + 1
x = np.linspace(0, timesteps, steps, dtype=np.float64)
alphas_cumprod = np.cos(((x / timesteps) + s) / (1 + s) * math.pi * 0.5) ** 2
alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
betas = np.clip(betas, 0, 0.999)
return betas
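

# Comparison sketch (not part of the original file): alpha_bar_t = prod(1 - beta_s)
# is the fraction of signal kept after t diffusion steps under each schedule.
def _example_beta_schedules() -> None:
    """Illustrative sketch: compare how fast each schedule destroys signal."""
    timesteps = 100
    alpha_bar_lin = np.cumprod(1.0 - linear_beta_schedule(timesteps))
    alpha_bar_cos = np.cumprod(1.0 - cosine_beta_schedule(timesteps))
    # With beta_end=0.99 the linear schedule wipes out almost all signal well
    # before the final step; the cosine schedule decays more gradually.
    print("signal kept at t=T:", alpha_bar_lin[-1], alpha_bar_cos[-1])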
def reconstruction_loss_by_axis(original: tf.Tensor, reconstructed: tf.Tensor, axis: int = 0) -> tf.Tensor:
"""
Calculate the reconstruction loss based on a specified axis.
This function computes the reconstruction loss between the original data and
the reconstructed data along a specified axis. The loss can be computed in
two ways depending on the chosen axis:
- When `axis` is 0, it computes the loss as the sum of squared differences
between the original and reconstructed data for all elements.
- When `axis` is 1 or 2, it computes the mean squared error (MSE) between the
mean values along the chosen axis for the original and reconstructed data.
Parameters:
----------
original : tf.Tensor
The original data tensor.
reconstructed : tf.Tensor
The reconstructed data tensor, typically produced by an autoencoder.
axis : int, optional (default=0)
The axis along which to compute the reconstruction loss:
- 0: All elements (sum of squared differences).
- 1: Along features (MSE).
- 2: Along time steps (MSE).
Returns:
-------
tf.Tensor
The computed reconstruction loss as a TensorFlow tensor.
Notes:
------
- This function is commonly used in the context of autoencoders and other
reconstruction-based models to assess the quality of the reconstruction.
- The choice of `axis` determines how the loss is calculated, and it should
align with the data's structure.
"""
if axis == 0:
return tf.reduce_sum(tf.math.squared_difference(original, reconstructed))
else:
return tf.losses.mean_squared_error(tf.reduce_mean(original, axis=axis), tf.reduce_mean(reconstructed, axis=axis))
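

# Usage sketch (not part of the original file): the three axis modes on toy tensors.
def _example_reconstruction_loss() -> None:
    """Illustrative sketch: scalar SSE vs. per-sample MSE of axis means."""
    rng = np.random.default_rng(0)
    original = tf.constant(rng.normal(size=(8, 24, 3)), dtype=tf.float32)
    noise = tf.constant(rng.normal(size=(8, 24, 3)), dtype=tf.float32)
    reconstructed = original + 0.1 * noise
    total = reconstruction_loss_by_axis(original, reconstructed)           # scalar
    per_feature = reconstruction_loss_by_axis(original, reconstructed, 1)  # shape (8,)
    per_time = reconstruction_loss_by_axis(original, reconstructed, 2)     # shape (8,)
    print(float(total), per_feature.shape, per_time.shape)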
def gen_sine_dataset(N: int, T: int, D: int, max_value: int = 10) -> npt.NDArray:
    """Generate N sine-wave series of length T with D phase-shifted features."""
    result = []
    for _ in range(N):
        result.append([])
        a = np.random.random() * max_value
        shift = np.random.random() * max_value + 1
        ts = np.arange(0, T, 1)
        for d in range(1, D + 1):
            result[-1].append(a * np.sin((d + 3) * ts / 25. + shift))
    return np.transpose(np.array(result), [0, 2, 1])
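

# Usage sketch (not part of the original file): generate sines and scale them.
def _example_gen_sine_dataset() -> None:
    """Illustrative sketch: shapes and a typical preprocessing step."""
    X = gen_sine_dataset(N=64, T=100, D=2)
    assert X.shape == (64, 100, 2)  # [N, T, D]
    X_norm = TSFeatureWiseScaler(feature_range=(-1.0, 1.0)).fit_transform(X)
    print(X_norm.min(), X_norm.max())  # close to -1 and 1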
def gen_sine_vs_const_dataset(N: int, T: int, D: int, max_value: int = 10, const: int = 0) -> T.Tuple[TensorLike, TensorLike]:
    """Generate a binary dataset: label 0 marks sine series, label 1 constant series."""
    result_X, result_y = [], []
for i in range(N):
scales = np.random.random(D) * max_value
consts = np.random.random(D) * const
shifts = np.random.random(D) * 2
alpha = np.random.random()
if np.random.random() < 0.5:
times = np.repeat(np.arange(0, T, 1)[:, None], D, axis=1) / 10
result_X.append(np.sin(alpha * times + shifts) * scales)
result_y.append(0)
else:
result_X.append(np.tile(consts, (T, 1)))
result_y.append(1)
return np.array(result_X), np.array(result_y)
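

# Usage sketch (not part of the original file): a toy binary classification set.
def _example_gen_sine_vs_const() -> None:
    """Illustrative sketch: roughly half the series are sines, half constants."""
    X, y = gen_sine_vs_const_dataset(N=128, T=50, D=3, max_value=5, const=2)
    assert X.shape == (128, 50, 3) and y.shape == (128,)
    print("share of constant series:", y.mean())  # ~0.5 in expectation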
def visualize_ts_lineplot(
    ts: Tensor,
    ys: OptTensor = None,
    num: int = 5,
    unite_features: bool = True,
) -> None:
    """Plot `num` randomly chosen series from `ts` of shape [N, T, D], optionally with targets `ys`."""
    assert len(ts.shape) == 3
fig, axs = plt.subplots(num, 1, figsize=(14, 10))
if num == 1:
axs = [axs]
ids = np.random.choice(ts.shape[0], size=num, replace=False)
for i, sample_id in enumerate(ids):
if not unite_features:
feature_id = np.random.randint(ts.shape[2])
sns.lineplot(
x=range(ts.shape[1]),
y=ts[sample_id, :, feature_id],
ax=axs[i],
label=rf"feature \#{feature_id}",
)
else:
for feat_id in range(ts.shape[2]):
sns.lineplot(
x=range(ts.shape[1]), y=ts[sample_id, :, feat_id], ax=axs[i]
)
if ys is not None:
if len(ys.shape) == 1:
axs[i].set_title(ys[sample_id])
elif len(ys.shape) == 2:
sns.lineplot(
x=range(ts.shape[1]),
y=ys[sample_id],
ax=axs[i].twinx(),
color="g",
label="Target variable",
)
else:
raise ValueError("ys contains too many dimensions")
#plt.show()
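

# Usage sketch (not part of the original file): plot a few random series with labels.
def _example_visualize_ts_lineplot() -> None:
    """Illustrative sketch: draw three random samples and show the figure."""
    X, y = gen_sine_vs_const_dataset(N=32, T=50, D=2)
    visualize_ts_lineplot(X, ys=y, num=3)
    plt.show()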
def visualize_tsne(
X: Tensor,
y: Tensor,
X_gen: Tensor,
y_gen: Tensor,
    path: str = "/tmp/tsne_embeddings.pdf",
    feature_averaging: bool = False,
    perplexity: float = 30.0,
) -> None:
"""
Visualizes t-SNE embeddings of real and synthetic data.
This function generates a scatter plot of t-SNE embeddings for real and synthetic data.
Each data point is represented by a marker on the plot, and the colors of the markers
correspond to the corresponding class labels of the data points.
    :param X: The original real data tensor of shape (num_samples, num_timesteps, num_features).
    :type X: Tensor
    :param y: The one-hot labels of the original real data, of shape (num_samples, num_classes).
    :type y: Tensor
    :param X_gen: The generated synthetic data tensor of shape (num_samples, num_timesteps, num_features).
    :type X_gen: Tensor
    :param y_gen: The one-hot labels of the generated synthetic data, of shape (num_samples, num_classes).
    :type y_gen: Tensor
    :param path: The path to save the visualization as a PDF file. Defaults to "/tmp/tsne_embeddings.pdf".
    :type path: str, optional
    :param feature_averaging: Whether to average over the feature axis before embedding. Defaults to False.
    :type feature_averaging: bool, optional
    :param perplexity: Perplexity parameter passed to t-SNE. Defaults to 30.0.
    :type perplexity: float, optional
    """
tsne = sklearn.manifold.TSNE(n_components=2, perplexity=perplexity, learning_rate="auto", init="random")
if feature_averaging:
X_all = np.concatenate((np.mean(X, axis=2), np.mean(X_gen, axis=2)))
X_emb = tsne.fit_transform(np.resize(X_all, (X_all.shape[0], X_all.shape[1])))
else:
X_all = np.concatenate((X, X_gen))
X_emb = tsne.fit_transform(
np.resize(X_all, (X_all.shape[0], X_all.shape[1] * X_all.shape[2]))
)
y_all = np.concatenate((y, y_gen))
    c = np.argmax(y_all, axis=1)
    labels = {0: "class 0", 1: "class 1"}  # legend names; assumes two one-hot classes
    c = [labels[el] for el in c]
point_styles = ["hist"] * X.shape[0] + ["gen"] * X_gen.shape[0]
plt.figure(figsize=(8, 6), dpi=80)
sns.scatterplot(
x=X_emb[:, 0],
y=X_emb[:, 1],
hue=c,
style=point_styles,
markers={"hist": "<", "gen": "H"},
alpha=0.7,
)
plt.legend()
plt.box(False)
plt.axis("off")
plt.savefig(path)
plt.show()
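

# Usage sketch (not part of the original file): t-SNE of real vs. generated data.
# Note the labels must be one-hot encoded, since np.argmax(..., axis=1) is applied
# internally and only two classes are mapped to legend names.
def _example_visualize_tsne() -> None:
    """Illustrative sketch: embed a real and a 'generated' split of the same toy data."""
    X, y = gen_sine_vs_const_dataset(N=100, T=50, D=2)
    X_gen, y_gen = gen_sine_vs_const_dataset(N=100, T=50, D=2)
    visualize_tsne(X, np.eye(2)[y], X_gen, np.eye(2)[y_gen])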