import time
import uuid
from os import getenv
from typing import cast

import pytest

from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine.entities.graph import Graph
from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from core.workflow.nodes.code.code_node import CodeNode
from core.workflow.nodes.code.entities import CodeNodeData
from models.enums import UserFrom
from models.workflow import WorkflowNodeExecutionStatus, WorkflowType
from tests.integration_tests.workflow.nodes.__mock.code_executor import setup_code_executor_mock

# Upper bound used by the string-length case below; read from the environment
# with "10000" as the fallback.
CODE_MAX_STRING_LENGTH = int(getenv("CODE_MAX_STRING_LENGTH", "10000"))


def _strip_code_indent(source: str) -> str:
    """Return *source* with the first four characters of every line removed.

    The snippets in this module are written indented inside the test
    functions; this removes that leading indent line by line.
    """
    return "\n".join(row[4:] for row in source.split("\n"))


def _build_code_config(outputs: dict, code: str) -> dict:
    """Build the code-node config shared by every test in this module.

    Only ``outputs`` and ``code`` vary between tests; the node id, title,
    input variables, answer and language are fixed.
    """
    return {
        "id": "code",
        "data": {
            "outputs": outputs,
            "title": "123",
            "variables": [
                {
                    "variable": "args1",
                    "value_selector": ["1", "123", "args1"],
                },
                {"variable": "args2", "value_selector": ["1", "123", "args2"]},
            ],
            "answer": "123",
            "code_language": "python3",
            "code": code,
        },
    }


def init_code_node(code_config: dict):
    """Create a ``CodeNode`` for *code_config* inside a two-node graph.

    The graph consists of a ``start`` node connected to the supplied code
    node. The node receives a variable pool pre-populated with
    ``["code", "123", "args1"] = 1`` and ``["code", "123", "args2"] = 2``.

    Args:
        code_config: Node configuration dict; used both as a graph node and
            as the node's ``config``.

    Returns:
        The constructed ``CodeNode``.
    """
    workflow_graph = {
        "edges": [
            {
                "id": "start-source-code-target",
                "source": "start",
                "target": "code",
            },
        ],
        "nodes": [{"data": {"type": "start"}, "id": "start"}, code_config],
    }

    graph = Graph.init(graph_config=workflow_graph)

    params = GraphInitParams(
        tenant_id="1",
        app_id="1",
        workflow_type=WorkflowType.WORKFLOW,
        workflow_id="1",
        graph_config=workflow_graph,
        user_id="1",
        user_from=UserFrom.ACCOUNT,
        invoke_from=InvokeFrom.DEBUGGER,
        call_depth=0,
    )

    # Variable pool with system variables only, then the two default arguments.
    pool = VariablePool(
        system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"},
        user_inputs={},
        environment_variables=[],
        conversation_variables=[],
    )
    for arg_name, arg_value in (("args1", 1), ("args2", 2)):
        pool.add(["code", "123", arg_name], arg_value)

    return CodeNode(
        id=str(uuid.uuid4()),
        graph_init_params=params,
        graph=graph,
        graph_runtime_state=GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter()),
        config=code_config,
    )


@pytest.mark.parametrize("setup_code_executor_mock", [["none"]], indirect=True)
def test_execute_code(setup_code_executor_mock):
    """A snippet summing two ints with a ``number`` output succeeds with 3."""
    code = """
    def main(args1: int, args2: int) -> dict:
        return {
            "result": args1 + args2,
        }
    """
    code = _strip_code_indent(code)

    config = _build_code_config(
        outputs={
            "result": {
                "type": "number",
            },
        },
        code=code,
    )

    node = init_code_node(config)
    # Register the inputs under the selectors the node config refers to.
    pool = node.graph_runtime_state.variable_pool
    for arg_name, arg_value in (("args1", 1), ("args2", 2)):
        pool.add(["1", "123", arg_name], arg_value)

    # Run the node directly.
    result = node._run()
    assert isinstance(result, NodeRunResult)
    assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
    assert result.outputs is not None
    assert result.outputs["result"] == 3
    assert result.error is None


