|
|
|
|
|
|
|
import torch |
|
import math |
|
from torch import Tensor, nn |
|
from einops import rearrange, repeat |
|
|
|
from .layers import (DoubleStreamBlock, EmbedND, LastLayer, |
|
MLPEmbedder, SingleStreamBlock, |
|
timestep_embedding) |
|
|
|
from .model import Flux |
|
import comfy.ldm.common_dit |
|
|
|
class MistolineCondDownsamplBlock(nn.Module): |
|
def __init__(self, dtype=None, device=None, operations=None): |
|
super().__init__() |
|
self.encoder = nn.Sequential( |
|
operations.Conv2d(3, 16, 3, padding=1, dtype=dtype, device=device), |
|
nn.SiLU(), |
|
operations.Conv2d(16, 16, 1, dtype=dtype, device=device), |
|
nn.SiLU(), |
|
operations.Conv2d(16, 16, 3, padding=1, dtype=dtype, device=device), |
|
nn.SiLU(), |
|
operations.Conv2d(16, 16, 3, padding=1, stride=2, dtype=dtype, device=device), |
|
nn.SiLU(), |
|
operations.Conv2d(16, 16, 3, padding=1, dtype=dtype, device=device), |
|
nn.SiLU(), |
|
operations.Conv2d(16, 16, 3, padding=1, stride=2, dtype=dtype, device=device), |
|
nn.SiLU(), |
|
operations.Conv2d(16, 16, 3, padding=1, dtype=dtype, device=device), |
|
nn.SiLU(), |
|
operations.Conv2d(16, 16, 3, padding=1, stride=2, dtype=dtype, device=device), |
|
nn.SiLU(), |
|
operations.Conv2d(16, 16, 1, dtype=dtype, device=device), |
|
nn.SiLU(), |
|
operations.Conv2d(16, 16, 3, padding=1, dtype=dtype, device=device) |
|
) |
|
|
|
def forward(self, x): |
|
return self.encoder(x) |
|
|
|
class MistolineControlnetBlock(nn.Module): |
|
def __init__(self, hidden_size, dtype=None, device=None, operations=None): |
|
super().__init__() |
|
self.linear = operations.Linear(hidden_size, hidden_size, dtype=dtype, device=device) |
|
self.act = nn.SiLU() |
|
|
|
def forward(self, x): |
|
return self.act(self.linear(x)) |
|
|
|
|
|
class ControlNetFlux(Flux): |
|
def __init__(self, latent_input=False, num_union_modes=0, mistoline=False, control_latent_channels=None, image_model=None, dtype=None, device=None, operations=None, **kwargs): |
|
super().__init__(final_layer=False, dtype=dtype, device=device, operations=operations, **kwargs) |
|
|
|
self.main_model_double = 19 |
|
self.main_model_single = 38 |
|
|
|
self.mistoline = mistoline |
|
|
|
if self.mistoline: |
|
control_block = lambda : MistolineControlnetBlock(self.hidden_size, dtype=dtype, device=device, operations=operations) |
|
else: |
|
control_block = lambda : operations.Linear(self.hidden_size, self.hidden_size, dtype=dtype, device=device) |
|
|
|
self.controlnet_blocks = nn.ModuleList([]) |
|
for _ in range(self.params.depth): |
|
self.controlnet_blocks.append(control_block()) |
|
|
|
self.controlnet_single_blocks = nn.ModuleList([]) |
|
for _ in range(self.params.depth_single_blocks): |
|
self.controlnet_single_blocks.append(control_block()) |
|
|
|
self.num_union_modes = num_union_modes |
|
self.controlnet_mode_embedder = None |
|
if self.num_union_modes > 0: |
|
self.controlnet_mode_embedder = operations.Embedding(self.num_union_modes, self.hidden_size, dtype=dtype, device=device) |
|
|
|
self.gradient_checkpointing = False |
|
self.latent_input = latent_input |
|
if control_latent_channels is None: |
|
control_latent_channels = self.in_channels |
|
else: |
|
control_latent_channels *= 2 * 2 |
|
|
|
self.pos_embed_input = operations.Linear(control_latent_channels, self.hidden_size, bias=True, dtype=dtype, device=device) |
|
if not self.latent_input: |
|
if self.mistoline: |
|
self.input_cond_block = MistolineCondDownsamplBlock(dtype=dtype, device=device, operations=operations) |
|
else: |
|
self.input_hint_block = nn.Sequential( |
|
operations.Conv2d(3, 16, 3, padding=1, dtype=dtype, device=device), |
|
nn.SiLU(), |
|
operations.Conv2d(16, 16, 3, padding=1, dtype=dtype, device=device), |
|
nn.SiLU(), |
|
operations.Conv2d(16, 16, 3, padding=1, stride=2, dtype=dtype, device=device), |
|
nn.SiLU(), |
|
operations.Conv2d(16, 16, 3, padding=1, dtype=dtype, device=device), |
|
nn.SiLU(), |
|
operations.Conv2d(16, 16, 3, padding=1, stride=2, dtype=dtype, device=device), |
|
nn.SiLU(), |
|
operations.Conv2d(16, 16, 3, padding=1, dtype=dtype, device=device), |
|
nn.SiLU(), |
|
operations.Conv2d(16, 16, 3, padding=1, stride=2, dtype=dtype, device=device), |
|
nn.SiLU(), |
|
operations.Conv2d(16, 16, 3, padding=1, dtype=dtype, device=device) |
|
) |
|
|
|
def forward_orig( |
|
self, |
|
img: Tensor, |
|
img_ids: Tensor, |
|
controlnet_cond: Tensor, |
|
txt: Tensor, |
|
txt_ids: Tensor, |
|
timesteps: Tensor, |
|
y: Tensor, |
|
guidance: Tensor = None, |
|
control_type: Tensor = None, |
|
) -> Tensor: |
|
if img.ndim != 3 or txt.ndim != 3: |
|
raise ValueError("Input img and txt tensors must have 3 dimensions.") |
|
|
|
|
|
img = self.img_in(img) |
|
|
|
controlnet_cond = self.pos_embed_input(controlnet_cond) |
|
img = img + controlnet_cond |
|
vec = self.time_in(timestep_embedding(timesteps, 256)) |
|
if self.params.guidance_embed: |
|
vec = vec + self.guidance_in(timestep_embedding(guidance, 256)) |
|
vec = vec + self.vector_in(y) |
|
txt = self.txt_in(txt) |
|
|
|
if self.controlnet_mode_embedder is not None and len(control_type) > 0: |
|
control_cond = self.controlnet_mode_embedder(torch.tensor(control_type, device=img.device), out_dtype=img.dtype).unsqueeze(0).repeat((txt.shape[0], 1, 1)) |
|
txt = torch.cat([control_cond, txt], dim=1) |
|
txt_ids = torch.cat([txt_ids[:,:1], txt_ids], dim=1) |
|
|
|
ids = torch.cat((txt_ids, img_ids), dim=1) |
|
pe = self.pe_embedder(ids) |
|
|
|
controlnet_double = () |
|
|
|
for i in range(len(self.double_blocks)): |
|
img, txt = self.double_blocks[i](img=img, txt=txt, vec=vec, pe=pe) |
|
controlnet_double = controlnet_double + (self.controlnet_blocks[i](img),) |
|
|
|
img = torch.cat((txt, img), 1) |
|
|
|
controlnet_single = () |
|
|
|
for i in range(len(self.single_blocks)): |
|
img = self.single_blocks[i](img, vec=vec, pe=pe) |
|
controlnet_single = controlnet_single + (self.controlnet_single_blocks[i](img[:, txt.shape[1] :, ...]),) |
|
|
|
repeat = math.ceil(self.main_model_double / len(controlnet_double)) |
|
if self.latent_input: |
|
out_input = () |
|
for x in controlnet_double: |
|
out_input += (x,) * repeat |
|
else: |
|
out_input = (controlnet_double * repeat) |
|
|
|
out = {"input": out_input[:self.main_model_double]} |
|
if len(controlnet_single) > 0: |
|
repeat = math.ceil(self.main_model_single / len(controlnet_single)) |
|
out_output = () |
|
if self.latent_input: |
|
for x in controlnet_single: |
|
out_output += (x,) * repeat |
|
else: |
|
out_output = (controlnet_single * repeat) |
|
out["output"] = out_output[:self.main_model_single] |
|
return out |
|
|
|
def forward(self, x, timesteps, context, y, guidance=None, hint=None, **kwargs): |
|
patch_size = 2 |
|
if self.latent_input: |
|
hint = comfy.ldm.common_dit.pad_to_patch_size(hint, (patch_size, patch_size)) |
|
elif self.mistoline: |
|
hint = hint * 2.0 - 1.0 |
|
hint = self.input_cond_block(hint) |
|
else: |
|
hint = hint * 2.0 - 1.0 |
|
hint = self.input_hint_block(hint) |
|
|
|
hint = rearrange(hint, "b c (h ph) (w pw) -> b (h w) (c ph pw)", ph=patch_size, pw=patch_size) |
|
|
|
bs, c, h, w = x.shape |
|
x = comfy.ldm.common_dit.pad_to_patch_size(x, (patch_size, patch_size)) |
|
|
|
img = rearrange(x, "b c (h ph) (w pw) -> b (h w) (c ph pw)", ph=patch_size, pw=patch_size) |
|
|
|
h_len = ((h + (patch_size // 2)) // patch_size) |
|
w_len = ((w + (patch_size // 2)) // patch_size) |
|
img_ids = torch.zeros((h_len, w_len, 3), device=x.device, dtype=x.dtype) |
|
img_ids[..., 1] = img_ids[..., 1] + torch.linspace(0, h_len - 1, steps=h_len, device=x.device, dtype=x.dtype)[:, None] |
|
img_ids[..., 2] = img_ids[..., 2] + torch.linspace(0, w_len - 1, steps=w_len, device=x.device, dtype=x.dtype)[None, :] |
|
img_ids = repeat(img_ids, "h w c -> b (h w) c", b=bs) |
|
|
|
txt_ids = torch.zeros((bs, context.shape[1], 3), device=x.device, dtype=x.dtype) |
|
return self.forward_orig(img, img_ids, hint, context, txt_ids, timesteps, y, guidance, control_type=kwargs.get("control_type", [])) |
|
|