Spaces:
Running
on
Zero
Running
on
Zero
File size: 5,803 Bytes
4ebc565 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
from dataclasses import dataclass
import torch
from torch import Tensor, nn
from flux.modules.layers import (
DoubleStreamBlock,
EmbedND,
LastLayer,
MLPEmbedder,
SingleStreamBlock,
timestep_embedding,
)
# Device that Flux moves its sub-modules onto (see the `.to(DEVICE)` calls in
# `Flux.forward` with `aggressive_offload=True` and in `Flux.components_to_gpu`).
# Hard-coded to CUDA.
DEVICE = torch.device("cuda")
@dataclass
class FluxParams:
    """Hyper-parameters consumed by ``Flux.__init__``.

    Attributes:
        in_channels: Feature size of the image tokens fed to the input
            projection; also used as the model's output channel count.
        vec_in_dim: Input size of the ``vector_in`` embedder.
        context_in_dim: Feature size of the text tokens fed to ``txt_in``.
        hidden_size: Width of the transformer; must be divisible by
            ``num_heads``.
        mlp_ratio: Forwarded to every double- and single-stream block.
        num_heads: Number of attention heads.
        depth: Number of ``DoubleStreamBlock`` layers.
        depth_single_blocks: Number of ``SingleStreamBlock`` layers.
        axes_dim: Per-axis positional-embedding sizes; their sum must equal
            ``hidden_size // num_heads``.
        theta: Forwarded to the ``EmbedND`` positional embedder.
        qkv_bias: Forwarded to every ``DoubleStreamBlock``.
        guidance_embed: When true, a guidance embedder is created and
            ``Flux.forward`` requires a ``guidance`` tensor.
    """

    # Field order is part of the public constructor signature; do not reorder.
    in_channels: int
    vec_in_dim: int
    context_in_dim: int
    hidden_size: int
    mlp_ratio: float
    num_heads: int
    depth: int
    depth_single_blocks: int
    axes_dim: list[int]
    theta: int
    qkv_bias: bool
    guidance_embed: bool