@pytest.mark.parametrize("setup_code_executor_mock", [["none"]], indirect=True)
def test_execute_code_output_validator(setup_code_executor_mock):
    """The same snippet with ``result`` declared as ``string`` must fail."""
    code = """
    def main(args1: int, args2: int) -> dict:
        return {
            "result": args1 + args2,
        }
    """
    code = _strip_code_indent(code)

    config = _build_code_config(
        outputs={
            "result": {
                "type": "string",
            },
        },
        code=code,
    )

    node = init_code_node(config)
    # Register the inputs under the selectors the node config refers to.
    pool = node.graph_runtime_state.variable_pool
    for arg_name, arg_value in (("args1", 1), ("args2", 2)):
        pool.add(["1", "123", arg_name], arg_value)

    # Run the node directly.
    result = node._run()
    assert isinstance(result, NodeRunResult)
    assert result.status == WorkflowNodeExecutionStatus.FAILED
    assert result.error == "Output variable `result` must be a string"


def test_execute_code_output_validator_depth():
    """Check ``_transform_result`` against a nested output schema.

    One well-formed payload is accepted; three malformed payloads (wrong
    types everywhere, an over-long string, a very long number array) each
    raise ``ValueError``.
    """
    code = """
    def main(args1: int, args2: int) -> dict:
        return {
            "result": {
                "result": args1 + args2,
            }
        }
    """
    code = _strip_code_indent(code)

    config = _build_code_config(
        outputs={
            "string_validator": {
                "type": "string",
            },
            "number_validator": {
                "type": "number",
            },
            "number_array_validator": {
                "type": "array[number]",
            },
            "string_array_validator": {
                "type": "array[string]",
            },
            "object_validator": {
                "type": "object",
                "children": {
                    "result": {
                        "type": "number",
                    },
                    "depth": {
                        "type": "object",
                        "children": {
                            "depth": {
                                "type": "object",
                                "children": {
                                    "depth": {
                                        "type": "number",
                                    }
                                },
                            }
                        },
                    },
                },
            },
        },
        code=code,
    )

    node = init_code_node(config)

    def build_result(**overrides):
        # Fresh well-formed payload on every call; overrides replace values
        # in place so the key order stays the same.
        payload = {
            "number_validator": 1,
            "string_validator": "1",
            "number_array_validator": [1, 2, 3, 3.333],
            "string_array_validator": ["1", "2", "3"],
            "object_validator": {"result": 1, "depth": {"depth": {"depth": 1}}},
        }
        payload.update(overrides)
        return payload

    node.node_data = cast(CodeNodeData, node.node_data)

    # Well-formed payload: expected to pass without raising.
    node._transform_result(build_result(), node.node_data.outputs)

    # Every field carries a value of the wrong type.
    mistyped = {
        "number_validator": "1",
        "string_validator": 1,
        "number_array_validator": ["1", "2", "3", "3.333"],
        "string_array_validator": [1, 2, 3],
        "object_validator": {"result": "1", "depth": {"depth": {"depth": "1"}}},
    }
    with pytest.raises(ValueError):
        node._transform_result(mistyped, node.node_data.outputs)

    # String one character longer than CODE_MAX_STRING_LENGTH.
    oversized_string = build_result(string_validator=(CODE_MAX_STRING_LENGTH + 1) * "1")
    with pytest.raises(ValueError):
        node._transform_result(oversized_string, node.node_data.outputs)

    # Number array with 8000 elements.
    oversized_array = build_result(number_array_validator=[1, 2, 3, 3.333] * 2000)
    with pytest.raises(ValueError):
        node._transform_result(oversized_array, node.node_data.outputs)


def test_execute_code_output_object_list():
    """Check ``_transform_result`` for an ``array[object]`` output.

    A list of dicts is accepted; the same list with a trailing bare integer
    raises ``ValueError``.
    """
    code = """
    def main(args1: int, args2: int) -> dict:
        return {
            "result": {
                "result": args1 + args2,
            }
        }
    """
    code = _strip_code_indent(code)

    config = _build_code_config(
        outputs={
            "object_list": {
                "type": "array[object]",
            },
        },
        code=code,
    )

    node = init_code_node(config)

    def object_items():
        # Fresh list of dict items on every call.
        return [
            {
                "result": 1,
            },
            {
                "result": 2,
            },
            {
                "result": [1, 2, 3],
            },
        ]

    node.node_data = cast(CodeNodeData, node.node_data)

    # Only dict items: expected to pass without raising.
    node._transform_result({"object_list": object_items()}, node.node_data.outputs)

    # A non-dict item at the end of the list must be rejected.
    with_scalar = {"object_list": [*object_items(), 1]}
    with pytest.raises(ValueError):
        node._transform_result(with_scalar, node.node_data.outputs)