import math
import os
import shutil

import numpy as np
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Layer

from GAN.utils import linear_beta_schedule, cosine_beta_schedule, TSFeatureScaler
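
# DDPM-style Gaussian diffusion for multivariate time series: GaussianDiffusion
# precomputes the noise schedule and its derived coefficients, the build_*
# helpers assemble a transformer-based noise-prediction network, and
# DiffusionModel ties them together for training, sampling, and visualization.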
class GaussianDiffusion:
    """Gaussian diffusion utility: precomputes the beta schedule and the
    derived coefficients used by the forward (q) and reverse (p) processes."""

    def __init__(
        self,
        beta_schedule='cosine',
        timesteps=10,
        clip_min=-1.0,
        clip_max=1.0,
    ):
        self.timesteps = timesteps
        self.clip_min = clip_min
        self.clip_max = clip_max

        if beta_schedule == 'linear':
            betas = linear_beta_schedule(timesteps)
        elif beta_schedule == 'cosine':
            betas = cosine_beta_schedule(timesteps)
        else:
            raise ValueError(f'unknown beta schedule {beta_schedule}')

        alphas = 1. - betas
        alphas_cumprod = np.cumprod(alphas, axis=0)
        alphas_cumprod_prev = np.append(1., alphas_cumprod[:-1])

        self.betas = tf.constant(betas, dtype=tf.float32)
        self.alphas_cumprod = tf.constant(alphas_cumprod, dtype=tf.float32)
        self.alphas_cumprod_prev = tf.constant(alphas_cumprod_prev, dtype=tf.float32)
        self.sqrt_recip_alphas = tf.constant(np.sqrt(1. / alphas), dtype=tf.float32)
        self.sqrt_alphas_cumprod = tf.constant(np.sqrt(alphas_cumprod), dtype=tf.float32)
        self.sqrt_one_minus_alphas_cumprod = tf.constant(np.sqrt(1. - alphas_cumprod), dtype=tf.float32)
        self.log_one_minus_alphas_cumprod = tf.constant(np.log(1. - alphas_cumprod), dtype=tf.float32)
        self.sqrt_recip_alphas_cumprod = tf.constant(np.sqrt(1. / alphas_cumprod), dtype=tf.float32)
        self.sqrt_recipm1_alphas_cumprod = tf.constant(np.sqrt(1. / alphas_cumprod - 1.), dtype=tf.float32)

        # Coefficients of the true posterior q(x_{t-1} | x_t, x_0).
        posterior_variance = betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod)
        self.posterior_variance = tf.constant(posterior_variance, dtype=tf.float32)
        # Log clipped because the posterior variance is 0 at t = 0.
        self.posterior_log_variance_clipped = tf.constant(
            np.log(np.maximum(posterior_variance, 1e-20)), dtype=tf.float32
        )
        self.posterior_mean_coef1 = tf.constant(
            betas * np.sqrt(alphas_cumprod_prev) / (1. - alphas_cumprod),
            dtype=tf.float32,
        )
        self.posterior_mean_coef2 = tf.constant(
            (1. - alphas_cumprod_prev) * np.sqrt(alphas) / (1. - alphas_cumprod),
            dtype=tf.float32,
        )
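
    # _extract gathers the per-timestep coefficient a[t] for a batch of
    # timestep indices t and reshapes the result to [batch, 1, 1] so that it
    # broadcasts over [batch, time, features] tensors.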
    def _extract(self, a, t, x_shape):
        batch_size = x_shape[0]
        out = tf.gather(a, t)
        return tf.reshape(out, [batch_size, 1, 1])
    def q_sample(self, x_start, t):
        """Diffuse x_start to timestep t.

        Note: this deviates from the standard DDPM forward process
            x_t = sqrt(a_bar_t) * x_0 + sqrt(1 - a_bar_t) * eps;
        the signal is left unscaled, only a down-weighted (0.5x) noise term is
        added, and the result is clipped to (-0.99, 0.99). The effective noise
        is recomputed after clipping so that
        diffused_sample == x_start + weight_noise holds exactly.
        Returns (scaled_signal, effective_noise, diffused_sample).
        """
        x_start_shape = tf.shape(x_start)
        samp = self._extract(self.sqrt_alphas_cumprod, t, x_start_shape) * x_start
        noise = tf.random.normal(shape=tf.shape(x_start), dtype='float32')
        weight_noise = self._extract(self.sqrt_one_minus_alphas_cumprod, t, x_start_shape) * noise * 0.5
        diffused_sample = x_start + weight_noise
        diffused_sample = tf.clip_by_value(diffused_sample, -0.99, 0.99)
        weight_noise = diffused_sample - x_start
        return samp, weight_noise, diffused_sample
    def predict_start_from_noise(self, x_t, t, noise):
        x_t_shape = tf.shape(x_t)
        return (
            self._extract(self.sqrt_recip_alphas_cumprod, t, x_t_shape) * x_t
            - self._extract(self.sqrt_recipm1_alphas_cumprod, t, x_t_shape) * noise
        )

    def q_posterior(self, x_start, x_t, t):
        x_t_shape = tf.shape(x_t)
        posterior_mean = (
            self._extract(self.posterior_mean_coef1, t, x_t_shape) * x_start
            + self._extract(self.posterior_mean_coef2, t, x_t_shape) * x_t
        )
        posterior_variance = self._extract(self.posterior_variance, t, x_t_shape)
        posterior_log_variance_clipped = self._extract(
            self.posterior_log_variance_clipped, t, x_t_shape
        )
        return posterior_mean, posterior_variance, posterior_log_variance_clipped

    def p_mean_variance(self, pred_noise, x, t, clip_denoised=False):
        x_recon = self.predict_start_from_noise(x, t=t, noise=pred_noise)
        if clip_denoised:
            x_recon = tf.clip_by_value(x_recon, self.clip_min, self.clip_max)
        model_mean, posterior_variance, posterior_log_variance = self.q_posterior(
            x_start=x_recon, x_t=x, t=t
        )
        return model_mean, posterior_variance, posterior_log_variance

    def p_sample(self, pred_noise, x, t, clip_denoised=False):
        model_mean, _, model_log_variance = self.p_mean_variance(
            pred_noise, x=x, t=t, clip_denoised=clip_denoised
        )
        variance_term = tf.exp(0.5 * model_log_variance)
        noise = tf.random.normal(shape=tf.shape(x), dtype=x.dtype)
        # No noise is added at t == 0, where the sample is the posterior mean.
        nonzero_mask = tf.reshape(
            1 - tf.cast(tf.equal(t, 0), tf.float32), [tf.shape(x)[0], 1, 1]
        )
        noise_term = variance_term * nonzero_mask * noise
        sample = model_mean + noise_term
        return sample
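
# Sketch of the intended round trip (hypothetical names, for orientation only):
#   gdf = GaussianDiffusion(timesteps=10)
#   _, noise, x_t = gdf.q_sample(x_0, t)        # forward: corrupt x_0
#   x_prev = gdf.p_sample(pred_noise, x_t, t)   # reverse: one denoising step
# where pred_noise comes from the network built below.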
class TimeEmbedding(layers.Layer):
    """Sinusoidal timestep embedding with fixed, non-trainable frequencies."""

    def __init__(self, dim, **kwargs):
        super().__init__(**kwargs)
        self.dim = dim
        self.half_dim = dim // 2
        self.emb = math.log(10000) / (self.half_dim - 1)
        self.emb = tf.exp(tf.range(self.half_dim, dtype=tf.float32) * -self.emb)

    def call(self, inputs):
        inputs = tf.cast(inputs, dtype=tf.float32)
        emb = inputs[:, None] * self.emb[None, :]
        emb = tf.concat([tf.sin(emb), tf.cos(emb)], axis=-1)
        return emb
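
# The embedding above maps a scalar timestep t to
#   [sin(t * w_0), ..., sin(t * w_{h-1}), cos(t * w_0), ..., cos(t * w_{h-1})]
# with geometrically spaced frequencies w_k = exp(-k * log(10000) / (h - 1)),
# h = dim // 2 -- the same construction as transformer positional encodings.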
def TimeMLP(units, activation_fn=keras.activations.swish):
    """Single Dense projection applied to a time embedding."""
    def apply(inputs):
        temb = layers.Dense(
            units, activation=activation_fn, kernel_initializer=kernel_init(1.0)
        )(inputs)
        return temb
    return apply
# Layer wrapper around the sinusoidal time-embedding computation.
class TimeEmbeddingLayer(Layer):
    def __init__(self, d_model, **kwargs):
        super().__init__(**kwargs)
        self.d_model = d_model

    def call(self, timesteps):
        # Flatten to a 1-D batch of timesteps and cast so the einsum below
        # sees float32 (fit/predict may feed int32 timesteps, and a plain
        # tf.squeeze would collapse a batch of size 1 to a scalar).
        timesteps = tf.reshape(tf.cast(timesteps, tf.float32), [-1])
        # Inverse frequencies of the sinusoids
        dim = self.d_model // 2
        inv_freq = 1.0 / (10000 ** (tf.range(0, dim, dtype=tf.float32) / dim))
        # Outer product of timesteps and frequencies, then sin/cos features
        sinusoid_inp = tf.einsum("i,j->ij", timesteps, inv_freq)
        pos_emb = tf.concat([tf.sin(sinusoid_inp), tf.cos(sinusoid_inp)], axis=-1)
        return pos_emb
# Kernel initializer to use
def kernel_init(scale):
    scale = max(scale, 1e-10)
    return keras.initializers.VarianceScaling(
        scale, mode="fan_avg", distribution="uniform"
    )
def build_encoder_time(embed_dim=16, num_heads=2, ff_dim=32):
    def apply(inputs):
        x, t = inputs
        position_embedding_layer = layers.Embedding(x.shape[1], embed_dim)
        pos_encoding = position_embedding_layer(tf.range(x.shape[1]))
        x = x + pos_encoding + t
        # Encoder blocks (repeated twice), each feeding the next
        for _ in range(2):
            # Multi-head self-attention mechanism
            attention_output, attention_score = layers.MultiHeadAttention(
                num_heads=num_heads, key_dim=embed_dim
            )(x, x, return_attention_scores=True)
            # Add residual connection and layer normalization
            x = layers.Add()([x, attention_output])
            x = layers.LayerNormalization(epsilon=1e-6)(x)
            # Feed-forward network
            ff_output = layers.Dense(ff_dim, activation="relu")(x)
            ff_output = layers.Dense(embed_dim)(ff_output)
            # Add residual connection and layer normalization
            x = layers.Add()([x, ff_output])
            x = layers.LayerNormalization(epsilon=1e-6)(x)
        return x, attention_score
    return apply
def build_encoder_variales(embed_dim=16, num_heads=2, ff_dim=32):
    def apply(inputs):
        x, t = inputs
        x = layers.Conv1D(16, kernel_size=3, padding='same')(x)
        x = x + t
        # Encoder blocks (repeated twice), each feeding the next
        for _ in range(2):
            # Multi-head self-attention mechanism
            attention_output, attention_score = layers.MultiHeadAttention(
                num_heads=num_heads, key_dim=embed_dim
            )(x, x, return_attention_scores=True)
            # Add residual connection and layer normalization
            x = layers.Add()([x, attention_output])
            x = layers.LayerNormalization(epsilon=1e-6)(x)
            # Feed-forward network
            ff_output = layers.Dense(ff_dim, activation="relu")(x)
            ff_output = layers.Dense(embed_dim)(ff_output)
            # Add residual connection and layer normalization
            x = layers.Add()([x, ff_output])
            x = layers.LayerNormalization(epsilon=1e-6)(x)
        return x, attention_score
    return apply
def build_decoder(embed_dim=16, num_heads=2, ff_dim=32):
    def apply(inputs):
        encoder_outputs, t = inputs
        position_embedding_layer = layers.Embedding(encoder_outputs.shape[1], embed_dim)
        pos_encoding = position_embedding_layer(tf.range(encoder_outputs.shape[1]))
        dec_embeddings = encoder_outputs + pos_encoding + t
        # Decoder blocks
        dec_output = dec_embeddings
        for _ in range(2):  # Repeat twice
            # Multi-head attention over encoder outputs
            attention2_output = layers.MultiHeadAttention(
                num_heads=num_heads, key_dim=embed_dim
            )(dec_output, encoder_outputs)
            # Add residual connection and layer normalization
            dec_output = layers.Add()([dec_output, attention2_output])
            dec_output = layers.LayerNormalization(epsilon=1e-6)(dec_output)
            # Feed-forward network
            ff_output = layers.Dense(ff_dim, activation="relu")(dec_output)
            ff_output = layers.Dense(embed_dim)(ff_output)
            # Add residual connection and layer normalization
            dec_output = layers.Add()([dec_output, ff_output])
            dec_output = layers.LayerNormalization(epsilon=1e-6)(dec_output)
        return dec_output
    return apply
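
# Note: build_encoder_time / build_encoder_variales / build_decoder above are
# standalone functional builders; build_model below instead uses the
# time_transformer_encoder / pairwise_transformer_encoder / decoder_module
# implementations defined further down.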
def build_model(time_len, fea_num, d_model=16, n_heads=2, encoder_type='dual'):
    """
    Build the transformer-based diffusion model.
    """
    print(f"\nBuilding model with encoder type: {encoder_type}")
    print(f"Input shape: time_len={time_len}, features={fea_num}, d_model={d_model}")
    # Input layers
    x_input = layers.Input(shape=(time_len, fea_num), name='feature_input')
    time_input = layers.Input(shape=(1,), name='time_input')
    time_emb = TimeEmbeddingLayer(d_model)(time_input)
    encoded_features = []
    if encoder_type in ['time', 'dual']:
        print("→ Using Time Transformer Encoder")
        # Time Transformer
        time_encoded = time_transformer_encoder(
            x_input,
            time_emb,
            d_model=d_model,
            n_heads=n_heads
        )
        print(f"  Time encoder output shape: {time_encoded.shape}")
        encoded_features.append(time_encoded)
    if encoder_type in ['pairwise', 'dual']:
        print("→ Using Pairwise Correlation Encoder")
        # Pairwise Correlation Transformer
        pairwise_encoded = pairwise_transformer_encoder(
            x_input,
            time_emb,
            d_model=d_model,
            n_heads=n_heads
        )
        print(f"  Pairwise encoder output shape: {pairwise_encoded.shape}")
        encoded_features.append(pairwise_encoded)
    # Combine encodings based on encoder type
    if encoder_type == 'dual':
        print("→ Combining both encoders")
        encoded = layers.Concatenate(axis=-1)(encoded_features)
        print(f"  Combined shape before projection: {encoded.shape}")
        encoded = layers.Dense(d_model)(encoded)
        print(f"  Final encoded shape after projection: {encoded.shape}")
    else:
        encoded = encoded_features[0]
        print(f"→ Using single encoder output shape: {encoded.shape}")
    # Add a residual connection from the projected input (single-encoder case)
    if encoder_type != 'dual':
        print("→ Adding residual connection")
        encoded = layers.Add()([encoded, layers.Dense(d_model)(x_input)])
    # Decoder
    decoded = decoder_module(encoded, time_emb)
    print(f"→ Decoder output shape: {decoded.shape}")
    # Final output layer: predicted noise, same shape as the input window
    output = layers.Dense(fea_num)(decoded)
    print(f"→ Final output shape: {output.shape}")
    print("Model building completed!\n")
    return keras.Model(inputs=[x_input, time_input], outputs=output)
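
# Construction sketch (hypothetical sizes; the default builds the dual-encoder
# variant that concatenates the time and pairwise encodings):
#   network = build_model(time_len=24, fea_num=3, d_model=16, n_heads=2)
#   pred_noise = network([x_batch, t_batch])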
def pairwise_transformer_encoder(x, time_emb, d_model=16, n_heads=2):
    """Pairwise Correlation Transformer encoder implementation"""
    # Get input shape
    input_shape = x.shape
    time_len = input_shape[1]
    # Transpose input to treat features as the sequence dimension
    x_transposed = tf.transpose(x, perm=[0, 2, 1])  # [batch, features, time]
    # Project input
    x_proj = layers.Dense(d_model)(x_transposed)
    # Expand time embeddings
    time_emb = tf.expand_dims(time_emb, 1)
    time_emb = tf.tile(time_emb, [1, tf.shape(x_proj)[1], 1])
    # Add time embeddings
    x = x_proj + time_emb
    # Transformer encoder layers
    for _ in range(2):
        x = transformer_encoder_layer(x, d_model, n_heads)
    # Project to the original time dimension and transpose back
    x = layers.Dense(time_len)(x)
    x = tf.transpose(x, perm=[0, 2, 1])  # [batch, time, features]
    # Final projection to match d_model dimension
    x = layers.Dense(d_model)(x)  # [batch, time_len, d_model]
    return x
def time_transformer_encoder(x, time_emb, d_model=16, n_heads=2):
    """Time Transformer encoder implementation"""
    # Position embeddings
    pos_emb = get_positional_embedding(tf.shape(x)[1], d_model)
    # Project input
    x = layers.Dense(d_model)(x)
    # Expand time embeddings
    time_emb = tf.expand_dims(time_emb, 1)
    time_emb = tf.tile(time_emb, [1, tf.shape(x)[1], 1])
    # Add embeddings
    x = x + pos_emb + time_emb
    # Transformer encoder layers
    for _ in range(2):
        x = transformer_encoder_layer(x, d_model, n_heads)
    return x  # [batch, time_len, d_model]
def transformer_encoder_layer(x, d_model, n_heads):
    """Single post-norm transformer encoder layer with multi-head attention"""
    # Multi-head self attention
    attention_output = layers.MultiHeadAttention(
        num_heads=n_heads,
        key_dim=d_model // n_heads
    )(x, x)
    # Add & Norm
    x = layers.Add()([x, attention_output])
    x = layers.LayerNormalization()(x)
    # Feed Forward Network
    ffn_output = layers.Dense(d_model * 4, activation='relu')(x)
    ffn_output = layers.Dense(d_model)(ffn_output)
    # Add & Norm
    x = layers.Add()([x, ffn_output])
    x = layers.LayerNormalization()(x)
    return x
def decoder_module(encoded, time_emb):
    """Decoder implementation"""
    # Expand time embeddings
    time_emb = tf.expand_dims(time_emb, 1)
    time_emb = tf.tile(time_emb, [1, tf.shape(encoded)[1], 1])
    # Concatenate along feature dimension
    x = layers.Concatenate(axis=-1)([encoded, time_emb])
    # Decoder layers
    x = layers.Dense(256, activation='relu')(x)
    x = layers.Dense(128, activation='relu')(x)
    return x
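
# The decoder is a plain per-position MLP: each position's encoding is
# concatenated with the (broadcast) time embedding and passed through
# 256 -> 128 ReLU layers; build_model then maps the 128-dim features back to
# fea_num output channels.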
def get_time_embedding(timesteps, embedding_dim):
    """
    Create sinusoidal time embeddings.
    Args:
        timesteps: Tensor of shape [batch_size] containing timesteps
        embedding_dim: Dimension of the embeddings to create
    """
    # Ensure timesteps is a 2D tensor
    timesteps = tf.expand_dims(timesteps, -1)
    # Calculate positions and dimensions
    half_dim = embedding_dim // 2
    emb = math.log(10000) / (half_dim - 1)
    emb = tf.exp(tf.range(half_dim, dtype=tf.float32) * -emb)
    # Create embeddings
    emb = tf.cast(timesteps, dtype=tf.float32) * emb[None, :]
    emb = tf.concat([tf.sin(emb), tf.cos(emb)], axis=-1)
    # Handle odd embedding dimensions
    if embedding_dim % 2 == 1:
        emb = tf.pad(emb, [[0, 0], [0, 1]])
    return emb  # Shape: [batch_size, embedding_dim]
def get_positional_embedding(sequence_length, embedding_dim):
    """
    Create sinusoidal position embeddings.
    Args:
        sequence_length: Length of the sequence
        embedding_dim: Dimension of the embeddings to create
    """
    # Create position indices
    positions = tf.range(sequence_length, dtype=tf.float32)[:, tf.newaxis]
    # Create dimension indices
    dimensions = tf.range(0, embedding_dim, 2, dtype=tf.float32)[tf.newaxis, :]
    # Calculate angle rates
    angle_rates = 1 / tf.pow(10000.0, (2 * dimensions) / tf.cast(embedding_dim, tf.float32))
    # Calculate angle radians
    angle_rads = positions * angle_rates
    # Apply sin and cos
    pos_encoding = tf.concat(
        [tf.sin(angle_rads), tf.cos(angle_rads)],
        axis=-1
    )
    # Handle odd embedding dimensions
    if embedding_dim % 2 == 1:
        pos_encoding = tf.pad(pos_encoding, [[0, 0], [0, 1]])
    # Add batch dimension
    pos_encoding = tf.expand_dims(pos_encoding, 0)
    return pos_encoding
class DiffusionModel(keras.Model):
    def __init__(self, network, ema_network, timesteps, gdf_util, data, ema=0.999):
        super().__init__()
        self.network = network
        self.ema_network = ema_network
        self.timesteps = timesteps
        self.gdf_util = gdf_util
        self.data = data
        self.ema = ema

    def train_step(self, data):
        batch_size = tf.shape(data)[0]
        # Sample a timestep uniformly at random for each example in the batch
        t = tf.random.uniform(
            minval=0,
            maxval=self.timesteps,
            shape=(batch_size,),
            dtype=tf.int32
        )
        # Snapshot the weights so the change can be reported after the update
        old_weights = [tf.identity(w) for w in self.network.trainable_weights]
        with tf.GradientTape() as tape:
            # Diffuse the batch and train the network to predict the noise
            _, noise, x_t = self.gdf_util.q_sample(data, t)
            pred_noise = self.network([x_t, t], training=True)
            loss = self.loss(noise, pred_noise)
        gradients = tape.gradient(loss, self.network.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.network.trainable_weights))
        # Update the EMA copy of the weights used for sampling
        for weight, ema_weight in zip(self.network.weights, self.ema_network.weights):
            ema_weight.assign(self.ema * ema_weight + (1 - self.ema) * weight)
        new_weights = self.network.trainable_weights
        weight_changes = []
        for old_w, new_w in zip(old_weights, new_weights):
            diff = tf.reduce_max(tf.abs(old_w - new_w))
            weight_changes.append(diff)
        max_change = tf.reduce_max(weight_changes)
        return {
            "loss": loss,
            "weight_max_change": max_change,
            "has_weight_changed": max_change > 0
        }
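
    # Training objective: since q_sample returns the effective residual
    # (weight_noise = x_t - x_0), the network regresses that residual from
    # (x_t, t) under the compiled loss. For example,
    #   model.compile(loss=keras.losses.MeanSquaredError(),
    #                 optimizer=keras.optimizers.Adam(1e-3))
    # is one reasonable (assumed) configuration; self.loss above then refers
    # to the compiled loss function.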
    def check_noise_levels(self):
        x_0 = tf.cast(self.data, tf.float32)
        print("\n=== Noise Level Analysis ===")
        print(f"Using all {len(self.data)} samples")
        print("Checking noise levels at different timesteps:")
        timesteps_to_check = [
            0,
            self.timesteps // 4,
            self.timesteps // 2,
            3 * self.timesteps // 4,
            self.timesteps - 1
        ]
        for t in timesteps_to_check:
            sqrt_alphas = tf.sqrt(self.gdf_util.alphas_cumprod[t])
            sqrt_one_minus_alphas = tf.sqrt(1 - self.gdf_util.alphas_cumprod[t])
            _, noise, x_t = self.gdf_util.q_sample(x_0, tf.fill([len(x_0)], t))
            pred_noise = self.ema_network.predict([x_t, tf.fill([len(x_0)], t)], verbose=0)
            # float() conversions keep the format specs valid on eager tensors
            print(f"\nTimestep {t}:")
            print(f"Signal scaling factor (sqrt_alphas): {float(sqrt_alphas):.4f}")
            print(f"Noise scaling factor (sqrt_1-alphas): {float(sqrt_one_minus_alphas):.4f}")
            print(f"Original data range: [{float(tf.reduce_min(x_0)):.4f}, {float(tf.reduce_max(x_0)):.4f}]")
            print(f"Noisy data range: [{float(tf.reduce_min(x_t)):.4f}, {float(tf.reduce_max(x_t)):.4f}]")
            print(f"Added noise - Mean: {float(tf.reduce_mean(noise)):.4f}, Std: {float(tf.math.reduce_std(noise)):.4f}")
            print(f"Scaled noise - Mean: {float(tf.reduce_mean(sqrt_one_minus_alphas * noise)):.4f}, Std: {float(tf.math.reduce_std(sqrt_one_minus_alphas * noise)):.4f}")
            print(f"Predicted noise - Mean: {float(tf.reduce_mean(pred_noise)):.4f}, Std: {float(tf.math.reduce_std(pred_noise)):.4f}")
            print(f"Noise prediction error: {float(tf.reduce_mean(tf.abs(noise - pred_noise))):.4f}")
    def generate_ts(self, num_ts=16):
        # Always draw num_ts random windows (with replacement) as starting
        # points; reusing self.data wholesale would mismatch the [num_ts]
        # timestep fill below whenever num_ts != len(self.data).
        indices = tf.random.uniform(
            shape=[num_ts],
            minval=0,
            maxval=len(self.data),
            dtype=tf.int32
        )
        initial_samples = tf.cast(
            tf.gather(self.data, indices),
            tf.float32
        )
        _, _, samples = self.gdf_util.q_sample(initial_samples, tf.fill([num_ts], self.timesteps - 1))
        samples0 = samples
        for i in reversed(range(0, self.timesteps)):
            tt = tf.fill([num_ts], i)
            pred_noise = self.ema_network.predict([samples0, tt], verbose=0, batch_size=num_ts)
            samples = self.gdf_util.p_sample(
                pred_noise, samples0, tt, clip_denoised=False
            )
        return samples
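
    # Note: as written, every reverse step denoises the *initial* noisy batch
    # samples0 at a decreasing t rather than chaining samples step to step
    # (a standard DDPM reverse loop would set samples0 = samples after each
    # p_sample call). The same pattern is used in the plotting methods below.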
    def plot_denoise_process(self, save_dir='denoise_process'):
        if os.path.exists(save_dir):
            shutil.rmtree(save_dir)
        os.makedirs(save_dir, exist_ok=True)
        indices = tf.random.uniform(
            shape=[16],
            minval=0,
            maxval=len(self.data),
            dtype=tf.int32
        )
        x_0 = tf.cast(tf.gather(self.data, indices), tf.float32)
        _, _, samples = self.gdf_util.q_sample(x_0, tf.fill([16], self.timesteps - 1))
        samples0 = samples
        time_steps = np.arange(self.data.shape[1])
        self._plot_step_grid(x_0.numpy(), self.timesteps, time_steps, save_dir)
        print("Denoising: generating 16 samples for visualization")
        for i in reversed(range(0, self.timesteps)):
            print(f"Processing step {i}/{self.timesteps}")
            tt = tf.fill([16], i)
            pred_noise = self.ema_network.predict([samples0, tt], verbose=0, batch_size=16)
            samples = self.gdf_util.p_sample(
                pred_noise, samples0, tt, clip_denoised=False
            )
            scaler = TSFeatureScaler()
            scaled_samples = scaler.fit_transform(samples.numpy())
            self._plot_step_grid(scaled_samples, i, time_steps, save_dir)
        print(f"Saved {self.timesteps + 1} plots to {save_dir}/")
    def _plot_step_grid(self, samples, step, time_steps, save_dir):
        fig, axes = plt.subplots(4, 4, figsize=(20, 20))
        fig.suptitle(f'Generated Samples at Step {step}', fontsize=16)
        for idx in range(16):
            row = idx // 4
            col = idx % 4
            for feature_idx in range(samples.shape[-1]):
                axes[row, col].plot(
                    time_steps,
                    samples[idx, :, feature_idx],
                    label=f'Feature {feature_idx + 1}',
                    alpha=0.8
                )
            axes[row, col].set_title(f'Sample {idx + 1}')
            axes[row, col].grid(True)
            if idx % 4 == 0:
                axes[row, col].set_ylabel('Value')
            if idx >= 12:
                axes[row, col].set_xlabel('Time Steps')
            if idx == 15:
                axes[row, col].legend()
        plt.tight_layout()
        plt.savefig(os.path.join(save_dir, f'step_{step:04d}.png'),
                    dpi=300, bbox_inches='tight')
        plt.close()
    def plot_denoise_detailed_process(self, save_dir='denoise_detailed_process'):
        if os.path.exists(save_dir):
            shutil.rmtree(save_dir)
        os.makedirs(save_dir, exist_ok=True)
        indices = tf.random.uniform(
            shape=[8],
            minval=0,
            maxval=len(self.data),
            dtype=tf.int32
        )
        x_0 = tf.cast(tf.gather(self.data, indices), tf.float32)
        _, _, samples = self.gdf_util.q_sample(x_0, tf.fill([8], self.timesteps - 1))
        samples0 = samples
        self._plot_step_comparison(
            x_0.numpy(),
            x_0.numpy(),
            -1,
            time_steps=np.arange(self.data.shape[1]),
            save_dir=save_dir,
            title="Original Data"
        )
        self._plot_step_comparison(
            samples.numpy(),
            samples.numpy(),
            self.timesteps,
            time_steps=np.arange(self.data.shape[1]),
            save_dir=save_dir,
            title="Initial Noisy Data"
        )
        for i in reversed(range(0, self.timesteps)):
            print(f"Processing step {i}/{self.timesteps}")
            tt = tf.fill([8], i)
            pred_noise = self.ema_network.predict([samples0, tt], verbose=0, batch_size=8)
            samples = self.gdf_util.p_sample(pred_noise, samples0, tt, clip_denoised=False)
            scaler = TSFeatureScaler()
            self._plot_step_comparison(
                pred_noise,
                scaler.fit_transform(samples.numpy()),
                i,
                time_steps=np.arange(self.data.shape[1]),
                save_dir=save_dir,
                title=f"Denoising Step {i}"
            )
        print(f"Visualization completed. Check {save_dir}/ for results.")
    def _plot_step_comparison(self, noise_data, sample_data, step, time_steps, save_dir, title):
        fig, axes = plt.subplots(2, 8, figsize=(24, 8))
        fig.suptitle(f'{title}', fontsize=16)
        # Top row: predicted noise; bottom row: the corresponding samples
        for idx in range(8):
            ax = axes[0, idx]
            for feature_idx in range(noise_data.shape[-1]):
                ax.plot(
                    time_steps,
                    noise_data[idx, :, feature_idx],
                    label=f'Feature {feature_idx + 1}',
                    alpha=0.8
                )
            ax.set_title(f'Noise {idx + 1}')
            ax.grid(True)
            if idx == 0:
                ax.set_ylabel('Value')
            if idx == 7:
                ax.legend()
        for idx in range(8):
            ax = axes[1, idx]
            for feature_idx in range(sample_data.shape[-1]):
                ax.plot(
                    time_steps,
                    sample_data[idx, :, feature_idx],
                    label=f'Feature {feature_idx + 1}',
                    alpha=0.8
                )
            ax.set_title(f'Sample {idx + 1}')
            ax.grid(True)
            if idx == 0:
                ax.set_ylabel('Value')
            ax.set_xlabel('Time Steps')
        plt.tight_layout()
        plt.savefig(
            os.path.join(save_dir, f'step_{step:04d}.png'),
            dpi=300,
            bbox_inches='tight'
        )
        plt.close()
    def plot_noise_process(self, save_dir='noise_process'):
        if os.path.exists(save_dir):
            shutil.rmtree(save_dir)
        os.makedirs(save_dir, exist_ok=True)
        indices = tf.random.uniform(
            shape=[16],
            minval=0,
            maxval=len(self.data),
            dtype=tf.int32
        )
        x_start = tf.cast(tf.gather(self.data, indices), tf.float32)
        print("Noising: selected 16 samples for visualization")
        for t in range(self.timesteps):
            print(f"Processing step {t}/{self.timesteps}")
            _, _, x_noisy = self.gdf_util.q_sample(
                x_start,
                tf.fill([16], t))
            fig, axes = plt.subplots(4, 4, figsize=(20, 20))
            fig.suptitle(f'Noise Process at Step {t}', fontsize=16)
            time_steps = np.arange(x_start.shape[1])
            for idx in range(16):
                row = idx // 4
                col = idx % 4
                for feature_idx in range(x_noisy.shape[-1]):
                    axes[row, col].plot(
                        time_steps,
                        x_noisy[idx, :, feature_idx],
                        label=f'Feature {feature_idx + 1}',
                        alpha=0.8
                    )
                axes[row, col].set_title(f'Sample {idx + 1}')
                axes[row, col].grid(True)
                if idx % 4 == 0:
                    axes[row, col].set_ylabel('Value')
                if idx >= 12:
                    axes[row, col].set_xlabel('Time Steps')
                if idx == 15:
                    axes[row, col].legend()
            plt.tight_layout()
            plt.savefig(os.path.join(save_dir, f'step_{t:04d}.png'),
                        dpi=300, bbox_inches='tight')
            plt.close()
        print(f"Saved {self.timesteps} plots to {save_dir}/")
    def plot_noise_process_app(self, num_samples=16):
        """Build a frame sequence visualizing the noising process for the app."""
        # Validate num_samples (must form a square grid)
        valid_sizes = [4, 9, 16, 25]
        if num_samples not in valid_sizes:
            raise ValueError(f"num_samples must be one of {valid_sizes}")
        # Grid size
        grid_size = int(np.sqrt(num_samples))
        # Randomly select samples
        indices = tf.random.uniform(
            shape=[num_samples],
            minval=0,
            maxval=len(self.data),
            dtype=tf.int32
        )
        x_start = tf.cast(tf.gather(self.data, indices), tf.float32)
        # Collect one rendered image per plotted step
        frames = []
        time_steps = np.arange(x_start.shape[1])
        # Add noise step by step (plotting roughly 10 evenly spaced steps)
        for t in range(0, self.timesteps, max(1, self.timesteps // 10)):
            # Add noise
            _, _, x_noisy = self.gdf_util.q_sample(
                x_start,
                tf.fill([num_samples], t)
            )
            # Create the figure
            fig, axes = plt.subplots(grid_size, grid_size, figsize=(5 * grid_size, 5 * grid_size))
            fig.suptitle(f'Noise Process at Step {t}', fontsize=16)
            # Fixed palette; assumes at most three features per sample
            colors = ['#0a9396', '#ee9b00', '#9b2226']
            # Ensure axes is a 2-D array
            if grid_size == 2:
                axes = axes.reshape(2, 2)
            # Plot each sample
            for idx in range(num_samples):
                row = idx // grid_size
                col = idx % grid_size
                for feature_idx in range(x_noisy.shape[-1]):
                    axes[row, col].plot(
                        time_steps,
                        x_noisy[idx, :, feature_idx],
                        label=f'Feature {feature_idx + 1}',
                        color=colors[feature_idx],
                        alpha=0.8
                    )
                axes[row, col].set_title(f'Sample {idx + 1}')
                axes[row, col].grid(True)
                if idx % grid_size == 0:
                    axes[row, col].set_ylabel('Value')
                if idx >= num_samples - grid_size:
                    axes[row, col].set_xlabel('Time Steps')
                if idx == num_samples - 1:
                    axes[row, col].legend()
            plt.tight_layout()
            # Convert the figure to an RGB image array
            fig.canvas.draw()
            image = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
            image = image.reshape(fig.canvas.get_width_height()[::-1] + (3,))
            frames.append(image)
            plt.close()
        return frames
    def plot_denoise_process_app(self, num_samples=16):
        """Build a frame sequence visualizing the denoising process for the app."""
        # Validate num_samples (must form a square grid)
        valid_sizes = [4, 9, 16, 25]
        if num_samples not in valid_sizes:
            raise ValueError(f"num_samples must be one of {valid_sizes}")
        # Grid size
        grid_size = int(np.sqrt(num_samples))
        # Randomly select samples
        indices = tf.random.uniform(
            shape=[num_samples],
            minval=0,
            maxval=len(self.data),
            dtype=tf.int32
        )
        x_0 = tf.cast(tf.gather(self.data, indices), tf.float32)
        _, _, samples = self.gdf_util.q_sample(x_0, tf.fill([num_samples], self.timesteps - 1))
        samples0 = samples
        # Collect one rendered image per plotted step
        frames = []
        time_steps = np.arange(self.data.shape[1])
        # Start with a frame of the original data
        frames.append(self._plot_step_grid_app(x_0.numpy(), self.timesteps, time_steps, grid_size))
        # Denoise step by step (plotting roughly 10 evenly spaced steps)
        for i in reversed(range(0, self.timesteps)):
            if i % max(1, self.timesteps // 10) == 0:
                tt = tf.fill([num_samples], i)
                pred_noise = self.ema_network.predict([samples0, tt], verbose=0, batch_size=num_samples)
                samples = self.gdf_util.p_sample(pred_noise, samples0, tt, clip_denoised=False)
                scaler = TSFeatureScaler()
                scaled_samples = scaler.fit_transform(samples.numpy())
                frames.append(self._plot_step_grid_app(scaled_samples, i, time_steps, grid_size))
        return frames
    def _plot_step_grid_app(self, samples, step, time_steps, grid_size):
        """Helper: render a single-step sample grid and return it as an image."""
        fig, axes = plt.subplots(grid_size, grid_size, figsize=(5 * grid_size, 5 * grid_size))
        fig.suptitle(f'Generated Samples at Step {step}', fontsize=16)
        # Fixed palette; assumes at most three features per sample
        colors = ['#0a9396', '#ee9b00', '#9b2226']
        # Ensure axes is a 2-D array
        if grid_size == 2:
            axes = axes.reshape(2, 2)
        # Plot each sample
        num_samples = grid_size * grid_size
        for idx in range(num_samples):
            row = idx // grid_size
            col = idx % grid_size
            for feature_idx in range(samples.shape[-1]):
                axes[row, col].plot(
                    time_steps,
                    samples[idx, :, feature_idx],
                    label=f'Feature {feature_idx + 1}',
                    color=colors[feature_idx],
                    alpha=0.8
                )
            axes[row, col].set_title(f'Sample {idx + 1}')
            axes[row, col].grid(True)
            if idx % grid_size == 0:
                axes[row, col].set_ylabel('Value')
            if idx >= num_samples - grid_size:
                axes[row, col].set_xlabel('Time Steps')
            if idx == num_samples - 1:
                axes[row, col].legend()
        plt.tight_layout()
        # Convert the figure to an RGB image array
        fig.canvas.draw()
        image = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
        image = image.reshape(fig.canvas.get_width_height()[::-1] + (3,))
        plt.close()
        return image
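

# Minimal end-to-end smoke test: a sketch only, with assumed window length,
# feature count, and synthetic data standing in for a real, scaled dataset.
if __name__ == "__main__":
    time_len, fea_num, timesteps = 24, 3, 10
    # Hypothetical training data: 64 windows of 24 steps x 3 features in (-1, 1)
    data = np.random.uniform(-0.9, 0.9, size=(64, time_len, fea_num)).astype("float32")

    # Train network plus an EMA copy used for sampling, initialized identically
    network = build_model(time_len, fea_num, d_model=16, n_heads=2)
    ema_network = build_model(time_len, fea_num, d_model=16, n_heads=2)
    ema_network.set_weights(network.get_weights())

    gdf_util = GaussianDiffusion(beta_schedule='cosine', timesteps=timesteps)
    model = DiffusionModel(network, ema_network, timesteps, gdf_util, data)
    model.compile(
        loss=keras.losses.MeanSquaredError(),
        optimizer=keras.optimizers.Adam(1e-3),
    )
    model.fit(data, batch_size=16, epochs=1)

    generated = model.generate_ts(num_ts=16)
    print("generated samples:", generated.shape)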