class Flux(nn.Module):
    """
    Transformer model for flow matching on sequences.

    Image and text token sequences are embedded, processed jointly by a stack
    of ``DoubleStreamBlock`` layers, concatenated (text first) and processed by
    a stack of ``SingleStreamBlock`` layers; the image tokens are then
    projected by ``LastLayer``.

    ``pulid_ca`` is ``None`` after construction. To use the ``id`` argument of
    :meth:`forward`, assign to it an indexable container of callables that are
    invoked as ``pulid_ca[k](id, img)``. Entries are consumed in order: one
    every ``pulid_double_interval`` double blocks, then one every
    ``pulid_single_interval`` single blocks.
    """

    def __init__(self, params: "FluxParams"):
        """Build the embedders, transformer blocks and output layer.

        Args:
            params: Model hyper-parameters.

        Raises:
            ValueError: If ``hidden_size`` is not divisible by ``num_heads``,
                or ``sum(axes_dim)`` differs from ``hidden_size // num_heads``.
        """
        super().__init__()

        self.params = params
        self.in_channels = params.in_channels
        self.out_channels = self.in_channels
        if params.hidden_size % params.num_heads != 0:
            raise ValueError(
                f"Hidden size {params.hidden_size} must be divisible by num_heads {params.num_heads}"
            )
        pe_dim = params.hidden_size // params.num_heads
        if sum(params.axes_dim) != pe_dim:
            raise ValueError(f"Got {params.axes_dim} but expected positional dim {pe_dim}")
        self.hidden_size = params.hidden_size
        self.num_heads = params.num_heads
        self.pe_embedder = EmbedND(dim=pe_dim, theta=params.theta, axes_dim=params.axes_dim)
        self.img_in = nn.Linear(self.in_channels, self.hidden_size, bias=True)
        # 256 matches the embedding size requested from timestep_embedding in forward().
        self.time_in = MLPEmbedder(in_dim=256, hidden_dim=self.hidden_size)
        self.vector_in = MLPEmbedder(params.vec_in_dim, self.hidden_size)
        self.guidance_in = (
            MLPEmbedder(in_dim=256, hidden_dim=self.hidden_size) if params.guidance_embed else nn.Identity()
        )
        self.txt_in = nn.Linear(params.context_in_dim, self.hidden_size)

        # Both block stacks are cast to bfloat16 at construction time.
        self.double_blocks = nn.ModuleList(
            [
                DoubleStreamBlock(
                    self.hidden_size,
                    self.num_heads,
                    mlp_ratio=params.mlp_ratio,
                    qkv_bias=params.qkv_bias,
                ).to(torch.bfloat16)
                for _ in range(params.depth)
            ]
        )

        self.single_blocks = nn.ModuleList(
            [
                SingleStreamBlock(self.hidden_size, self.num_heads, mlp_ratio=params.mlp_ratio).to(torch.bfloat16)
                for _ in range(params.depth_single_blocks)
            ]
        )

        self.final_layer = LastLayer(self.hidden_size, 1, self.out_channels)

        # Optional ID-injection modules; must be assigned externally before
        # forward() is called with `id`.
        self.pulid_ca = None
        self.pulid_double_interval = 2
        self.pulid_single_interval = 4

    def forward(
        self,
        img: Tensor,
        img_ids: Tensor,
        txt: Tensor,
        txt_ids: Tensor,
        timesteps: Tensor,
        y: Tensor,
        guidance: Tensor = None,
        id: Tensor = None,
        id_weight: float = 1.0,
        aggressive_offload: bool = False,
    ) -> Tensor:
        """Run the model on an image/text token pair.

        Args:
            img: 3-D image token tensor; the last dimension is projected by
                ``img_in`` and the sequence axis is dimension 1.
            img_ids: Position ids for the image tokens; concatenated after
                ``txt_ids`` along dimension 1 and fed to the positional embedder.
            txt: 3-D text token tensor; the last dimension is projected by
                ``txt_in``.
            txt_ids: Position ids for the text tokens.
            timesteps: Passed through ``timestep_embedding`` and ``time_in``.
            y: Conditioning vector passed through ``vector_in``.
            guidance: Guidance strength; required when
                ``params.guidance_embed`` is true, ignored otherwise.
            id: Optional conditioning tensor handed to the ``pulid_ca``
                entries. (The name shadows the builtin but is kept because it
                is part of the public keyword interface.)
            id_weight: Scale applied to each ``pulid_ca`` output before it is
                added to the image tokens.
            aggressive_offload: When true, block stacks are moved to ``DEVICE``
                only while they are needed and back to the CPU afterwards; the
                single blocks are resident one half at a time.

        Returns:
            The output of ``final_layer`` applied to the image tokens.

        Raises:
            ValueError: If ``img`` or ``txt`` is not 3-D, if guidance is
                required but missing, or if ``id`` is given while ``pulid_ca``
                has not been set.
        """
        # Validate everything up front so no compute or device transfer happens
        # for a call that is going to fail anyway.
        if img.ndim != 3 or txt.ndim != 3:
            raise ValueError("Input img and txt tensors must have 3 dimensions.")
        if self.params.guidance_embed and guidance is None:
            raise ValueError("Didn't get guidance strength for guidance distilled model.")
        inject_id = id is not None
        if inject_id and self.pulid_ca is None:
            raise ValueError(
                "Got `id` but `pulid_ca` is not set; assign the ID-injection modules to "
                "`pulid_ca` before calling forward with `id`."
            )

        # running on sequences img
        img = self.img_in(img)
        vec = self.time_in(timestep_embedding(timesteps, 256))
        if self.params.guidance_embed:
            vec = vec + self.guidance_in(timestep_embedding(guidance, 256))
        vec = vec + self.vector_in(y)
        txt = self.txt_in(txt)

        ids = torch.cat((txt_ids, img_ids), dim=1)
        pe = self.pe_embedder(ids)

        # Index of the next pulid_ca entry; shared by both block stacks.
        ca_idx = 0

        # ---- double-stream blocks -------------------------------------------
        if aggressive_offload:
            # Module.to() works in place, so no reassignment is needed.
            self.double_blocks.to(DEVICE)
        try:
            for i, block in enumerate(self.double_blocks):
                img, txt = block(img=img, txt=txt, vec=vec, pe=pe)
                if inject_id and i % self.pulid_double_interval == 0:
                    img = img + id_weight * self.pulid_ca[ca_idx](id, img)
                    ca_idx += 1
        finally:
            # Release the GPU copy even if a block raised.
            if aggressive_offload:
                self.double_blocks.cpu()

        # ---- single-stream blocks -------------------------------------------
        # The text length never changes from here on; the single blocks see the
        # concatenated [txt, img] sequence.
        txt_len = txt.shape[1]
        img = torch.cat((txt, img), 1)

        half = len(self.single_blocks) // 2
        if aggressive_offload:
            # put the first half of the single blocks on the gpu
            for resident in self.single_blocks[:half]:
                resident.to(DEVICE)
        try:
            for i, block in enumerate(self.single_blocks):
                if aggressive_offload and i == half:
                    # put first half of the single blocks to cpu and last half to gpu
                    for done in self.single_blocks[:half]:
                        done.cpu()
                    for upcoming in self.single_blocks[half:]:
                        upcoming.to(DEVICE)
                img = block(img, vec=vec, pe=pe)
                if inject_id and i % self.pulid_single_interval == 0:
                    # Only the image tokens receive the ID signal, so split the
                    # sequence, update the image part and stitch it back. Done
                    # only on injection steps to avoid a copy after every block.
                    txt_part = img[:, :txt_len, ...]
                    real_img = img[:, txt_len:, ...]
                    real_img = real_img + id_weight * self.pulid_ca[ca_idx](id, real_img)
                    ca_idx += 1
                    img = torch.cat((txt_part, real_img), 1)
        finally:
            # Release the GPU copy even if a block raised.
            if aggressive_offload:
                self.single_blocks.cpu()

        # Drop the text tokens; only image tokens are projected to the output.
        img = img[:, txt_len:, ...]

        img = self.final_layer(img, vec)  # (N, T, patch_size ** 2 * out_channels)
        return img

    def components_to_gpu(self):
        """Move every sub-module except the two block stacks to ``DEVICE``.

        ``double_blocks`` and ``single_blocks`` are left where they are.
        ``pulid_ca`` is moved as well when it has been set.
        """
        # everything but double_blocks, single_blocks
        always_resident = (
            self.img_in,
            self.time_in,
            self.guidance_in,
            self.vector_in,
            self.txt_in,
            self.pe_embedder,
            self.final_layer,
        )
        for module in always_resident:
            module.to(DEVICE)
        if self.pulid_ca is not None:
            self.pulid_ca.to(DEVICE)
|