import os
import argparse

import cv2
import numpy as np
import torch
import gradio as gr
import ipdb
from tqdm import tqdm
from diffusers import DDIMScheduler, DDPMScheduler

import sd.gradio_utils as gradio_utils
from sd.core import DDIMBackward, DDPM_forward

torch.backends.cudnn.enabled = True
torch.backends.cudnn.benchmark = True


def slerp(R_target, rotation_speed):
    # Compute the angle of rotation from the rotation matrix
    angle = np.arccos((np.trace(R_target) - 1) / 2)

    # Handle the case where the angle is very small (no significant rotation)
    if angle < 1e-6:
        return np.eye(3)

    # Normalize the angle based on rotation_speed
    normalized_angle = angle * rotation_speed

    # Axis of rotation
    axis = np.array([R_target[2, 1] - R_target[1, 2],
                     R_target[0, 2] - R_target[2, 0],
                     R_target[1, 0] - R_target[0, 1]])
    axis = axis / np.linalg.norm(axis)

    # Return the interpolated rotation matrix
    return cv2.Rodrigues(axis * normalized_angle)[0]
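
# Note: slerp() above interpolates between the identity and R_target by scaling
# the angle of the (axis, angle) decomposition, i.e. it returns
# Rodrigues(axis * angle * rotation_speed). A minimal sanity check (assuming a
# 30-degree rotation about Z and rotation_speed=0.5, which should give ~15 degrees):
#
#   R_target = cv2.Rodrigues(np.array([0., 0., np.deg2rad(30.)]))[0]
#   R_half = slerp(R_target, 0.5)
#   np.rad2deg(np.arccos((np.trace(R_half) - 1) / 2))  # ~15.0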


def compute_extrinsic_parameters(clicked_point, depth, intrinsic_matrix, rotation_speed, step_x=0, step_y=0, step_z=0):
    # Normalize the clicked point
    x, y = clicked_point
    x = int(x)
    y = int(y)
    x_normalized = (x - intrinsic_matrix[0, 2]) / intrinsic_matrix[0, 0]
    y_normalized = (y - intrinsic_matrix[1, 2]) / intrinsic_matrix[1, 1]

    # Depth at the clicked point
    try:
        z = depth[y, x]
    except Exception:
        ipdb.set_trace()

    # Direction vector in camera coordinates
    direction_vector = np.array([x_normalized * z, y_normalized * z, z])

    # Calculate rotation angles that bring the clicked point to the image center
    angle_y = -np.arctan2(direction_vector[1], direction_vector[2])  # rotation about the Y-axis
    angle_x = np.arctan2(direction_vector[0], direction_vector[2])   # rotation about the X-axis

    # Apply rotation speed
    angle_y *= rotation_speed
    angle_x *= rotation_speed

    # Compute the rotation matrices and compose them
    R_x = cv2.Rodrigues(np.array([1, 0, 0]) * angle_x)[0]
    R_y = cv2.Rodrigues(np.array([0, 1, 0]) * angle_y)[0]
    R = R_y @ R_x

    # Translation along the camera axes
    T = np.array([step_x, -step_y, -step_z])

    # Assemble the 4x4 extrinsic matrix
    extrinsic_matrix = np.eye(4)
    extrinsic_matrix[:3, :3] = R
    extrinsic_matrix[:3, 3] = T
    return extrinsic_matrix
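
# Note: compute_extrinsic_parameters() back-projects the clicked pixel with the
# pinhole model, X = (u - cx) / fx * z and Y = (v - cy) / fy * z, then derives
# angles that would rotate the camera toward that 3D point, scaled by
# rotation_speed. A minimal sketch of the back-projection alone (assuming a
# 512x512 image and the intrinsics used below):
#
#   K = np.array([[1000., 0., 256.], [0., 1000., 256.], [0., 0., 1.]])
#   u, v, z = 400, 256, 2.0
#   X = (u - K[0, 2]) / K[0, 0] * z   # 0.288
#   Y = (v - K[1, 2]) / K[1, 1] * z   # 0.0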


def encode_imgs(imgs):
    imgs = 2 * imgs - 1
    posterior = pipe.vae.encode(imgs).latent_dist
    latents = posterior.mean * 0.18215
    return latents


def decode_latents(latents):
    latents = 1 / 0.18215 * latents
    imgs = pipe.vae.decode(latents).sample
    imgs = (imgs / 2 + 0.5).clamp(0, 1)
    return imgs
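
# Note: 0.18215 is the latent scaling factor of the Stable Diffusion VAE
# (vae.config.scaling_factor). encode_imgs() maps images from [0, 1] to [-1, 1]
# before encoding, and decode_latents() inverts both steps.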


def ddim_inversion(latent, cond, stop_t=1000, start_t=-1):
    # Set the schedule before reading the timesteps, so the loop below always
    # runs over the current num_inference_steps schedule.
    pipe.scheduler.set_timesteps(num_inference_steps)
    timesteps = reversed(pipe.scheduler.timesteps)
    for i, t in enumerate(tqdm(timesteps)):
        if t >= stop_t:
            break
        if t <= start_t:
            continue
        cond_batch = cond.repeat(latent.shape[0], 1, 1)

        alpha_prod_t = pipe.scheduler.alphas_cumprod[t]
        alpha_prod_t_prev = (
            pipe.scheduler.alphas_cumprod[timesteps[i - 1]]
            if i > 0 else pipe.scheduler.final_alpha_cumprod
        )
        mu = alpha_prod_t ** 0.5
        mu_prev = alpha_prod_t_prev ** 0.5
        sigma = (1 - alpha_prod_t) ** 0.5
        sigma_prev = (1 - alpha_prod_t_prev) ** 0.5

        eps = pipe.unet(latent, t, encoder_hidden_states=cond_batch).sample
        pred_x0 = (latent - sigma_prev * eps) / mu_prev
        latent = mu * pred_x0 + sigma * eps
    return latent
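
# Note: each inversion step above applies the deterministic DDIM update in
# reverse. With mu_t = sqrt(alpha_bar_t) and sigma_t = sqrt(1 - alpha_bar_t):
#
#   x0_hat = (x_{t_prev} - sigma_{t_prev} * eps) / mu_{t_prev}
#   x_t    = mu_t * x0_hat + sigma_t * eps
#
# i.e. the latent is pushed from a less-noisy timestep to a noisier one using
# the U-Net's noise prediction, so that denoising from that timestep later
# approximately reconstructs the input image.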


def get_text_embeds(prompt, negative_prompt='', batch_size=1):
    text_input = pipe.tokenizer(prompt, padding='max_length', max_length=77, truncation=True, return_tensors='pt')
    text_embeddings = pipe.text_encoder(text_input.input_ids.to(device))[0]
    uncond_input = pipe.tokenizer(negative_prompt, padding='max_length', max_length=77, truncation=True, return_tensors='pt')
    uncond_embeddings = pipe.text_encoder(uncond_input.input_ids.to(device))[0]
    # cat for final embeddings
    text_embeddings = torch.cat([uncond_embeddings] * batch_size + [text_embeddings] * batch_size).to(torch_dtype)
    return text_embeddings
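
# Note: the unconditional embeddings are concatenated first, so the returned
# tensor is [negative-prompt half, prompt half]. The pipeline's _encode_prompt
# appears to follow the same convention, which is presumably why on_click()
# below passes cond[1:] when only the conditional embedding is needed.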


def save_video(frames, fps=10, out_path='output/output.mp4'):
    # Create the output directory before opening the writer
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    video_dims = (512, 512)
    fourcc = cv2.VideoWriter_fourcc(*'MP4V')
    video = cv2.VideoWriter(out_path, fourcc, fps, video_dims)
    for frame in frames:
        video.write(cv2.cvtColor(np.array(frame), cv2.COLOR_RGB2BGR))
    video.release()


def draw_prompt(prompt):
    return prompt


def to_image(tensor):
    tensor = tensor.squeeze(0).permute(1, 2, 0)
    arr = tensor.detach().cpu().numpy()
    arr = (arr - arr.min()) / (arr.max() - arr.min())
    arr = arr * 255
    return arr.astype('uint8')


def add_points_to_image(image, points):
    image = gradio_utils.draw_handle_target_points(image, points, 5)
    return image


def on_click(state, seed, count, prompt, neg_prompt, speed_r, speed_x, speed_y, speed_z,
             t1, t2, t3, lr, guidance_weight, attn, threshold, early_stop, evt: gr.SelectData):
    end_id = int(t1)
    start_id = int(t2)
    startstart_id = int(t3)
    timesteps = reversed(ddim_scheduler.timesteps)
    end_t = timesteps[end_id]
    start_t = timesteps[start_id]
    startstart_t = timesteps[startstart_id]

    attn = float(attn)
    cfg_norm = False
    cfg_decay = False
    guidance_loss_scale = float(guidance_weight)
    lr = float(lr)
    threshold = int(threshold)
    up_ft_indexes = 2
    early_stop = int(early_stop)

    generator = torch.Generator(device).manual_seed(int(seed))  # 19491001
    state['direction_offset'] = [int(evt.index[0]), int(evt.index[1])]
    cond = pipe._encode_prompt(prompt, device, 1, True, '')

    for _ in range(int(count)):
        image = state['img']
        img_tensor = torch.from_numpy(np.array(image) / 255.).to(device).to(torch_dtype).permute(2, 0, 1).unsqueeze(0)
        _, _, depth = pipe.midas_model(np.array(image))

        centered = is_centered(state['direction_offset'])
        if centered:
            extrinsic = compute_extrinsic_parameters(state['direction_offset'], depth, intrinsic, rotation_speed=float(0),
                                                     step_z=float(speed_z), step_x=float(speed_x), step_y=float(speed_y))
            state['centered'] = centered
        else:
            extrinsic = compute_extrinsic_parameters(state['direction_offset'], depth, intrinsic, rotation_speed=float(speed_r),
                                                     step_z=float(speed_z), step_x=float(speed_x), step_y=float(speed_y))

        this_latent = encode_imgs(img_tensor)
        this_ddim_inv_noise_end = ddim_inversion(this_latent, cond[1:], stop_t=end_t)
        this_ddim_inv_noise_start = ddim_inversion(this_latent, cond[1:], stop_t=startstart_t)

        wrapped_this_ddim_inv_noise_end = pipe.midas_model.wrap_img_tensor_w_fft_ext(
            this_ddim_inv_noise_end.to(torch_dtype),
            torch.from_numpy(depth).to(device).to(torch_dtype),
            intrinsic,
            extrinsic[:3, :3], extrinsic[:3, 3], threshold=threshold,
        ).to(torch_dtype)
        wrapped_this_ddim_inv_noise_start = ddim_inversion(wrapped_this_ddim_inv_noise_end, cond[1:], stop_t=start_t, start_t=end_t)
        wrapped_this_ddim_inv_noise_start = DDPM_forward(wrapped_this_ddim_inv_noise_start, t_start=start_t,
                                                         delta_t=(startstart_id - start_id) * 20,
                                                         ddpm_scheduler=ddpm_scheduler, generator=generator)

        new_img = pipe.denoise_w_injection(
            prompt, generator=generator, num_inference_steps=num_inference_steps,
            latents=torch.cat([this_ddim_inv_noise_start, wrapped_this_ddim_inv_noise_start], dim=0), t_start=startstart_t,
            latent_mask=torch.ones_like(this_latent[0, 0, ...], device=device).unsqueeze(0),
            f=0, attn=attn, guidance_scale=7.5, negative_prompt=neg_prompt,
            guidance_loss_scale=guidance_loss_scale, early_stop=early_stop, up_ft_indexes=[up_ft_indexes],
            cfg_norm=cfg_norm, cfg_decay=cfg_decay, lr=lr,
            intrinsic=intrinsic, extrinsic=extrinsic, threshold=threshold, depth=depth,
        ).images[1]

        new_img = np.array(new_img).astype(np.uint8)
        state['img'] = new_img
        state['img_his'].append(new_img)
        depth = (depth - depth.min()) / (depth.max() - depth.min()) * 1.
        state['depth_his'].append(depth)
    return new_img, depth, state['img_his'], state
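
# Note: on_click() implements the per-frame update described in the UI hints:
# the current frame's latent is DDIM-inverted up to timestep t1, warped with the
# estimated depth and the new camera extrinsics (wrap_img_tensor_w_fft_ext),
# inverted further from t1 to t2 with the U-Net's noise predictions, pushed from
# t2 to t3 with random DDPM noise (DDPM_forward), and finally denoised with KV
# injection and feature-correspondence guidance (denoise_w_injection).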


def is_centered(clicked_point, image_dimensions=(512, 512), threshold=5):
    image_center = [dim // 2 for dim in image_dimensions]
    return all(abs(clicked_point[i] - image_center[i]) <= threshold for i in range(2))


def gen_img(prompt, neg_prompt, state, seed):
    generator = torch.Generator(device).manual_seed(int(seed))  # 19491001
    img = pipe(
        prompt, generator=generator, num_inference_steps=num_inference_steps, negative_prompt=neg_prompt,
    ).images[0]
    img_array = np.array(img)
    _, _, depth = pipe.midas_model(img_array)
    depth = (depth - depth.min()) / (depth.max() - depth.min()) * 1.
    state['img_his'] = [img_array]
    state['depth_his'] = [depth]
    try:
        state['ori_img'] = img_array
        state['img'] = img_array
    except Exception:
        ipdb.set_trace()
    return img_array, depth, [img_array], state


def on_undo(state):
    if len(state['img_his']) > 1:
        del state['img_his'][-1]
        del state['depth_his'][-1]
        image = state['img_his'][-1]
        depth = state['depth_his'][-1]
    else:
        image = state['img_his'][-1]
        depth = state['depth_his'][-1]
    state['img'] = image
    return image, depth, state['img_his'], state


def on_reset(state):
    image = state['img_his'][0]
    depth = state['depth_his'][0]
    state['img'] = image
    state['img_his'] = [image]
    state['depth_his'] = [depth]
    return image, depth, state['img_his'], state


def get_prompt(text):
    return text


def on_save(state, video_name):
    save_video(state['img_his'], fps=5, out_path=f'output/{video_name}.mp4')


def on_seed(seed):
    return int(seed)


def main(args):
    with gr.Blocks() as demo:
        gr.Markdown(
            """
            # DreamDrone

            Official implementation of [DreamDrone](https://hyokong.github.io/publications/dreamdrone-page/).

            **TL;DR:** Navigate dreamscapes with a ***click*** – your chosen point guides the drone's flight in a thrilling visual journey.

            ## Tutorial

            1. Enter your prompt (and a negative prompt, if necessary) in the textbox, then click the `Generate first image` button.
            2. Adjust the camera's moving speed in the `Direction` panel and set the hyperparameters in the `Hyper params` panel.
            3. Click on the generated image to make the camera fly towards the clicked direction.
            4. The generated images are displayed in the gallery at the bottom. You can view them by clicking on them in the gallery or by using the left/right arrow buttons.

            ## Hints

            - You can set the number of images to generate after each click on an image, for convenience.
            - Our system uses a right-handed coordinate system, with the Z-axis pointing into the image.
            - The rotation speed determines how quickly the camera turns towards the clicked direction (rotation only, no translation). Increase it if you need faster camera pose changes.
            - The Speed X/Y/Z values control the camera's translation along the X, Y, and Z axes. Adjust these for different movement styles, similar to a camera arm.
            - $t_1$ is the timestep at which the latent code is warped to the new camera view.
            - Noise is added from $t_1$ to $t_3$. Between $t_1$ and $t_2$, the noise comes from the pretrained diffusion U-Net; from $t_2$ to $t_3$, random Gaussian noise is used.
            - The `Learning rate` and `Feature correspondence guidance` control the feature-correspondence guidance weight during denoising (from timestep $t_3$ to $0$).
            - The `KV injection` parameter adjusts the extent of key and value injection from the current frame to the next.

            > If you encounter any problems, please open an issue. Also, don't forget to star the [Official Github Repo](https://github.com/HyoKong/DreamDrone).

            ***Without further ado, welcome to DreamDrone – enjoy piloting your virtual drone through imaginative landscapes!***
            """,
        )
        img = np.zeros((512, 512, 3)).astype(np.uint8)
        depth_img = np.zeros((512, 512, 3)).astype(np.uint8)
        intrinsic_matrix = np.array([[1000, 0, 512 / 2],
                                     [0, 1000, 512 / 2],
                                     [0, 0, 1]])  # Example intrinsic matrix
        extrinsic_matrix = np.array([[1.0, 0.0, 0.0, 0.0],
                                     [0.0, 1.0, 0.0, 0.0],
                                     [0.0, 0.0, 1.0, 0.0]],
                                    dtype=np.float32)
        direction_offset = (255, 255)

        state = gr.State({
            'ori_img': img,
            'img': None,
            'centered': False,
            'img_his': [],
            'depth_his': [],
            'intrinsic': intrinsic_matrix,
            'extrinsic': extrinsic_matrix,
            'direction_offset': direction_offset,
        })
        with gr.Row():
            with gr.Column(scale=0.2):
                with gr.Accordion("Direction"):
                    speed_r = gr.Number(value=0.1, label='Rotation Speed', step=0.01, minimum=0, maximum=1)
                    speed_x = gr.Number(value=0, label='Speed X-axis', step=1, minimum=-10, maximum=20.0)
                    speed_y = gr.Number(value=0, label='Speed Y-axis', step=1, minimum=-10, maximum=20.0)
                    speed_z = gr.Number(value=5, label='Speed Z-axis', step=1, minimum=-10, maximum=20.0)
                with gr.Accordion('Hyper params'):
                    with gr.Row():
                        count = gr.Number(value=5, label='Num. of generated images', step=1, minimum=1, maximum=10, precision=0)
                        seed = gr.Number(value=19491000, label='Seed', precision=0)
                    t1 = gr.Slider(1, 49, 2, step=1, label='t1')
                    t2 = gr.Slider(1, 49, 20, step=1, label='t2')
                    t3 = gr.Slider(1, 49, 27, step=1, label='t3')
                    lr = gr.Slider(0, 500, 300, step=1, label='Learning rate')
                    guidance_weight = gr.Slider(0, 10, 0.1, step=0.1, label='Feature correspondence guidance')
                    attn = gr.Slider(0, 1, 0.5, step=0.1, label='KV injection')
                    threshold = gr.Slider(0, 31, 20, step=1, label='Threshold of low-pass filter')
                    early_stop = gr.Slider(0, 50, 48, step=1, label='Early stop timestep for feature-correspondence guidance')
                video_name = gr.Textbox(
                    label="Saved video name", show_label=True, max_lines=1, placeholder='saved video name', value='output',
                )
            with gr.Column():
                with gr.Box():
                    with gr.Row().style(mobile_collapse=False, equal_height=True):
                        text = gr.Textbox(
                            label="Enter your prompt", show_label=False, max_lines=1, placeholder='Enter your prompt',
                            value='Backyards of Old Houses in Antwerp in the Snow, van Gogh',
                        ).style(
                            border=(True, False, True, True),
                            rounded=(True, False, False, True),
                            container=False,
                        )
                    with gr.Row().style(mobile_collapse=False, equal_height=True):
                        with gr.Column(scale=0.8):
                            neg_text = gr.Textbox(
                                label="Enter your negative prompt", show_label=False, max_lines=1, value='',
                                placeholder='Enter your negative prompt',
                            ).style(
                                border=(True, False, True, True),
                                rounded=(True, False, False, True),
                                container=False,
                            )
                        with gr.Column(scale=0.2):
                            gen_btn = gr.Button("Generate first image").style(
                                margin=False,
                                rounded=(False, True, True, False),
                            )
                with gr.Box():
                    with gr.Row().style(mobile_collapse=False, equal_height=True):
                        with gr.Column():
                            with gr.Tab('Current view'):
                                image = gr.Image(img).style(height=600, width=600)
                        with gr.Column():
                            with gr.Tab('Depth'):
                                depth_image = gr.Image(depth_img).style(height=600, width=600)
                with gr.Row():
                    with gr.Column(min_width=100):
                        reset_btn = gr.Button('Clear All')
                    with gr.Column(min_width=100):
                        undo_btn = gr.Button('Undo Last')
                    with gr.Column(min_width=100):
                        save_btn = gr.Button('Save Video')
                with gr.Row():
                    with gr.Tab('Generated image gallery'):
                        gallery = gr.Gallery(
                            label='Generated images', show_label=False, elem_id='gallery', preview=True, rows=1, height=368,
                        )
        image.select(
            on_click,
            [state, seed, count, text, neg_text, speed_r, speed_x, speed_y, speed_z, t1, t2, t3, lr, guidance_weight, attn, threshold, early_stop],
            [image, depth_image, gallery, state],
        )
        text.submit(get_prompt, inputs=[text], outputs=[text])
        neg_text.submit(get_prompt, inputs=[neg_text], outputs=[neg_text])
        gen_btn.click(gen_img, inputs=[text, neg_text, state, seed], outputs=[image, depth_image, gallery, state])
        reset_btn.click(on_reset, inputs=[state], outputs=[image, depth_image, gallery, state])
        undo_btn.click(on_undo, inputs=[state], outputs=[image, depth_image, gallery, state])
        save_btn.click(on_save, inputs=[state, video_name], outputs=[])

    global num_inference_steps
    global pipe
    global intrinsic
    global ddim_scheduler
    global ddpm_scheduler
    global device
    global model_id
    global torch_dtype

    num_inference_steps = 50
    device = args.device
    model_id = args.model_id
    ddim_scheduler = DDIMScheduler.from_pretrained(model_id, subfolder="scheduler")
    ddpm_scheduler = DDPMScheduler.from_pretrained(model_id, subfolder="scheduler")
    torch_dtype = torch.float16 if 'cuda' in str(device) else torch.float32
    pipe = DDIMBackward.from_pretrained(
        model_id, scheduler=ddim_scheduler, torch_dtype=torch_dtype,
        cache_dir='.', device=str(device), model_id=model_id, depth_model=args.depth_model,
    ).to(str(device))
    if 'cuda' in str(device):
        pipe.enable_attention_slicing()
        pipe.enable_xformers_memory_efficient_attention()
    intrinsic = np.array([[1000, 0, 256],
                          [0, 1000., 256],
                          [0, 0, 1]])  # Example intrinsic matrix
    return demo
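
# Typical invocations (assuming this file is saved as app.py; all flags are
# defined in the argument parser below):
#   python app.py                    # defaults: cuda, SD 2.1-base, dpt_beit_large_512
#   python app.py --device cpu       # slower; runs the pipeline in float32
#   python app.py --share -p 7860    # public Gradio link on port 7860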


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--device', default='cuda')
    parser.add_argument('--model_id', default='stabilityai/stable-diffusion-2-1-base')
    parser.add_argument('--depth_model', default='dpt_beit_large_512', choices=['dpt_beit_large_512', 'dpt_swin2_large_384'])
    parser.add_argument('--share', action='store_true')
    parser.add_argument('-p', '--port', type=int, default=None)
    parser.add_argument('--ip', default=None)
    args = parser.parse_args()

    demo = main(args)
    print('Successfully loaded, starting gradio demo')
    demo.queue(concurrency_count=1, max_size=20).launch(share=args.share, server_name=args.ip, server_port=args.port)