import math
import os
import shutil

import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Layer

from GAN.utils import linear_beta_schedule, cosine_beta_schedule, TSFeatureScaler
class GaussianDiffusion:
    """Gaussian diffusion utilities: forward noising (q) and reverse sampling (p)."""
def __init__(
self,
beta_schedule='cosine',
timesteps=10,
clip_min=-1.0,
clip_max=1.0,
):
self.timesteps = timesteps
self.clip_min = clip_min
self.clip_max = clip_max
if beta_schedule == 'linear':
betas = linear_beta_schedule(timesteps)
elif beta_schedule == 'cosine':
betas = cosine_beta_schedule(timesteps)
else:
raise ValueError(f'unknown beta schedule {beta_schedule}')
alphas = 1. - betas
alphas_cumprod = np.cumprod(alphas, axis=0)
alphas_cumprod_prev = np.append(1., alphas_cumprod[:-1])
        self.betas = tf.constant(betas, dtype=tf.float32)
        self.alphas_cumprod = tf.constant(alphas_cumprod, dtype=tf.float32)
        self.alphas_cumprod_prev = tf.constant(alphas_cumprod_prev, dtype=tf.float32)
        self.sqrt_recip_alphas = tf.constant(np.sqrt(1. / alphas), dtype=tf.float32)
        # Compute derived quantities from the NumPy arrays before wrapping them
        # in tf.constant, so no NumPy ops are applied to TF tensors.
        self.sqrt_alphas_cumprod = tf.constant(np.sqrt(alphas_cumprod), dtype=tf.float32)
        self.sqrt_one_minus_alphas_cumprod = tf.constant(np.sqrt(1. - alphas_cumprod), dtype=tf.float32)
        self.log_one_minus_alphas_cumprod = tf.constant(np.log(1. - alphas_cumprod), dtype=tf.float32)
        self.sqrt_recip_alphas_cumprod = tf.constant(np.sqrt(1. / alphas_cumprod), dtype=tf.float32)
        self.sqrt_recipm1_alphas_cumprod = tf.constant(np.sqrt(1. / alphas_cumprod - 1.), dtype=tf.float32)
        # Posterior q(x_{t-1} | x_t, x_0): variance and mean coefficients.
        posterior_variance = betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod)
        self.posterior_variance = tf.constant(posterior_variance, dtype=tf.float32)
        self.posterior_log_variance_clipped = tf.constant(
            np.log(np.maximum(posterior_variance, 1e-20)), dtype=tf.float32
        )
        self.posterior_mean_coef1 = tf.constant(
            betas * np.sqrt(alphas_cumprod_prev) / (1. - alphas_cumprod),
            dtype=tf.float32,
        )
        self.posterior_mean_coef2 = tf.constant(
            (1. - alphas_cumprod_prev) * np.sqrt(alphas) / (1. - alphas_cumprod),
            dtype=tf.float32,
        )
    def _extract(self, a, t, x_shape):
        # Gather per-timestep coefficients and reshape them to broadcast
        # over [batch, time, features].
        batch_size = x_shape[0]
        out = tf.gather(a, t)
        return tf.reshape(out, [batch_size, 1, 1])
    def q_sample(self, x_start, t):
        # Forward diffusion. Returns the signal-scaled sample, the effective
        # (post-clipping) noise actually added, and the diffused sample.
        x_start_shape = tf.shape(x_start)
        samp = self._extract(self.sqrt_alphas_cumprod, t, x_start_shape) * x_start
        noise = tf.random.normal(shape=tf.shape(x_start), dtype='float32')
        weight_noise = self._extract(self.sqrt_one_minus_alphas_cumprod, t, x_start_shape) * noise * 0.5
        # Standard DDPM formulation, kept for reference:
        # diffused_sample = self._extract(self.sqrt_alphas_cumprod, t, x_start_shape) * x_start \
        #     + self._extract(self.sqrt_one_minus_alphas_cumprod, t, x_start_shape) * noise
        diffused_sample = x_start + weight_noise
        diffused_sample = tf.clip_by_value(diffused_sample, -0.99, 0.99)
        # Recompute the noise after clipping so that noise == diffused - x_start.
        weight_noise = diffused_sample - x_start
        return samp, weight_noise, diffused_sample
def predict_start_from_noise(self, x_t, t, noise):
x_t_shape = tf.shape(x_t)
return (
self._extract(self.sqrt_recip_alphas_cumprod, t, x_t_shape) * x_t
- self._extract(self.sqrt_recipm1_alphas_cumprod, t, x_t_shape) * noise
)
def q_posterior(self, x_start, x_t, t):
x_t_shape = tf.shape(x_t)
posterior_mean = (
self._extract(self.posterior_mean_coef1, t, x_t_shape) * x_start
+ self._extract(self.posterior_mean_coef2, t, x_t_shape) * x_t
)
posterior_variance = self._extract(self.posterior_variance, t, x_t_shape)
posterior_log_variance_clipped = self._extract(
self.posterior_log_variance_clipped, t, x_t_shape
)
return posterior_mean, posterior_variance, posterior_log_variance_clipped
def p_mean_variance(self, pred_noise, x, t, clip_denoised=False):
x_recon = self.predict_start_from_noise(x, t=t, noise=pred_noise)
if clip_denoised:
x_recon = tf.clip_by_value(x_recon, self.clip_min, self.clip_max)
model_mean, posterior_variance, posterior_log_variance = self.q_posterior(
x_start=x_recon, x_t=x, t=t
)
return model_mean, posterior_variance, posterior_log_variance
def p_sample(self, pred_noise, x, t, clip_denoised=False):
model_mean, _, model_log_variance = self.p_mean_variance(
pred_noise, x=x, t=t, clip_denoised=clip_denoised
)
variance_term = tf.exp(0.5 * model_log_variance)
noise = tf.random.normal(shape=tf.shape(x), dtype=x.dtype)
nonzero_mask = tf.reshape(
1 - tf.cast(tf.equal(t, 0), tf.float32), [tf.shape(x)[0], 1, 1]
)
noise_term = variance_term * nonzero_mask * noise
sample = model_mean + noise_term
return sample
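# --- Usage sketch (added for illustration; shapes and values below are
# assumptions, not part of the original module). Exercises the forward
# (q_sample) and one reverse (p_sample) step. In real use `pred_noise`
# comes from the trained network; here the true noise is substituted
# just to run the reverse step.
def _demo_gaussian_diffusion():
    gdf = GaussianDiffusion(beta_schedule='cosine', timesteps=10)
    x0 = tf.random.normal([4, 24, 3])            # [batch, time, features]
    t = tf.fill([4], 5)                          # one timestep index per sample
    _, noise, x_t = gdf.q_sample(x0, t)          # diffuse to step t
    x_prev = gdf.p_sample(noise, x_t, t)         # one reverse step
    return x_prev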
class TimeEmbedding(layers.Layer):
    """Sinusoidal timestep embedding (Transformer-style)."""
    def __init__(self, dim, **kwargs):
        super().__init__(**kwargs)
        self.dim = dim
        self.half_dim = dim // 2
        # Precompute the inverse-frequency table once at construction time.
        self.emb = math.log(10000) / (self.half_dim - 1)
        self.emb = tf.exp(tf.range(self.half_dim, dtype=tf.float32) * -self.emb)
def call(self, inputs):
inputs = tf.cast(inputs, dtype=tf.float32)
emb = inputs[:, None] * self.emb[None, :]
emb = tf.concat([tf.sin(emb), tf.cos(emb)], axis=-1)
return emb
def TimeMLP(units, activation_fn=keras.activations.swish):
def apply(inputs):
temb = layers.Dense(
units, activation=activation_fn, kernel_initializer=kernel_init(1.0)
)(inputs)
# temb = layers.Dense(units, kernel_initializer=kernel_init(1.0))(temb)
return temb
return apply
# A layer wrapping the sinusoidal time-embedding computation
class TimeEmbeddingLayer(Layer):
    def __init__(self, d_model, **kwargs):
        super().__init__(**kwargs)
        self.d_model = d_model
    def call(self, timesteps):
        # Expand dimensions
        timesteps = tf.expand_dims(timesteps, -1)
        # Inverse frequencies for the sinusoids
        dim = self.d_model // 2
        inv_freq = 1.0 / (10000 ** (tf.range(0, dim, dtype=tf.float32) / dim))
        # Compute the sinusoids
        sinusoid_inp = tf.einsum("i,j->ij", tf.squeeze(timesteps), inv_freq)
        pos_emb = tf.concat([tf.sin(sinusoid_inp), tf.cos(sinusoid_inp)], axis=-1)
        return pos_emb
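# --- Usage sketch (added for illustration; dimensions are assumed) ---
# Maps a batch of scalar timesteps to sinusoidal embeddings of width d_model.
def _demo_time_embedding_layer():
    emb_layer = TimeEmbeddingLayer(d_model=16)
    t = tf.constant([1.0, 5.0, 9.0])             # one timestep per batch element
    return emb_layer(t)                          # -> shape [3, 16]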
# Kernel initializer to use
def kernel_init(scale):
scale = max(scale, 1e-10)
return keras.initializers.VarianceScaling(
scale, mode="fan_avg", distribution="uniform"
)
def build_encoder_time(embed_dim=16, num_heads=2, ff_dim=32):
def apply(inputs):
x, t = inputs
position_embedding_layer = layers.Embedding(x.shape[1], embed_dim)
pos_encoding = position_embedding_layer(tf.range(x.shape[1]))
embeddings = x + pos_encoding + t
# Encoder blocks
for _ in range(2): # Repeat twice
# Multi-head self-attention mechanism
attention_output, attention_score = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)(
embeddings, embeddings, return_attention_scores=True)
# Add residual connection and layer normalization
x = layers.Add()([embeddings, attention_output])
x = layers.LayerNormalization(epsilon=1e-6)(x)
# Feed-forward network
ff_output = layers.Dense(ff_dim, activation="relu")(x)
ff_output = layers.Dense(embed_dim)(ff_output)
# Add residual connection and layer normalization
x = layers.Add()([x, ff_output])
x = layers.LayerNormalization(epsilon=1e-6)(x)
return x, attention_score
return apply
def build_encoder_variales(embed_dim=16, num_heads=2, ff_dim=32):  # "variales" (sic): encoder over the variables/features axis
def apply(inputs):
x, t = inputs
x = layers.Conv1D(16, kernel_size=3, padding='same')(x)
embeddings = x + t
# Encoder blocks
for _ in range(2): # Repeat twice
# Multi-head self-attention mechanism
attention_output, attention_score = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)(
embeddings, embeddings, return_attention_scores=True)
# Add residual connection and layer normalization
x = layers.Add()([embeddings, attention_output])
x = layers.LayerNormalization(epsilon=1e-6)(x)
# Feed-forward network
ff_output = layers.Dense(ff_dim, activation="relu")(x)
ff_output = layers.Dense(embed_dim)(ff_output)
# Add residual connection and layer normalization
x = layers.Add()([x, ff_output])
x = layers.LayerNormalization(epsilon=1e-6)(x)
return x, attention_score
return apply
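# --- Usage sketch (added for illustration; shapes are assumed) ---
# Wires one of the encoder builders above into a small functional model.
# The time embedding is given a broadcastable [1, embed_dim] time axis.
def _demo_encoder_builder():
    x_in = layers.Input(shape=(24, 16))          # [time, embed_dim]
    t_in = layers.Input(shape=(1, 16))           # broadcasts over the time axis
    encoded, attn = build_encoder_time(embed_dim=16, num_heads=2, ff_dim=32)([x_in, t_in])
    return keras.Model([x_in, t_in], [encoded, attn])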
def build_decoder(embed_dim=16, num_heads=2, ff_dim=32):
def apply(inputs):
encoder_outputs, t = inputs
position_embedding_layer = layers.Embedding(encoder_outputs.shape[1], embed_dim)
pos_encoding = position_embedding_layer(tf.range(encoder_outputs.shape[1]))
dec_embeddings = encoder_outputs + pos_encoding + t
# Decoder blocks
dec_output = dec_embeddings
for _ in range(2): # Repeat twice
# Multi-head attention over encoder outputs
attention2_output = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)(
dec_output, encoder_outputs)
# Add residual connection and layer normalization
dec_output = layers.Add()([dec_output, attention2_output])
dec_output = layers.LayerNormalization(epsilon=1e-6)(dec_output)
# Feed-forward network
ff_output = layers.Dense(ff_dim, activation="relu")(dec_output)
ff_output = layers.Dense(embed_dim)(ff_output)
# Add residual connection and layer normalization
dec_output = layers.Add()([dec_output, ff_output])
dec_output = layers.LayerNormalization(epsilon=1e-6)(dec_output)
return dec_output
return apply
def build_model(time_len, fea_num, d_model=16, n_heads=2, encoder_type='dual'):
"""
Build the transformer-based diffusion model.
"""
print(f"\nBuilding model with encoder type: {encoder_type}")
print(f"Input shape: time_len={time_len}, features={fea_num}, d_model={d_model}")
# Input layers
    # Input layers
    x_input = layers.Input(shape=(time_len, fea_num), name='feature_input')
    time_input = layers.Input(shape=(1,), name='time_input')
    time_emb = TimeEmbeddingLayer(d_model)(time_input)
encoded_features = []
if encoder_type in ['time', 'dual']:
print("→ Using Time Transformer Encoder")
# Time Transformer
time_encoded = time_transformer_encoder(
x_input,
time_emb,
d_model=d_model,
n_heads=n_heads
)
print(f" Time encoder output shape: {time_encoded.shape}")
encoded_features.append(time_encoded)
if encoder_type in ['pairwise', 'dual']:
print("→ Using Pairwise Correlation Encoder")
# Pairwise Correlation Transformer
pairwise_encoded = pairwise_transformer_encoder(
x_input,
time_emb,
d_model=d_model,
n_heads=n_heads
)
print(f" Pairwise encoder output shape: {pairwise_encoded.shape}")
encoded_features.append(pairwise_encoded)
# Combine encodings based on encoder type
if encoder_type == 'dual':
print("→ Combining both encoders")
encoded = layers.Concatenate(axis=-1)(encoded_features)
print(f" Combined shape before projection: {encoded.shape}")
encoded = layers.Dense(d_model)(encoded)
print(f" Final encoded shape after projection: {encoded.shape}")
else:
encoded = encoded_features[0]
print(f"→ Using single encoder output shape: {encoded.shape}")
# Add residual connection
if encoder_type != 'dual':
print("→ Adding residual connection")
encoded = layers.Add()([encoded, layers.Dense(d_model)(x_input)])
# Decoder
decoded = decoder_module(encoded, time_emb)
print(f"→ Decoder output shape: {decoded.shape}")
# Final output layer
output = layers.Dense(fea_num)(decoded)
print(f"→ Final output shape: {output.shape}")
print("Model building completed!\n")
return keras.Model(inputs=[x_input, time_input], outputs=output)
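# --- Usage sketch (added for illustration; shapes and values are assumed) ---
# Builds the dual-encoder network and runs a dummy forward pass.
def _demo_build_model():
    model = build_model(time_len=24, fea_num=3, d_model=16, n_heads=2, encoder_type='dual')
    x = tf.random.normal([4, 24, 3])             # noisy series
    t = tf.fill([4, 1], 5.0)                     # diffusion step per sample
    return model([x, t])                         # predicted noise, [4, 24, 3]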
def pairwise_transformer_encoder(x, time_emb, d_model=16, n_heads=2):
"""Pairwise Correlation Transformer encoder implementation"""
# Get input shape
input_shape = x.shape
time_len = input_shape[1]
# Transpose input to treat features as sequence
x_transposed = tf.transpose(x, perm=[0, 2, 1]) # [batch, features, time]
# Project input
x_proj = layers.Dense(d_model)(x_transposed)
# Expand time embeddings
time_emb = tf.expand_dims(time_emb, 1)
time_emb = tf.tile(time_emb, [1, tf.shape(x_proj)[1], 1])
# Add time embeddings
x = x_proj + time_emb
# Transformer encoder layers
for _ in range(2):
x = transformer_encoder_layer(x, d_model, n_heads)
# Project to correct time dimension and transpose back
x = layers.Dense(time_len)(x) # Project to original time dimension
x = tf.transpose(x, perm=[0, 2, 1]) # [batch, time, features]
# Final projection to match d_model dimension
x = layers.Dense(d_model)(x) # [batch, time_len, d_model]
return x
def time_transformer_encoder(x, time_emb, d_model=16, n_heads=2):
"""Time Transformer encoder implementation"""
# Position embeddings
pos_emb = get_positional_embedding(tf.shape(x)[1], d_model)
# Project input
x = layers.Dense(d_model)(x)
# Expand time embeddings
time_emb = tf.expand_dims(time_emb, 1)
time_emb = tf.tile(time_emb, [1, tf.shape(x)[1], 1])
# Add embeddings
x = x + pos_emb + time_emb
# Transformer encoder layers
for _ in range(2):
x = transformer_encoder_layer(x, d_model, n_heads)
return x # [batch, time_len, d_model]
def transformer_encoder_layer(x, d_model, n_heads):
"""Single transformer encoder layer with multi-head attention"""
# Multi-head self attention
attention_output = layers.MultiHeadAttention(
num_heads=n_heads,
key_dim=d_model // n_heads
)(x, x)
# Add & Norm
x = layers.Add()([x, attention_output])
x = layers.LayerNormalization()(x)
# Feed Forward Network
ffn_output = layers.Dense(d_model * 4, activation='relu')(x)
ffn_output = layers.Dense(d_model)(ffn_output)
# Add & Norm
x = layers.Add()([x, ffn_output])
x = layers.LayerNormalization()(x)
return x
def decoder_module(encoded, time_emb):
"""Decoder implementation"""
# Expand time embeddings
time_emb = tf.expand_dims(time_emb, 1)
time_emb = tf.tile(time_emb, [1, tf.shape(encoded)[1], 1])
# Concatenate along feature dimension
x = layers.Concatenate(axis=-1)([encoded, time_emb])
# Decoder layers
x = layers.Dense(256, activation='relu')(x)
x = layers.Dense(128, activation='relu')(x)
return x
def get_time_embedding(timesteps, embedding_dim):
"""
Create sinusoidal time embeddings.
Args:
timesteps: Tensor of shape [batch_size] containing timesteps
embedding_dim: Dimension of the embeddings to create
"""
# Ensure timesteps is a 2D tensor
timesteps = tf.expand_dims(timesteps, -1)
# Calculate positions and dimensions
half_dim = embedding_dim // 2
emb = math.log(10000) / (half_dim - 1)
emb = tf.exp(tf.range(half_dim, dtype=tf.float32) * -emb)
# Create embeddings
emb = tf.cast(timesteps, dtype=tf.float32) * emb[None, :]
emb = tf.concat([tf.sin(emb), tf.cos(emb)], axis=-1)
# Handle odd embedding dimensions
if embedding_dim % 2 == 1:
emb = tf.pad(emb, [[0, 0], [0, 1]])
return emb # Shape: [batch_size, embedding_dim]
def get_positional_embedding(sequence_length, embedding_dim):
"""
Create sinusoidal position embeddings.
Args:
sequence_length: Length of the sequence
embedding_dim: Dimension of the embeddings to create
"""
# Create position indices
positions = tf.range(sequence_length, dtype=tf.float32)[:, tf.newaxis]
# Create dimension indices
dimensions = tf.range(0, embedding_dim, 2, dtype=tf.float32)[tf.newaxis, :]
# Calculate angle rates
angle_rates = 1 / tf.pow(10000.0, (2 * dimensions) / tf.cast(embedding_dim, tf.float32))
# Calculate angle rads
angle_rads = positions * angle_rates
# Apply sin and cos
pos_encoding = tf.concat(
[tf.sin(angle_rads), tf.cos(angle_rads)],
axis=-1
)
# Handle odd embedding dimensions
if embedding_dim % 2 == 1:
pos_encoding = tf.pad(pos_encoding, [[0, 0], [0, 1]])
# Add batch dimension
pos_encoding = tf.expand_dims(pos_encoding, 0)
return pos_encoding
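# --- Usage sketch (added for illustration) ---
# Sanity-checks the two sinusoidal embedding helpers above.
def _demo_embeddings():
    te = get_time_embedding(tf.constant([0, 5, 9]), embedding_dim=16)    # [3, 16]
    pe = get_positional_embedding(sequence_length=24, embedding_dim=16)  # [1, 24, 16]
    return te, pe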
class DiffusionModel(keras.Model):
def __init__(self, network, ema_network, timesteps, gdf_util, data, ema=0.999):
super().__init__()
self.network = network
self.ema_network = ema_network
self.timesteps = timesteps
self.gdf_util = gdf_util
self.data = data
self.ema = ema
def train_step(self, data):
batch_size = tf.shape(data)[0]
t = tf.random.uniform(
minval=0,
maxval=self.timesteps,
shape=(batch_size,),
dtype=tf.int32
)
        # Snapshot weights to report how much they move this step (debug metric).
        old_weights = [tf.identity(w) for w in self.network.trainable_weights]
with tf.GradientTape() as tape:
_, noise, x_t = self.gdf_util.q_sample(data, t)
pred_noise = self.network([x_t, t], training=True)
loss = self.loss(noise, pred_noise)
gradients = tape.gradient(loss, self.network.trainable_weights)
self.optimizer.apply_gradients(zip(gradients, self.network.trainable_weights))
for weight, ema_weight in zip(self.network.weights, self.ema_network.weights):
ema_weight.assign(self.ema * ema_weight + (1 - self.ema) * weight)
new_weights = self.network.trainable_weights
weight_changes = []
for old_w, new_w in zip(old_weights, new_weights):
diff = tf.reduce_max(tf.abs(old_w - new_w))
weight_changes.append(diff)
max_change = tf.reduce_max(weight_changes)
return {
"loss": loss,
"weight_max_change": max_change,
"has_weight_changed": max_change > 0
}
def check_noise_levels(self):
x_0 = tf.cast(self.data, tf.float32)
print("\n=== Noise Level Analysis ===")
print(f"Using all {len(self.data)} samples")
print("Checking noise levels at different timesteps:")
timesteps_to_check = [
0,
self.timesteps//4,
self.timesteps//2,
3*self.timesteps//4,
self.timesteps-1
]
for t in timesteps_to_check:
sqrt_alphas = tf.sqrt(self.gdf_util.alphas_cumprod[t])
sqrt_one_minus_alphas = tf.sqrt(1 - self.gdf_util.alphas_cumprod[t])
_, noise, x_t = self.gdf_util.q_sample(x_0, tf.fill([len(x_0)], t))
pred_noise = self.ema_network.predict([x_t, tf.fill([len(x_0)], t)], verbose=0)
print(f"\nTimestep {t}:")
print(f"Signal scaling factor (sqrt_alphas): {sqrt_alphas:.4f}")
print(f"Noise scaling factor (sqrt_1-alphas): {sqrt_one_minus_alphas:.4f}")
print(f"Original data range: [{tf.reduce_min(x_0):.4f}, {tf.reduce_max(x_0):.4f}]")
print(f"Noisy data range: [{tf.reduce_min(x_t):.4f}, {tf.reduce_max(x_t):.4f}]")
print(f"Added noise - Mean: {tf.reduce_mean(noise):.4f}, Std: {tf.math.reduce_std(noise):.4f}")
print(f"Scaled noise - Mean: {tf.reduce_mean(sqrt_one_minus_alphas * noise):.4f}, Std: {tf.math.reduce_std(sqrt_one_minus_alphas * noise):.4f}")
print(f"Predicted noise - Mean: {tf.reduce_mean(pred_noise):.4f}, Std: {tf.math.reduce_std(pred_noise):.4f}")
print(f"Noise prediction error: {tf.reduce_mean(tf.abs(noise - pred_noise)):.4f}")
    def generate_ts(self, num_ts=16):
        # Draw num_ts starting series at random (with replacement) so the batch
        # size always matches the timestep vector passed to q_sample.
        indices = tf.random.uniform(
            shape=[num_ts],
            minval=0,
            maxval=len(self.data),
            dtype=tf.int32
        )
        initial_samples = tf.cast(tf.gather(self.data, indices), tf.float32)
        _, _, samples = self.gdf_util.q_sample(initial_samples, tf.fill([num_ts], self.timesteps - 1))
        for i in reversed(range(0, self.timesteps)):
            tt = tf.fill([num_ts], i)
            pred_noise = self.ema_network.predict(
                [samples, tt], verbose=0, batch_size=num_ts
            )
            # Chain the reverse process: each step denoises the previous output.
            samples = self.gdf_util.p_sample(
                pred_noise, samples, tt, clip_denoised=False
            )
        return samples
def plot_denoise_process(self, save_dir='denoise_process'):
if os.path.exists(save_dir):
shutil.rmtree(save_dir)
os.makedirs(save_dir, exist_ok=True)
indices = tf.random.uniform(
shape=[16],
minval=0,
maxval=len(self.data),
dtype=tf.int32
)
x_0 = tf.cast(tf.gather(self.data, indices), tf.float32)
        _, _, samples = self.gdf_util.q_sample(x_0, tf.fill([16], self.timesteps - 1))
        time_steps = np.arange(self.data.shape[1])
        self._plot_step_grid(x_0.numpy(), self.timesteps, time_steps, save_dir)
        print("Denoising: generating 16 samples for visualization")
        for i in reversed(range(0, self.timesteps)):
            print(f"Processing step {i}/{self.timesteps}")
            tt = tf.fill([16], i)
            pred_noise = self.ema_network.predict([samples, tt], verbose=0, batch_size=16)
            # Chain the reverse process: each step denoises the previous output.
            samples = self.gdf_util.p_sample(pred_noise, samples, tt, clip_denoised=False)
            # Scale a copy for plotting only; the chain continues on raw samples.
            scaler = TSFeatureScaler()
            scaled_samples = scaler.fit_transform(samples.numpy())
            self._plot_step_grid(scaled_samples, i, time_steps, save_dir)
print(f"Saved {self.timesteps + 1} plots to {save_dir}/")
def _plot_step_grid(self, samples, step, time_steps, save_dir):
fig, axes = plt.subplots(4, 4, figsize=(20, 20))
fig.suptitle(f'Generated Samples at Step {step}', fontsize=16)
for idx in range(16):
row = idx // 4
col = idx % 4
for feature_idx in range(samples.shape[-1]):
axes[row, col].plot(
time_steps,
samples[idx, :, feature_idx],
label=f'Feature {feature_idx+1}',
alpha=0.8
)
axes[row, col].set_title(f'Sample {idx+1}')
axes[row, col].grid(True)
if idx % 4 == 0:
axes[row, col].set_ylabel('Value')
if idx >= 12:
axes[row, col].set_xlabel('Time Steps')
if idx == 15:
axes[row, col].legend()
plt.tight_layout()
plt.savefig(os.path.join(save_dir, f'step_{step:04d}.png'),
dpi=300, bbox_inches='tight')
plt.close()
def plot_denoise_detailed_process(self, save_dir='denoise_detailed_process'):
if os.path.exists(save_dir):
shutil.rmtree(save_dir)
os.makedirs(save_dir, exist_ok=True)
indices = tf.random.uniform(
shape=[8],
minval=0,
maxval=len(self.data),
dtype=tf.int32
)
x_0 = tf.cast(tf.gather(self.data, indices), tf.float32)
        _, _, samples = self.gdf_util.q_sample(x_0, tf.fill([8], self.timesteps - 1))
self._plot_step_comparison(
x_0.numpy(),
x_0.numpy(),
-1,
time_steps=np.arange(self.data.shape[1]),
save_dir=save_dir,
title="Original Data"
)
self._plot_step_comparison(
samples.numpy(),
samples.numpy(),
self.timesteps,
time_steps=np.arange(self.data.shape[1]),
save_dir=save_dir,
title="Initial Noisy Data"
)
        for i in reversed(range(0, self.timesteps)):
            print(f"Processing step {i}/{self.timesteps}")
            tt = tf.fill([8], i)
            pred_noise = self.ema_network.predict([samples, tt], verbose=0, batch_size=8)
            # Chain the reverse process: each step denoises the previous output.
            samples = self.gdf_util.p_sample(pred_noise, samples, tt, clip_denoised=False)
            scaler = TSFeatureScaler()
            self._plot_step_comparison(
                pred_noise,
                scaler.fit_transform(samples.numpy()),
                i,
                time_steps=np.arange(self.data.shape[1]),
                save_dir=save_dir,
                title=f"Denoising Step {i}"
            )
print(f"Visualization completed. Check {save_dir}/ for results.")
def _plot_step_comparison(self, noise_data, sample_data, step, time_steps, save_dir, title):
fig, axes = plt.subplots(2, 8, figsize=(24, 8))
fig.suptitle(f'{title}', fontsize=16)
for idx in range(8):
ax = axes[0, idx]
for feature_idx in range(noise_data.shape[-1]):
ax.plot(
time_steps,
noise_data[idx, :, feature_idx],
label=f'Feature {feature_idx+1}',
alpha=0.8
)
ax.set_title(f'Noise {idx+1}')
ax.grid(True)
if idx == 0:
ax.set_ylabel('Value')
if idx == 7:
ax.legend()
for idx in range(8):
ax = axes[1, idx]
for feature_idx in range(sample_data.shape[-1]):
ax.plot(
time_steps,
sample_data[idx, :, feature_idx],
label=f'Feature {feature_idx+1}',
alpha=0.8
)
ax.set_title(f'Sample {idx+1}')
ax.grid(True)
if idx == 0:
ax.set_ylabel('Value')
ax.set_xlabel('Time Steps')
plt.tight_layout()
plt.savefig(
os.path.join(save_dir, f'step_{step:04d}.png'),
dpi=300,
bbox_inches='tight'
)
plt.close()
def plot_noise_process(self, save_dir='noise_process'):
if os.path.exists(save_dir):
shutil.rmtree(save_dir)
os.makedirs(save_dir, exist_ok=True)
indices = tf.random.uniform(
shape=[16],
minval=0,
maxval=len(self.data),
dtype=tf.int32
)
x_start = tf.cast(tf.gather(self.data, indices), tf.float32)
print(f"Noising: Select 16 samples for visualization")
for t in range(self.timesteps):
print(f"Processing step {t}/{self.timesteps}")
_, _, x_noisy = self.gdf_util.q_sample(
x_start,
tf.fill([16], t))
fig, axes = plt.subplots(4, 4, figsize=(20, 20))
fig.suptitle(f'Noise Process at Step {t}', fontsize=16)
time_steps = np.arange(x_start.shape[1])
for idx in range(16):
row = idx // 4
col = idx % 4
for feature_idx in range(x_noisy.shape[-1]):
axes[row, col].plot(
time_steps,
x_noisy[idx, :, feature_idx],
label=f'Feature {feature_idx+1}',
alpha=0.8
)
axes[row, col].set_title(f'Sample {idx+1}')
axes[row, col].grid(True)
if idx % 4 == 0:
axes[row, col].set_ylabel('Value')
if idx >= 12:
axes[row, col].set_xlabel('Time Steps')
if idx == 15:
axes[row, col].legend()
plt.tight_layout()
plt.savefig(os.path.join(save_dir, f'step_{t:04d}.png'),
dpi=300, bbox_inches='tight')
plt.close()
print(f"Saved {self.timesteps} plots to {save_dir}/")
    def plot_noise_process_app(self, num_samples=16):
        """Build an animated visualization of the noising process for the app."""
        # Validate num_samples
        valid_sizes = [4, 9, 16, 25]
        if num_samples not in valid_sizes:
            raise ValueError(f"num_samples must be one of {valid_sizes}")
        # Grid size
        grid_size = int(np.sqrt(num_samples))
        # Randomly select samples
        indices = tf.random.uniform(
            shape=[num_samples],
            minval=0,
            maxval=len(self.data),
            dtype=tf.int32
        )
        x_start = tf.cast(tf.gather(self.data, indices), tf.float32)
        # Collect one rendered frame per plotted step
        frames = []
        time_steps = np.arange(x_start.shape[1])
        # Add noise step by step
        for t in range(0, self.timesteps, max(1, self.timesteps // 10)):
            # Diffuse to step t
            _, _, x_noisy = self.gdf_util.q_sample(
                x_start,
                tf.fill([num_samples], t)
            )
            # Create the figure
            fig, axes = plt.subplots(grid_size, grid_size, figsize=(5 * grid_size, 5 * grid_size))
            fig.suptitle(f'Noise Process at Step {t}', fontsize=16)
            colors = ['#0a9396', '#ee9b00', '#9b2226']
            # Ensure axes is a 2-D array
            if grid_size == 2:
                axes = axes.reshape(2, 2)
            # Plot each sample
            for idx in range(num_samples):
                row = idx // grid_size
                col = idx % grid_size
                for feature_idx in range(x_noisy.shape[-1]):
                    axes[row, col].plot(
                        time_steps,
                        x_noisy[idx, :, feature_idx],
                        label=f'Feature {feature_idx+1}',
                        color=colors[feature_idx % len(colors)],  # cycle if more than 3 features
                        alpha=0.8
                    )
                axes[row, col].set_title(f'Sample {idx+1}')
                axes[row, col].grid(True)
                if idx % grid_size == 0:
                    axes[row, col].set_ylabel('Value')
                if idx >= num_samples - grid_size:
                    axes[row, col].set_xlabel('Time Steps')
                if idx == num_samples - 1:
                    axes[row, col].legend()
            plt.tight_layout()
            # Render the figure to an RGB array
            fig.canvas.draw()
            image = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
            image = image.reshape(fig.canvas.get_width_height()[::-1] + (3,))
            frames.append(image)
            plt.close()
        return frames
    def plot_denoise_process_app(self, num_samples=16):
        """Build an animated visualization of the denoising process for the app."""
        # Validate num_samples
        valid_sizes = [4, 9, 16, 25]
        if num_samples not in valid_sizes:
            raise ValueError(f"num_samples must be one of {valid_sizes}")
        # Grid size
        grid_size = int(np.sqrt(num_samples))
        # Randomly select samples
        indices = tf.random.uniform(
            shape=[num_samples],
            minval=0,
            maxval=len(self.data),
            dtype=tf.int32
        )
        x_0 = tf.cast(tf.gather(self.data, indices), tf.float32)
        _, _, samples = self.gdf_util.q_sample(x_0, tf.fill([num_samples], self.timesteps - 1))
        # Collect one rendered frame per plotted step
        frames = []
        time_steps = np.arange(self.data.shape[1])
        # Frame for the original data
        frames.append(self._plot_step_grid_app(x_0.numpy(), self.timesteps, time_steps, grid_size))
        # Denoise step by step, plotting roughly every timesteps // 10 steps
        for i in reversed(range(0, self.timesteps)):
            tt = tf.fill([num_samples], i)
            pred_noise = self.ema_network.predict([samples, tt], verbose=0, batch_size=num_samples)
            # Chain the reverse process: each step denoises the previous output.
            samples = self.gdf_util.p_sample(pred_noise, samples, tt, clip_denoised=False)
            if i % max(1, self.timesteps // 10) == 0:
                scaler = TSFeatureScaler()
                scaled_samples = scaler.fit_transform(samples.numpy())
                frames.append(self._plot_step_grid_app(scaled_samples, i, time_steps, grid_size))
        return frames
    def _plot_step_grid_app(self, samples, step, time_steps, grid_size):
        """Helper: render a grid plot for a single timestep and return it as an image."""
        fig, axes = plt.subplots(grid_size, grid_size, figsize=(5 * grid_size, 5 * grid_size))
        fig.suptitle(f'Generated Samples at Step {step}', fontsize=16)
        colors = ['#0a9396', '#ee9b00', '#9b2226']
        # Ensure axes is a 2-D array
        if grid_size == 2:
            axes = axes.reshape(2, 2)
        # Plot each sample
        num_samples = grid_size * grid_size
        for idx in range(num_samples):
            row = idx // grid_size
            col = idx % grid_size
            for feature_idx in range(samples.shape[-1]):
                axes[row, col].plot(
                    time_steps,
                    samples[idx, :, feature_idx],
                    label=f'Feature {feature_idx+1}',
                    color=colors[feature_idx % len(colors)],  # cycle if more than 3 features
                    alpha=0.8
                )
            axes[row, col].set_title(f'Sample {idx+1}')
            axes[row, col].grid(True)
            if idx % grid_size == 0:
                axes[row, col].set_ylabel('Value')
            if idx >= num_samples - grid_size:
                axes[row, col].set_xlabel('Time Steps')
            if idx == num_samples - 1:
                axes[row, col].legend()
        plt.tight_layout()
        # Render the figure to an RGB array
        fig.canvas.draw()
        image = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
        image = image.reshape(fig.canvas.get_width_height()[::-1] + (3,))
        plt.close()
        return image
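# --- End-to-end sketch (added for illustration; the data, shapes, and
# hyperparameters below are assumptions, not part of the original module).
# Wires GaussianDiffusion, build_model, and DiffusionModel together for a
# one-epoch smoke run on random stand-in data.
if __name__ == "__main__":
    timesteps = 10
    data = np.random.uniform(-0.9, 0.9, size=(64, 24, 3)).astype("float32")
    gdf = GaussianDiffusion(beta_schedule='cosine', timesteps=timesteps)
    network = build_model(time_len=24, fea_num=3)
    ema_network = build_model(time_len=24, fea_num=3)
    ema_network.set_weights(network.get_weights())  # start EMA from the same weights
    model = DiffusionModel(network, ema_network, timesteps, gdf, data)
    model.compile(optimizer=keras.optimizers.Adam(1e-3),
                  loss=keras.losses.MeanSquaredError())
    model.fit(data, epochs=1, batch_size=16)
    samples = model.generate_ts(num_ts=16)
    print("Generated samples shape:", samples.shape)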