|
from comfy_execution.graph_utils import GraphBuilder |
|
from .tools import VariantSupport |
|
|
|
@VariantSupport() |
|
class TestAccumulateNode: |
|
def __init__(self): |
|
pass |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"to_add": ("*",), |
|
}, |
|
"optional": { |
|
"accumulation": ("ACCUMULATION",), |
|
}, |
|
} |
|
|
|
RETURN_TYPES = ("ACCUMULATION",) |
|
FUNCTION = "accumulate" |
|
|
|
CATEGORY = "Testing/Lists" |
|
|
|
def accumulate(self, to_add, accumulation = None): |
|
if accumulation is None: |
|
value = [to_add] |
|
else: |
|
value = accumulation["accum"] + [to_add] |
|
return ({"accum": value},) |
|
|
|
@VariantSupport() |
|
class TestAccumulationHeadNode: |
|
def __init__(self): |
|
pass |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"accumulation": ("ACCUMULATION",), |
|
}, |
|
} |
|
|
|
RETURN_TYPES = ("ACCUMULATION", "*",) |
|
FUNCTION = "accumulation_head" |
|
|
|
CATEGORY = "Testing/Lists" |
|
|
|
def accumulation_head(self, accumulation): |
|
accum = accumulation["accum"] |
|
if len(accum) == 0: |
|
return (accumulation, None) |
|
else: |
|
return ({"accum": accum[1:]}, accum[0]) |
|
|
|
class TestAccumulationTailNode: |
|
def __init__(self): |
|
pass |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"accumulation": ("ACCUMULATION",), |
|
}, |
|
} |
|
|
|
RETURN_TYPES = ("ACCUMULATION", "*",) |
|
FUNCTION = "accumulation_tail" |
|
|
|
CATEGORY = "Testing/Lists" |
|
|
|
def accumulation_tail(self, accumulation): |
|
accum = accumulation["accum"] |
|
if len(accum) == 0: |
|
return (None, accumulation) |
|
else: |
|
return ({"accum": accum[:-1]}, accum[-1]) |
|
|
|
@VariantSupport() |
|
class TestAccumulationToListNode: |
|
def __init__(self): |
|
pass |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"accumulation": ("ACCUMULATION",), |
|
}, |
|
} |
|
|
|
RETURN_TYPES = ("*",) |
|
OUTPUT_IS_LIST = (True,) |
|
|
|
FUNCTION = "accumulation_to_list" |
|
|
|
CATEGORY = "Testing/Lists" |
|
|
|
def accumulation_to_list(self, accumulation): |
|
return (accumulation["accum"],) |
|
|
|
@VariantSupport() |
|
class TestListToAccumulationNode: |
|
def __init__(self): |
|
pass |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"list": ("*",), |
|
}, |
|
} |
|
|
|
RETURN_TYPES = ("ACCUMULATION",) |
|
INPUT_IS_LIST = (True,) |
|
|
|
FUNCTION = "list_to_accumulation" |
|
|
|
CATEGORY = "Testing/Lists" |
|
|
|
def list_to_accumulation(self, list): |
|
return ({"accum": list},) |
|
|
|
@VariantSupport() |
|
class TestAccumulationGetLengthNode: |
|
def __init__(self): |
|
pass |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"accumulation": ("ACCUMULATION",), |
|
}, |
|
} |
|
|
|
RETURN_TYPES = ("INT",) |
|
|
|
FUNCTION = "accumlength" |
|
|
|
CATEGORY = "Testing/Lists" |
|
|
|
def accumlength(self, accumulation): |
|
return (len(accumulation['accum']),) |
|
|
|
@VariantSupport() |
|
class TestAccumulationGetItemNode: |
|
def __init__(self): |
|
pass |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"accumulation": ("ACCUMULATION",), |
|
"index": ("INT", {"default":0, "step":1}) |
|
}, |
|
} |
|
|
|
RETURN_TYPES = ("*",) |
|
|
|
FUNCTION = "get_item" |
|
|
|
CATEGORY = "Testing/Lists" |
|
|
|
def get_item(self, accumulation, index): |
|
return (accumulation['accum'][index],) |
|
|
|
@VariantSupport() |
|
class TestAccumulationSetItemNode: |
|
def __init__(self): |
|
pass |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"accumulation": ("ACCUMULATION",), |
|
"index": ("INT", {"default":0, "step":1}), |
|
"value": ("*",), |
|
}, |
|
} |
|
|
|
RETURN_TYPES = ("ACCUMULATION",) |
|
|
|
FUNCTION = "set_item" |
|
|
|
CATEGORY = "Testing/Lists" |
|
|
|
def set_item(self, accumulation, index, value): |
|
new_accum = accumulation['accum'][:] |
|
new_accum[index] = value |
|
return ({"accum": new_accum},) |
|
|
|
class TestIntMathOperation: |
|
def __init__(self): |
|
pass |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"a": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff, "step": 1}), |
|
"b": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff, "step": 1}), |
|
"operation": (["add", "subtract", "multiply", "divide", "modulo", "power"],), |
|
}, |
|
} |
|
|
|
RETURN_TYPES = ("INT",) |
|
FUNCTION = "int_math_operation" |
|
|
|
CATEGORY = "Testing/Logic" |
|
|
|
def int_math_operation(self, a, b, operation): |
|
if operation == "add": |
|
return (a + b,) |
|
elif operation == "subtract": |
|
return (a - b,) |
|
elif operation == "multiply": |
|
return (a * b,) |
|
elif operation == "divide": |
|
return (a // b,) |
|
elif operation == "modulo": |
|
return (a % b,) |
|
elif operation == "power": |
|
return (a ** b,) |
|
|
|
|
|
from .flow_control import NUM_FLOW_SOCKETS |
|
@VariantSupport() |
|
class TestForLoopOpen: |
|
def __init__(self): |
|
pass |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"remaining": ("INT", {"default": 1, "min": 0, "max": 100000, "step": 1}), |
|
}, |
|
"optional": { |
|
f"initial_value{i}": ("*",) for i in range(1, NUM_FLOW_SOCKETS) |
|
}, |
|
"hidden": { |
|
"initial_value0": ("*",) |
|
} |
|
} |
|
|
|
RETURN_TYPES = tuple(["FLOW_CONTROL", "INT",] + ["*"] * (NUM_FLOW_SOCKETS-1)) |
|
RETURN_NAMES = tuple(["flow_control", "remaining"] + [f"value{i}" for i in range(1, NUM_FLOW_SOCKETS)]) |
|
FUNCTION = "for_loop_open" |
|
|
|
CATEGORY = "Testing/Flow" |
|
|
|
def for_loop_open(self, remaining, **kwargs): |
|
graph = GraphBuilder() |
|
if "initial_value0" in kwargs: |
|
remaining = kwargs["initial_value0"] |
|
while_open = graph.node("TestWhileLoopOpen", condition=remaining, initial_value0=remaining, **{(f"initial_value{i}"): kwargs.get(f"initial_value{i}", None) for i in range(1, NUM_FLOW_SOCKETS)}) |
|
outputs = [kwargs.get(f"initial_value{i}", None) for i in range(1, NUM_FLOW_SOCKETS)] |
|
return { |
|
"result": tuple(["stub", remaining] + outputs), |
|
"expand": graph.finalize(), |
|
} |
|
|
|
@VariantSupport() |
|
class TestForLoopClose: |
|
def __init__(self): |
|
pass |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"flow_control": ("FLOW_CONTROL", {"rawLink": True}), |
|
}, |
|
"optional": { |
|
f"initial_value{i}": ("*",{"rawLink": True}) for i in range(1, NUM_FLOW_SOCKETS) |
|
}, |
|
} |
|
|
|
RETURN_TYPES = tuple(["*"] * (NUM_FLOW_SOCKETS-1)) |
|
RETURN_NAMES = tuple([f"value{i}" for i in range(1, NUM_FLOW_SOCKETS)]) |
|
FUNCTION = "for_loop_close" |
|
|
|
CATEGORY = "Testing/Flow" |
|
|
|
def for_loop_close(self, flow_control, **kwargs): |
|
graph = GraphBuilder() |
|
while_open = flow_control[0] |
|
sub = graph.node("TestIntMathOperation", operation="subtract", a=[while_open,1], b=1) |
|
cond = graph.node("TestToBoolNode", value=sub.out(0)) |
|
input_values = {f"initial_value{i}": kwargs.get(f"initial_value{i}", None) for i in range(1, NUM_FLOW_SOCKETS)} |
|
while_close = graph.node("TestWhileLoopClose", |
|
flow_control=flow_control, |
|
condition=cond.out(0), |
|
initial_value0=sub.out(0), |
|
**input_values) |
|
return { |
|
"result": tuple([while_close.out(i) for i in range(1, NUM_FLOW_SOCKETS)]), |
|
"expand": graph.finalize(), |
|
} |
|
|
|
NUM_LIST_SOCKETS = 10 |
|
@VariantSupport() |
|
class TestMakeListNode: |
|
def __init__(self): |
|
pass |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": { |
|
"value1": ("*",), |
|
}, |
|
"optional": { |
|
f"value{i}": ("*",) for i in range(1, NUM_LIST_SOCKETS) |
|
}, |
|
} |
|
|
|
RETURN_TYPES = ("*",) |
|
FUNCTION = "make_list" |
|
OUTPUT_IS_LIST = (True,) |
|
|
|
CATEGORY = "Testing/Lists" |
|
|
|
def make_list(self, **kwargs): |
|
result = [] |
|
for i in range(NUM_LIST_SOCKETS): |
|
if f"value{i}" in kwargs: |
|
result.append(kwargs[f"value{i}"]) |
|
return (result,) |
|
|
|
UTILITY_NODE_CLASS_MAPPINGS = { |
|
"TestAccumulateNode": TestAccumulateNode, |
|
"TestAccumulationHeadNode": TestAccumulationHeadNode, |
|
"TestAccumulationTailNode": TestAccumulationTailNode, |
|
"TestAccumulationToListNode": TestAccumulationToListNode, |
|
"TestListToAccumulationNode": TestListToAccumulationNode, |
|
"TestAccumulationGetLengthNode": TestAccumulationGetLengthNode, |
|
"TestAccumulationGetItemNode": TestAccumulationGetItemNode, |
|
"TestAccumulationSetItemNode": TestAccumulationSetItemNode, |
|
"TestForLoopOpen": TestForLoopOpen, |
|
"TestForLoopClose": TestForLoopClose, |
|
"TestIntMathOperation": TestIntMathOperation, |
|
"TestMakeListNode": TestMakeListNode, |
|
} |
|
UTILITY_NODE_DISPLAY_NAME_MAPPINGS = { |
|
"TestAccumulateNode": "Accumulate", |
|
"TestAccumulationHeadNode": "Accumulation Head", |
|
"TestAccumulationTailNode": "Accumulation Tail", |
|
"TestAccumulationToListNode": "Accumulation to List", |
|
"TestListToAccumulationNode": "List to Accumulation", |
|
"TestAccumulationGetLengthNode": "Accumulation Get Length", |
|
"TestAccumulationGetItemNode": "Accumulation Get Item", |
|
"TestAccumulationSetItemNode": "Accumulation Set Item", |
|
"TestForLoopOpen": "For Loop Open", |
|
"TestForLoopClose": "For Loop Close", |
|
"TestIntMathOperation": "Int Math Operation", |
|
"TestMakeListNode": "Make List", |
|
} |
|
|