|
from io import BytesIO |
|
|
|
import cv2 |
|
import numpy as np |
|
import torch |
|
from PIL import Image |
|
|
|
from ..log import log |
|
from ..utils import EASINGS, apply_easing, pil2tensor |
|
from .transform import MTB_TransformImage |
|
|
|
|
|
def hex_to_rgb(hex_color: str, bgr: bool = False): |
|
hex_color = hex_color.lstrip("#") |
|
if bgr: |
|
return tuple(int(hex_color[i : i + 2], 16) for i in (4, 2, 0)) |
|
|
|
return tuple(int(hex_color[i : i + 2], 16) for i in (0, 2, 4)) |
|
|
|
|
|
class MTB_BatchFloatMath: |
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"reverse": ("BOOLEAN", {"default": False}), |
|
"operation": ( |
|
["add", "sub", "mul", "div", "pow", "abs"], |
|
{"default": "add"}, |
|
), |
|
} |
|
} |
|
|
|
RETURN_TYPES = ("FLOATS",) |
|
CATEGORY = "mtb/utils" |
|
FUNCTION = "execute" |
|
|
|
def execute(self, reverse: bool, operation: str, **kwargs: list[float]): |
|
res: list[float] = [] |
|
vals = list(kwargs.values()) |
|
|
|
if reverse: |
|
vals = vals[::-1] |
|
|
|
ref_count = len(vals[0]) |
|
for v in vals: |
|
if len(v) != ref_count: |
|
raise ValueError( |
|
f"All values must have the same length (current: {len(v)}, ref: {ref_count}" |
|
) |
|
|
|
match operation: |
|
case "add": |
|
for i in range(ref_count): |
|
result = sum(v[i] for v in vals) |
|
res.append(result) |
|
case "sub": |
|
for i in range(ref_count): |
|
result = vals[0][i] - sum(v[i] for v in vals[1:]) |
|
res.append(result) |
|
case "mul": |
|
for i in range(ref_count): |
|
result = vals[0][i] * vals[1][i] |
|
res.append(result) |
|
case "div": |
|
for i in range(ref_count): |
|
result = vals[0][i] / vals[1][i] |
|
res.append(result) |
|
case "pow": |
|
for i in range(ref_count): |
|
result: float = vals[0][i] ** vals[1][i] |
|
res.append(result) |
|
case "abs": |
|
for i in range(ref_count): |
|
result = abs(vals[0][i]) |
|
res.append(result) |
|
case _: |
|
log.info(f"For now this mode ({operation}) is not implemented") |
|
|
|
return (res,) |
|
|
|
|
|
class MTB_BatchFloatNormalize: |
|
"""Normalize the values in the list of floats""" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": {"floats": ("FLOATS",)}, |
|
} |
|
|
|
RETURN_TYPES = ("FLOATS",) |
|
RETURN_NAMES = ("normalized_floats",) |
|
CATEGORY = "mtb/batch" |
|
FUNCTION = "execute" |
|
|
|
def execute( |
|
self, |
|
floats: list[float], |
|
): |
|
min_value = min(floats) |
|
max_value = max(floats) |
|
|
|
normalized_floats = [ |
|
(x - min_value) / (max_value - min_value) for x in floats |
|
] |
|
log.debug(f"Floats: {floats}") |
|
log.debug(f"Normalized Floats: {normalized_floats}") |
|
|
|
return (normalized_floats,) |
|
|
|
|
|
class MTB_BatchTimeWrap: |
|
"""Remap a batch using a time curve (FLOATS)""" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"target_count": ("INT", {"default": 25, "min": 2}), |
|
"frames": ("IMAGE",), |
|
"curve": ("FLOATS",), |
|
}, |
|
} |
|
|
|
RETURN_TYPES = ("IMAGE", "FLOATS") |
|
RETURN_NAMES = ("image", "interpolated_floats") |
|
CATEGORY = "mtb/batch" |
|
FUNCTION = "execute" |
|
|
|
def execute( |
|
self, target_count: int, frames: torch.Tensor, curve: list[float] |
|
): |
|
"""Apply time warping to a list of video frames based on a curve.""" |
|
log.debug(f"Input frames shape: {frames.shape}") |
|
log.debug(f"Curve: {curve}") |
|
|
|
total_duration = sum(curve) |
|
|
|
log.debug(f"Total duration: {total_duration}") |
|
|
|
B, H, W, C = frames.shape |
|
|
|
log.debug(f"Batch Size: {B}") |
|
|
|
normalized_times = np.linspace(0, 1, target_count) |
|
interpolated_curve = np.interp( |
|
normalized_times, np.linspace(0, 1, len(curve)), curve |
|
).tolist() |
|
log.debug(f"Interpolated curve: {interpolated_curve}") |
|
|
|
interpolated_frame_indices = [ |
|
(B - 1) * value for value in interpolated_curve |
|
] |
|
log.debug(f"Interpolated frame indices: {interpolated_frame_indices}") |
|
|
|
rounded_indices = [ |
|
int(round(idx)) for idx in interpolated_frame_indices |
|
] |
|
rounded_indices = np.clip(rounded_indices, 0, B - 1) |
|
|
|
|
|
warped_frames = [] |
|
for index in rounded_indices: |
|
warped_frames.append(frames[index].unsqueeze(0)) |
|
|
|
warped_tensor = torch.cat(warped_frames, dim=0) |
|
log.debug(f"Warped frames shape: {warped_tensor.shape}") |
|
return (warped_tensor, interpolated_curve) |
|
|
|
|
|
class MTB_BatchMake: |
|
"""Simply duplicates the input frame as a batch""" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"image": ("IMAGE",), |
|
"count": ("INT", {"default": 1}), |
|
} |
|
} |
|
|
|
RETURN_TYPES = ("IMAGE",) |
|
FUNCTION = "generate_batch" |
|
CATEGORY = "mtb/batch" |
|
|
|
def generate_batch(self, image: torch.Tensor, count): |
|
if len(image.shape) == 3: |
|
image = image.unsqueeze(0) |
|
|
|
return (image.repeat(count, 1, 1, 1),) |
|
|
|
|
|
class MTB_BatchShape: |
|
"""Generates a batch of 2D shapes with optional shading (experimental)""" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"count": ("INT", {"default": 1}), |
|
"shape": ( |
|
["Box", "Circle", "Diamond", "Tube"], |
|
{"default": "Circle"}, |
|
), |
|
"image_width": ("INT", {"default": 512}), |
|
"image_height": ("INT", {"default": 512}), |
|
"shape_size": ("INT", {"default": 100}), |
|
"color": ("COLOR", {"default": "#ffffff"}), |
|
"bg_color": ("COLOR", {"default": "#000000"}), |
|
"shade_color": ("COLOR", {"default": "#000000"}), |
|
"thickness": ("INT", {"default": 5}), |
|
"shadex": ("FLOAT", {"default": 0.0}), |
|
"shadey": ("FLOAT", {"default": 0.0}), |
|
}, |
|
} |
|
|
|
RETURN_TYPES = ("IMAGE",) |
|
FUNCTION = "generate_shapes" |
|
CATEGORY = "mtb/batch" |
|
|
|
def generate_shapes( |
|
self, |
|
count, |
|
shape, |
|
image_width, |
|
image_height, |
|
shape_size, |
|
color, |
|
bg_color, |
|
shade_color, |
|
thickness, |
|
shadex, |
|
shadey, |
|
): |
|
log.debug(f"COLOR: {color}") |
|
log.debug(f"BG_COLOR: {bg_color}") |
|
log.debug(f"SHADE_COLOR: {shade_color}") |
|
|
|
|
|
color = hex_to_rgb(color) |
|
bg_color = hex_to_rgb(bg_color) |
|
shade_color = hex_to_rgb(shade_color) |
|
res = [] |
|
for x in range(count): |
|
|
|
canvas = np.full( |
|
(image_height, image_width, 3), bg_color, dtype=np.uint8 |
|
) |
|
mask = np.zeros((image_height, image_width), dtype=np.uint8) |
|
|
|
|
|
center = (image_width // 2, image_height // 2) |
|
|
|
if shape == "Box": |
|
half_size = shape_size // 2 |
|
top_left = (center[0] - half_size, center[1] - half_size) |
|
bottom_right = (center[0] + half_size, center[1] + half_size) |
|
cv2.rectangle(mask, top_left, bottom_right, 255, -1) |
|
elif shape == "Circle": |
|
cv2.circle(mask, center, shape_size // 2, 255, -1) |
|
elif shape == "Diamond": |
|
pts = np.array( |
|
[ |
|
[center[0], center[1] - shape_size // 2], |
|
[center[0] + shape_size // 2, center[1]], |
|
[center[0], center[1] + shape_size // 2], |
|
[center[0] - shape_size // 2, center[1]], |
|
] |
|
) |
|
cv2.fillPoly(mask, [pts], 255) |
|
|
|
elif shape == "Tube": |
|
cv2.ellipse( |
|
mask, |
|
center, |
|
(shape_size // 2, shape_size // 2), |
|
0, |
|
0, |
|
360, |
|
255, |
|
thickness, |
|
) |
|
|
|
|
|
canvas[mask == 255] = color |
|
|
|
|
|
shading = np.zeros_like(canvas, dtype=np.float32) |
|
shading[:, :, 0] = shadex * np.linspace(0, 1, image_width) |
|
shading[:, :, 1] = shadey * np.linspace( |
|
0, 1, image_height |
|
).reshape(-1, 1) |
|
shading_canvas = cv2.addWeighted( |
|
canvas.astype(np.float32), 1, shading, 1, 0 |
|
).astype(np.uint8) |
|
|
|
|
|
canvas[mask == 255] = shading_canvas[mask == 255] |
|
res.append(canvas) |
|
|
|
return (pil2tensor(res),) |
|
|
|
|
|
class MTB_BatchFloatFill: |
|
"""Fills a batch float with a single value until it reaches the target length""" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"floats": ("FLOATS",), |
|
"direction": (["head", "tail"], {"default": "tail"}), |
|
"value": ("FLOAT", {"default": 0.0}), |
|
"count": ("INT", {"default": 1}), |
|
} |
|
} |
|
|
|
FUNCTION = "fill_floats" |
|
RETURN_TYPES = ("FLOATS",) |
|
CATEGORY = "mtb/batch" |
|
|
|
def fill_floats(self, floats, direction, value, count): |
|
size = len(floats) |
|
if size > count: |
|
raise ValueError( |
|
f"Size ({size}) is less then target count ({count})" |
|
) |
|
|
|
rem = count - size |
|
if direction == "tail": |
|
floats = floats + [value] * rem |
|
else: |
|
floats = [value] * rem + floats |
|
return (floats,) |
|
|
|
|
|
class MTB_BatchFloatAssemble: |
|
"""Assembles mutiple batches of floats into a single stream (batch)""" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return {"required": {"reverse": ("BOOLEAN", {"default": False})}} |
|
|
|
RETURN_TYPES = ("FLOATS",) |
|
CATEGORY = "mtb/batch" |
|
FUNCTION = "assemble_floats" |
|
|
|
def assemble_floats(self, reverse: bool, **kwargs: list[float]): |
|
res: list[float] = [] |
|
|
|
if reverse: |
|
for x in reversed(kwargs.values()): |
|
if x: |
|
res += x |
|
else: |
|
for x in kwargs.values(): |
|
if x: |
|
res += x |
|
|
|
return (res,) |
|
|
|
|
|
class MTB_BatchFloat: |
|
"""Generates a batch of float values with interpolation""" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"mode": ( |
|
["Single", "Steps"], |
|
{"default": "Steps"}, |
|
), |
|
"count": ("INT", {"default": 2}), |
|
"min": ("FLOAT", {"default": 0.0, "step": 0.001}), |
|
"max": ("FLOAT", {"default": 1.0, "step": 0.001}), |
|
"easing": ( |
|
[ |
|
"Linear", |
|
"Sine In", |
|
"Sine Out", |
|
"Sine In/Out", |
|
"Quart In", |
|
"Quart Out", |
|
"Quart In/Out", |
|
"Cubic In", |
|
"Cubic Out", |
|
"Cubic In/Out", |
|
"Circ In", |
|
"Circ Out", |
|
"Circ In/Out", |
|
"Back In", |
|
"Back Out", |
|
"Back In/Out", |
|
"Elastic In", |
|
"Elastic Out", |
|
"Elastic In/Out", |
|
"Bounce In", |
|
"Bounce Out", |
|
"Bounce In/Out", |
|
], |
|
{"default": "Linear"}, |
|
), |
|
} |
|
} |
|
|
|
FUNCTION = "set_floats" |
|
RETURN_TYPES = ("FLOATS",) |
|
CATEGORY = "mtb/batch" |
|
|
|
def set_floats(self, mode, count, min, max, easing): |
|
if mode == "Steps" and count == 1: |
|
raise ValueError( |
|
"Steps mode requires at least a count of 2 values" |
|
) |
|
keyframes = [] |
|
if mode == "Single": |
|
keyframes = [min] * count |
|
return (keyframes,) |
|
|
|
for i in range(count): |
|
normalized_step = i / (count - 1) |
|
eased_step = apply_easing(normalized_step, easing) |
|
eased_value = min + (max - min) * eased_step |
|
keyframes.append(eased_value) |
|
|
|
return (keyframes,) |
|
|
|
|
|
class MTB_BatchMerge: |
|
"""Merges multiple image batches with different frame counts""" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"fusion_mode": ( |
|
["add", "multiply", "average"], |
|
{"default": "average"}, |
|
), |
|
"fill": (["head", "tail"], {"default": "tail"}), |
|
} |
|
} |
|
|
|
RETURN_TYPES = ("IMAGE",) |
|
FUNCTION = "merge_batches" |
|
CATEGORY = "mtb/batch" |
|
|
|
def merge_batches(self, fusion_mode: str, fill: str, **kwargs): |
|
images = kwargs.values() |
|
max_frames = max(img.shape[0] for img in images) |
|
|
|
adjusted_images = [] |
|
for img in images: |
|
frame_count = img.shape[0] |
|
if frame_count < max_frames: |
|
fill_frame = img[0] if fill == "head" else img[-1] |
|
fill_frames = fill_frame.repeat( |
|
max_frames - frame_count, 1, 1, 1 |
|
) |
|
adjusted_batch = ( |
|
torch.cat((fill_frames, img), dim=0) |
|
if fill == "head" |
|
else torch.cat((img, fill_frames), dim=0) |
|
) |
|
else: |
|
adjusted_batch = img |
|
adjusted_images.append(adjusted_batch) |
|
|
|
|
|
merged_image = None |
|
for img in adjusted_images: |
|
if merged_image is None: |
|
merged_image = img |
|
else: |
|
if fusion_mode == "add": |
|
merged_image += img |
|
elif fusion_mode == "multiply": |
|
merged_image *= img |
|
elif fusion_mode == "average": |
|
merged_image = (merged_image + img) / 2 |
|
|
|
return (merged_image,) |
|
|
|
|
|
class MTB_Batch2dTransform: |
|
"""Transform a batch of images using a batch of keyframes""" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"image": ("IMAGE",), |
|
"border_handling": ( |
|
["edge", "constant", "reflect", "symmetric"], |
|
{"default": "edge"}, |
|
), |
|
"constant_color": ("COLOR", {"default": "#000000"}), |
|
}, |
|
"optional": { |
|
"x": ("FLOATS",), |
|
"y": ("FLOATS",), |
|
"zoom": ("FLOATS",), |
|
"angle": ("FLOATS",), |
|
"shear": ("FLOATS",), |
|
}, |
|
} |
|
|
|
RETURN_TYPES = ("IMAGE",) |
|
FUNCTION = "transform_batch" |
|
CATEGORY = "mtb/batch" |
|
|
|
def get_num_elements( |
|
self, param: None | torch.Tensor | list[torch.Tensor] | list[float] |
|
) -> int: |
|
if isinstance(param, torch.Tensor): |
|
return torch.numel(param) |
|
|
|
elif isinstance(param, list): |
|
return len(param) |
|
|
|
return 0 |
|
|
|
def transform_batch( |
|
self, |
|
image: torch.Tensor, |
|
border_handling: str, |
|
constant_color: str, |
|
x: list[float] | None = None, |
|
y: list[float] | None = None, |
|
zoom: list[float] | None = None, |
|
angle: list[float] | None = None, |
|
shear: list[float] | None = None, |
|
): |
|
if all( |
|
self.get_num_elements(param) <= 0 |
|
for param in [x, y, zoom, angle, shear] |
|
): |
|
raise ValueError( |
|
"At least one transform parameter must be provided" |
|
) |
|
|
|
keyframes: dict[str, list[float]] = { |
|
"x": [], |
|
"y": [], |
|
"zoom": [], |
|
"angle": [], |
|
"shear": [], |
|
} |
|
|
|
default_vals = {"x": 0, "y": 0, "zoom": 1.0, "angle": 0, "shear": 0} |
|
|
|
if x and self.get_num_elements(x) > 0: |
|
keyframes["x"] = x |
|
if y and self.get_num_elements(y) > 0: |
|
keyframes["y"] = y |
|
if zoom and self.get_num_elements(zoom) > 0: |
|
|
|
keyframes["zoom"] = [max(x, 0.00001) for x in zoom] |
|
if angle and self.get_num_elements(angle) > 0: |
|
keyframes["angle"] = angle |
|
if shear and self.get_num_elements(shear) > 0: |
|
keyframes["shear"] = shear |
|
|
|
for name, values in keyframes.items(): |
|
count = len(values) |
|
if count > 0 and count != image.shape[0]: |
|
raise ValueError( |
|
f"Length of {name} values ({count}) must match number of images ({image.shape[0]})" |
|
) |
|
if count == 0: |
|
keyframes[name] = [default_vals[name]] * image.shape[0] |
|
|
|
transformer = MTB_TransformImage() |
|
res = [ |
|
transformer.transform( |
|
image[i].unsqueeze(0), |
|
keyframes["x"][i], |
|
keyframes["y"][i], |
|
keyframes["zoom"][i], |
|
keyframes["angle"][i], |
|
keyframes["shear"][i], |
|
border_handling, |
|
constant_color, |
|
)[0] |
|
for i in range(image.shape[0]) |
|
] |
|
return (torch.cat(res, dim=0),) |
|
|
|
|
|
class MTB_BatchFloatFit: |
|
"""Fit a list of floats using a source and target range""" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"values": ("FLOATS", {"forceInput": True}), |
|
"clamp": ("BOOLEAN", {"default": False}), |
|
"auto_compute_source": ("BOOLEAN", {"default": False}), |
|
"source_min": ("FLOAT", {"default": 0.0, "step": 0.01}), |
|
"source_max": ("FLOAT", {"default": 1.0, "step": 0.01}), |
|
"target_min": ("FLOAT", {"default": 0.0, "step": 0.01}), |
|
"target_max": ("FLOAT", {"default": 1.0, "step": 0.01}), |
|
"easing": ( |
|
EASINGS, |
|
{"default": "Linear"}, |
|
), |
|
} |
|
} |
|
|
|
FUNCTION = "fit_range" |
|
RETURN_TYPES = ("FLOATS",) |
|
CATEGORY = "mtb/batch" |
|
DESCRIPTION = "Fit a list of floats using a source and target range" |
|
|
|
def fit_range( |
|
self, |
|
values: list[float], |
|
clamp: bool, |
|
auto_compute_source: bool, |
|
source_min: float, |
|
source_max: float, |
|
target_min: float, |
|
target_max: float, |
|
easing: str, |
|
): |
|
if auto_compute_source: |
|
source_min = min(values) |
|
source_max = max(values) |
|
|
|
from .graph_utils import MTB_FitNumber |
|
|
|
res = [] |
|
fit_number = MTB_FitNumber() |
|
for value in values: |
|
(transformed_value,) = fit_number.set_range( |
|
value, |
|
clamp, |
|
source_min, |
|
source_max, |
|
target_min, |
|
target_max, |
|
easing, |
|
) |
|
res.append(transformed_value) |
|
|
|
return (res,) |
|
|
|
|
|
class MTB_PlotBatchFloat: |
|
"""Plot floats""" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"width": ("INT", {"default": 768}), |
|
"height": ("INT", {"default": 768}), |
|
"point_size": ("INT", {"default": 4}), |
|
"seed": ("INT", {"default": 1}), |
|
"start_at_zero": ("BOOLEAN", {"default": False}), |
|
} |
|
} |
|
|
|
RETURN_TYPES = ("IMAGE",) |
|
RETURN_NAMES = ("plot",) |
|
FUNCTION = "plot" |
|
CATEGORY = "mtb/batch" |
|
|
|
def plot( |
|
self, |
|
width: int, |
|
height: int, |
|
point_size: int, |
|
seed: int, |
|
start_at_zero: bool, |
|
interactive_backend: bool = False, |
|
**kwargs, |
|
): |
|
import matplotlib |
|
|
|
|
|
if not interactive_backend: |
|
matplotlib.use("Agg") |
|
import matplotlib.pyplot as plt |
|
|
|
fig, ax = plt.subplots(figsize=(width / 100, height / 100), dpi=100) |
|
fig.set_edgecolor("black") |
|
fig.patch.set_facecolor("#2e2e2e") |
|
|
|
ax.set_facecolor("#2e2e2e") |
|
ax.grid(color="gray", linestyle="-", linewidth=0.5, alpha=0.5) |
|
|
|
|
|
all_values = [value for values in kwargs.values() for value in values] |
|
global_min = min(all_values) |
|
global_max = max(all_values) |
|
|
|
y_padding = 0.05 * (global_max - global_min) |
|
ax.set_ylim(global_min - y_padding, global_max + y_padding) |
|
|
|
max_length = max(len(values) for values in kwargs.values()) |
|
if start_at_zero: |
|
x_values = np.linspace(0, max_length - 1, max_length) |
|
else: |
|
x_values = np.linspace(1, max_length, max_length) |
|
|
|
ax.set_xlim(1, max_length) |
|
np.random.seed(seed) |
|
colors = np.random.rand(len(kwargs), 3) |
|
for color, (label, values) in zip(colors, kwargs.items()): |
|
ax.plot(x_values[: len(values)], values, label=label, color=color) |
|
ax.legend( |
|
title="Legend", |
|
title_fontsize="large", |
|
fontsize="medium", |
|
edgecolor="black", |
|
loc="best", |
|
) |
|
|
|
|
|
ax.set_xlabel("Time", fontsize="large", color="white") |
|
ax.set_ylabel("Value", fontsize="large", color="white") |
|
ax.set_title( |
|
"Plot of Values over Time", fontsize="x-large", color="white" |
|
) |
|
|
|
|
|
ax.tick_params(colors="white") |
|
|
|
|
|
for _, spine in ax.spines.items(): |
|
spine.set_edgecolor("white") |
|
|
|
|
|
buf = BytesIO() |
|
plt.savefig(buf, format="png", bbox_inches="tight") |
|
buf.seek(0) |
|
image = Image.open(buf) |
|
plt.close(fig) |
|
|
|
return (pil2tensor(image),) |
|
|
|
def draw_point(self, image, point, color, point_size): |
|
x, y = point |
|
y = image.shape[0] - 1 - y |
|
half_size = point_size // 2 |
|
x_start, x_end = ( |
|
max(0, x - half_size), |
|
min(image.shape[1], x + half_size + 1), |
|
) |
|
y_start, y_end = ( |
|
max(0, y - half_size), |
|
min(image.shape[0], y + half_size + 1), |
|
) |
|
image[y_start:y_end, x_start:x_end] = color |
|
|
|
def draw_line(self, image, start, end, color): |
|
x1, y1 = start |
|
x2, y2 = end |
|
|
|
|
|
y1 = image.shape[0] - 1 - y1 |
|
y2 = image.shape[0] - 1 - y2 |
|
|
|
dx = x2 - x1 |
|
dy = y2 - y1 |
|
is_steep = abs(dy) > abs(dx) |
|
if is_steep: |
|
x1, y1 = y1, x1 |
|
x2, y2 = y2, x2 |
|
swapped = False |
|
if x1 > x2: |
|
x1, x2 = x2, x1 |
|
y1, y2 = y2, y1 |
|
swapped = True |
|
dx = x2 - x1 |
|
dy = y2 - y1 |
|
error = int(dx / 2.0) |
|
y = y1 |
|
ystep = None |
|
if y1 < y2: |
|
ystep = 1 |
|
else: |
|
ystep = -1 |
|
for x in range(x1, x2 + 1): |
|
coord = (y, x) if is_steep else (x, y) |
|
image[coord] = color |
|
error -= abs(dy) |
|
if error < 0: |
|
y += ystep |
|
error += dx |
|
if swapped: |
|
image[(x1, y1)] = color |
|
image[(x2, y2)] = color |
|
|
|
|
|
DEFAULT_INTERPOLANT = lambda t: t * t * t * (t * (t * 6 - 15) + 10) |
|
|
|
|
|
class MTB_BatchShake: |
|
"""Applies a shaking effect to batches of images.""" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"images": ("IMAGE",), |
|
"position_amount_x": ("FLOAT", {"default": 1.0}), |
|
"position_amount_y": ("FLOAT", {"default": 1.0}), |
|
"rotation_amount": ("FLOAT", {"default": 10.0}), |
|
"frequency": ("FLOAT", {"default": 1.0, "min": 0.005}), |
|
"frequency_divider": ("FLOAT", {"default": 1.0, "min": 0.005}), |
|
"octaves": ("INT", {"default": 1, "min": 1}), |
|
"seed": ("INT", {"default": 0}), |
|
}, |
|
} |
|
|
|
RETURN_TYPES = ("IMAGE", "FLOATS", "FLOATS", "FLOATS") |
|
RETURN_NAMES = ("image", "pos_x", "pos_y", "rot") |
|
FUNCTION = "apply_shake" |
|
CATEGORY = "mtb/batch" |
|
|
|
|
|
|
|
|
|
def generate_perlin_noise_2d( |
|
self, shape, res, tileable=(False, False), interpolant=None |
|
): |
|
"""Generate a 2D numpy array of perlin noise. |
|
|
|
Args: |
|
shape: The shape of the generated array (tuple of two ints). |
|
This must be a multple of res. |
|
res: The number of periods of noise to generate along each |
|
axis (tuple of two ints). Note shape must be a multiple of |
|
res. |
|
tileable: If the noise should be tileable along each axis |
|
(tuple of two bools). Defaults to (False, False). |
|
interpolant: The interpolation function, defaults to |
|
t*t*t*(t*(t*6 - 15) + 10). |
|
|
|
Returns |
|
------- |
|
A numpy array of shape shape with the generated noise. |
|
|
|
Raises |
|
------ |
|
ValueError: If shape is not a multiple of res. |
|
""" |
|
interpolant = interpolant or DEFAULT_INTERPOLANT |
|
delta = (res[0] / shape[0], res[1] / shape[1]) |
|
d = (shape[0] // res[0], shape[1] // res[1]) |
|
grid = ( |
|
np.mgrid[0 : res[0] : delta[0], 0 : res[1] : delta[1]].transpose( |
|
1, 2, 0 |
|
) |
|
% 1 |
|
) |
|
|
|
angles = 2 * np.pi * np.random.rand(res[0] + 1, res[1] + 1) |
|
gradients = np.dstack((np.cos(angles), np.sin(angles))) |
|
if tileable[0]: |
|
gradients[-1, :] = gradients[0, :] |
|
if tileable[1]: |
|
gradients[:, -1] = gradients[:, 0] |
|
gradients = gradients.repeat(d[0], 0).repeat(d[1], 1) |
|
g00 = gradients[: -d[0], : -d[1]] |
|
g10 = gradients[d[0] :, : -d[1]] |
|
g01 = gradients[: -d[0], d[1] :] |
|
g11 = gradients[d[0] :, d[1] :] |
|
|
|
n00 = np.sum(np.dstack((grid[:, :, 0], grid[:, :, 1])) * g00, 2) |
|
n10 = np.sum(np.dstack((grid[:, :, 0] - 1, grid[:, :, 1])) * g10, 2) |
|
n01 = np.sum(np.dstack((grid[:, :, 0], grid[:, :, 1] - 1)) * g01, 2) |
|
n11 = np.sum( |
|
np.dstack((grid[:, :, 0] - 1, grid[:, :, 1] - 1)) * g11, 2 |
|
) |
|
|
|
t = interpolant(grid) |
|
n0 = n00 * (1 - t[:, :, 0]) + t[:, :, 0] * n10 |
|
n1 = n01 * (1 - t[:, :, 0]) + t[:, :, 0] * n11 |
|
return np.sqrt(2) * ((1 - t[:, :, 1]) * n0 + t[:, :, 1] * n1) |
|
|
|
def generate_fractal_noise_2d( |
|
self, |
|
shape, |
|
res, |
|
octaves=1, |
|
persistence=0.5, |
|
lacunarity=2, |
|
tileable=(True, True), |
|
interpolant=None, |
|
): |
|
"""Generate a 2D numpy array of fractal noise. |
|
|
|
Args: |
|
shape: The shape of the generated array (tuple of two ints). |
|
This must be a multiple of lacunarity**(octaves-1)*res. |
|
res: The number of periods of noise to generate along each |
|
axis (tuple of two ints). Note shape must be a multiple of |
|
(lacunarity**(octaves-1)*res). |
|
octaves: The number of octaves in the noise. Defaults to 1. |
|
persistence: The scaling factor between two octaves. |
|
lacunarity: The frequency factor between two octaves. |
|
tileable: If the noise should be tileable along each axis |
|
(tuple of two bools). Defaults to (True,True). |
|
interpolant: The, interpolation function, defaults to |
|
t*t*t*(t*(t*6 - 15) + 10). |
|
|
|
Returns |
|
------- |
|
A numpy array of fractal noise and of shape shape generated by |
|
combining several octaves of perlin noise. |
|
|
|
Raises |
|
------ |
|
ValueError: If shape is not a multiple of |
|
(lacunarity**(octaves-1)*res). |
|
""" |
|
interpolant = interpolant or DEFAULT_INTERPOLANT |
|
|
|
noise = np.zeros(shape) |
|
frequency = 1 |
|
amplitude = 1 |
|
for _ in range(octaves): |
|
noise += amplitude * self.generate_perlin_noise_2d( |
|
shape, |
|
(frequency * res[0], frequency * res[1]), |
|
tileable, |
|
interpolant, |
|
) |
|
frequency *= lacunarity |
|
amplitude *= persistence |
|
return noise |
|
|
|
def fbm(self, x, y, octaves): |
|
|
|
|
|
x_idx = int(x) % 256 |
|
y_idx = int(y) % 256 |
|
return self.noise_pattern[x_idx, y_idx] |
|
|
|
def apply_shake( |
|
self, |
|
images, |
|
position_amount_x, |
|
position_amount_y, |
|
rotation_amount, |
|
frequency, |
|
frequency_divider, |
|
octaves, |
|
seed, |
|
): |
|
|
|
np.random.seed(seed) |
|
self.position_offset = np.random.uniform(-1e3, 1e3, 3) |
|
self.rotation_offset = np.random.uniform(-1e3, 1e3, 3) |
|
self.noise_pattern = self.generate_perlin_noise_2d( |
|
(512, 512), (32, 32), (True, True) |
|
) |
|
|
|
|
|
frame_count = images.shape[0] |
|
|
|
frequency = frequency / frequency_divider |
|
|
|
|
|
x_translations = [] |
|
y_translations = [] |
|
rotations = [] |
|
|
|
for frame_num in range(frame_count): |
|
time = frame_num * frequency |
|
x_idx = (self.position_offset[0] + frame_num) % 256 |
|
y_idx = (self.position_offset[1] + frame_num) % 256 |
|
|
|
np_position = np.array( |
|
[ |
|
self.fbm(x_idx, time, octaves), |
|
self.fbm(y_idx, time, octaves), |
|
] |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
rot_idx = (self.rotation_offset[2] + frame_num) % 256 |
|
np_rotation = self.fbm(rot_idx, time, octaves) |
|
|
|
x_translations.append(np_position[0] * position_amount_x) |
|
y_translations.append(np_position[1] * position_amount_y) |
|
rotations.append(np_rotation * rotation_amount) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
transform = MTB_Batch2dTransform() |
|
|
|
log.debug( |
|
f"Applying shaking with parameters: \nposition {position_amount_x}, {position_amount_y}\nrotation {rotation_amount}\nfrequency {frequency}\noctaves {octaves}" |
|
) |
|
|
|
|
|
shaken_images = transform.transform_batch( |
|
images, |
|
border_handling="edge", |
|
constant_color="#000000", |
|
x=x_translations, |
|
y=y_translations, |
|
angle=rotations, |
|
)[0] |
|
|
|
return (shaken_images, x_translations, y_translations, rotations) |
|
|
|
|
|
__nodes__ = [ |
|
MTB_BatchFloat, |
|
MTB_Batch2dTransform, |
|
MTB_BatchShape, |
|
MTB_BatchMake, |
|
MTB_BatchFloatAssemble, |
|
MTB_BatchFloatFill, |
|
MTB_BatchFloatNormalize, |
|
MTB_BatchMerge, |
|
MTB_BatchShake, |
|
MTB_PlotBatchFloat, |
|
MTB_BatchTimeWrap, |
|
MTB_BatchFloatFit, |
|
MTB_BatchFloatMath, |
|
] |
|
|