dify / api /models /workflow.py
Severian's picture
initial commit
a8b3f00
import json
from collections.abc import Mapping, Sequence
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional, Union
import sqlalchemy as sa
from sqlalchemy import func
from sqlalchemy.orm import Mapped, mapped_column
import contexts
from constants import HIDDEN_VALUE
from core.helper import encrypter
from core.variables import SecretVariable, Variable
from extensions.ext_database import db
from factories import variable_factory
from libs import helper
from models.enums import CreatedByRole
from .account import Account
from .types import StringUUID
class WorkflowType(Enum):
"""
Workflow Type Enum
"""
WORKFLOW = "workflow"
CHAT = "chat"
@classmethod
def value_of(cls, value: str) -> "WorkflowType":
"""
Get value of given mode.
:param value: mode value
:return: mode
"""
for mode in cls:
if mode.value == value:
return mode
raise ValueError(f"invalid workflow type value {value}")
@classmethod
def from_app_mode(cls, app_mode: Union[str, "AppMode"]) -> "WorkflowType":
"""
Get workflow type from app mode.
:param app_mode: app mode
:return: workflow type
"""
from models.model import AppMode
app_mode = app_mode if isinstance(app_mode, AppMode) else AppMode.value_of(app_mode)
return cls.WORKFLOW if app_mode == AppMode.WORKFLOW else cls.CHAT
class Workflow(db.Model):
"""
Workflow, for `Workflow App` and `Chat App workflow mode`.
Attributes:
- id (uuid) Workflow ID, pk
- tenant_id (uuid) Workspace ID
- app_id (uuid) App ID
- type (string) Workflow type
`workflow` for `Workflow App`
`chat` for `Chat App workflow mode`
- version (string) Version
`draft` for draft version (only one for each app), other for version number (redundant)
- graph (text) Workflow canvas configuration (JSON)
The entire canvas configuration JSON, including Node, Edge, and other configurations
- nodes (array[object]) Node list, see Node Schema
- edges (array[object]) Edge list, see Edge Schema
- created_by (uuid) Creator ID
- created_at (timestamp) Creation time
- updated_by (uuid) `optional` Last updater ID
- updated_at (timestamp) `optional` Last update time
"""
__tablename__ = "workflows"
__table_args__ = (
db.PrimaryKeyConstraint("id", name="workflow_pkey"),
db.Index("workflow_version_idx", "tenant_id", "app_id", "version"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=db.text("uuid_generate_v4()"))
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
type: Mapped[str] = mapped_column(db.String(255), nullable=False)
version: Mapped[str] = mapped_column(db.String(255), nullable=False)
graph: Mapped[str] = mapped_column(sa.Text)
_features: Mapped[str] = mapped_column("features", sa.TEXT)
created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
created_at: Mapped[datetime] = mapped_column(
db.DateTime, nullable=False, server_default=db.text("CURRENT_TIMESTAMP(0)")
)
updated_by: Mapped[Optional[str]] = mapped_column(StringUUID)
updated_at: Mapped[datetime] = mapped_column(
sa.DateTime, nullable=False, default=datetime.now(tz=timezone.utc), server_onupdate=func.current_timestamp()
)
_environment_variables: Mapped[str] = mapped_column(
"environment_variables", db.Text, nullable=False, server_default="{}"
)
_conversation_variables: Mapped[str] = mapped_column(
"conversation_variables", db.Text, nullable=False, server_default="{}"
)
def __init__(
self,
*,
tenant_id: str,
app_id: str,
type: str,
version: str,
graph: str,
features: str,
created_by: str,
environment_variables: Sequence[Variable],
conversation_variables: Sequence[Variable],
):
self.tenant_id = tenant_id
self.app_id = app_id
self.type = type
self.version = version
self.graph = graph
self.features = features
self.created_by = created_by
self.environment_variables = environment_variables or []
self.conversation_variables = conversation_variables or []
@property
def created_by_account(self):
return db.session.get(Account, self.created_by)
@property
def updated_by_account(self):
return db.session.get(Account, self.updated_by) if self.updated_by else None
@property
def graph_dict(self) -> Mapping[str, Any]:
return json.loads(self.graph) if self.graph else {}
@property
def features(self) -> str:
"""
Convert old features structure to new features structure.
"""
if not self._features:
return self._features
features = json.loads(self._features)
if features.get("file_upload", {}).get("image", {}).get("enabled", False):
image_enabled = True
image_number_limits = int(features["file_upload"]["image"].get("number_limits", 1))
image_transfer_methods = features["file_upload"]["image"].get(
"transfer_methods", ["remote_url", "local_file"]
)
features["file_upload"]["enabled"] = image_enabled
features["file_upload"]["number_limits"] = image_number_limits
features["file_upload"]["allowed_upload_methods"] = image_transfer_methods
features["file_upload"]["allowed_file_types"] = ["image"]
features["file_upload"]["allowed_extensions"] = []
del features["file_upload"]["image"]
self._features = json.dumps(features)
return self._features
@features.setter
def features(self, value: str) -> None:
self._features = value
@property
def features_dict(self) -> Mapping[str, Any]:
return json.loads(self.features) if self.features else {}
def user_input_form(self, to_old_structure: bool = False) -> list:
# get start node from graph
if not self.graph:
return []
graph_dict = self.graph_dict
if "nodes" not in graph_dict:
return []
start_node = next((node for node in graph_dict["nodes"] if node["data"]["type"] == "start"), None)
if not start_node:
return []
# get user_input_form from start node
variables = start_node.get("data", {}).get("variables", [])
if to_old_structure:
old_structure_variables = []
for variable in variables:
old_structure_variables.append({variable["type"]: variable})
return old_structure_variables
return variables
@property
def unique_hash(self) -> str:
"""
Get hash of workflow.
:return: hash
"""
entity = {"graph": self.graph_dict, "features": self.features_dict}
return helper.generate_text_hash(json.dumps(entity, sort_keys=True))
@property
def tool_published(self) -> bool:
from models.tools import WorkflowToolProvider
return (
db.session.query(WorkflowToolProvider).filter(WorkflowToolProvider.app_id == self.app_id).first()
is not None
)
@property
def environment_variables(self) -> Sequence[Variable]:
# TODO: find some way to init `self._environment_variables` when instance created.
if self._environment_variables is None:
self._environment_variables = "{}"
tenant_id = contexts.tenant_id.get()
environment_variables_dict: dict[str, Any] = json.loads(self._environment_variables)
results = [variable_factory.build_variable_from_mapping(v) for v in environment_variables_dict.values()]
# decrypt secret variables value
decrypt_func = (
lambda var: var.model_copy(update={"value": encrypter.decrypt_token(tenant_id=tenant_id, token=var.value)})
if isinstance(var, SecretVariable)
else var
)
results = list(map(decrypt_func, results))
return results
@environment_variables.setter
def environment_variables(self, value: Sequence[Variable]):
if not value:
self._environment_variables = "{}"
return
tenant_id = contexts.tenant_id.get()
value = list(value)
if any(var for var in value if not var.id):
raise ValueError("environment variable require a unique id")
# Compare inputs and origin variables,
# if the value is HIDDEN_VALUE, use the origin variable value (only update `name`).
origin_variables_dictionary = {var.id: var for var in self.environment_variables}
for i, variable in enumerate(value):
if variable.id in origin_variables_dictionary and variable.value == HIDDEN_VALUE:
value[i] = origin_variables_dictionary[variable.id].model_copy(update={"name": variable.name})
# encrypt secret variables value
encrypt_func = (
lambda var: var.model_copy(update={"value": encrypter.encrypt_token(tenant_id=tenant_id, token=var.value)})
if isinstance(var, SecretVariable)
else var
)
encrypted_vars = list(map(encrypt_func, value))
environment_variables_json = json.dumps(
{var.name: var.model_dump() for var in encrypted_vars},
ensure_ascii=False,
)
self._environment_variables = environment_variables_json
def to_dict(self, *, include_secret: bool = False) -> Mapping[str, Any]:
environment_variables = list(self.environment_variables)
environment_variables = [
v if not isinstance(v, SecretVariable) or include_secret else v.model_copy(update={"value": ""})
for v in environment_variables
]
result = {
"graph": self.graph_dict,
"features": self.features_dict,
"environment_variables": [var.model_dump(mode="json") for var in environment_variables],
"conversation_variables": [var.model_dump(mode="json") for var in self.conversation_variables],
}
return result
@property
def conversation_variables(self) -> Sequence[Variable]:
# TODO: find some way to init `self._conversation_variables` when instance created.
if self._conversation_variables is None:
self._conversation_variables = "{}"
variables_dict: dict[str, Any] = json.loads(self._conversation_variables)
results = [variable_factory.build_variable_from_mapping(v) for v in variables_dict.values()]
return results
@conversation_variables.setter
def conversation_variables(self, value: Sequence[Variable]) -> None:
self._conversation_variables = json.dumps(
{var.name: var.model_dump() for var in value},
ensure_ascii=False,
)
class WorkflowRunStatus(Enum):
"""
Workflow Run Status Enum
"""
RUNNING = "running"
SUCCEEDED = "succeeded"
FAILED = "failed"
STOPPED = "stopped"
@classmethod
def value_of(cls, value: str) -> "WorkflowRunStatus":
"""
Get value of given mode.
:param value: mode value
:return: mode
"""
for mode in cls:
if mode.value == value:
return mode
raise ValueError(f"invalid workflow run status value {value}")
class WorkflowRun(db.Model):
"""
Workflow Run
Attributes:
- id (uuid) Run ID
- tenant_id (uuid) Workspace ID
- app_id (uuid) App ID
- sequence_number (int) Auto-increment sequence number, incremented within the App, starting from 1
- workflow_id (uuid) Workflow ID
- type (string) Workflow type
- triggered_from (string) Trigger source
`debugging` for canvas debugging
`app-run` for (published) app execution
- version (string) Version
- graph (text) Workflow canvas configuration (JSON)
- inputs (text) Input parameters
- status (string) Execution status, `running` / `succeeded` / `failed` / `stopped`
- outputs (text) `optional` Output content
- error (string) `optional` Error reason
- elapsed_time (float) `optional` Time consumption (s)
- total_tokens (int) `optional` Total tokens used
- total_steps (int) Total steps (redundant), default 0
- created_by_role (string) Creator role
- `account` Console account
- `end_user` End user
- created_by (uuid) Runner ID
- created_at (timestamp) Run time
- finished_at (timestamp) End time
"""
__tablename__ = "workflow_runs"
__table_args__ = (
db.PrimaryKeyConstraint("id", name="workflow_run_pkey"),
db.Index("workflow_run_triggerd_from_idx", "tenant_id", "app_id", "triggered_from"),
db.Index("workflow_run_tenant_app_sequence_idx", "tenant_id", "app_id", "sequence_number"),
)
id = db.Column(StringUUID, server_default=db.text("uuid_generate_v4()"))
tenant_id = db.Column(StringUUID, nullable=False)
app_id = db.Column(StringUUID, nullable=False)
sequence_number = db.Column(db.Integer, nullable=False)
workflow_id = db.Column(StringUUID, nullable=False)
type = db.Column(db.String(255), nullable=False)
triggered_from = db.Column(db.String(255), nullable=False)
version = db.Column(db.String(255), nullable=False)
graph = db.Column(db.Text)
inputs = db.Column(db.Text)
status = db.Column(db.String(255), nullable=False)
outputs: Mapped[str] = db.Column(db.Text)
error = db.Column(db.Text)
elapsed_time = db.Column(db.Float, nullable=False, server_default=db.text("0"))
total_tokens = db.Column(db.Integer, nullable=False, server_default=db.text("0"))
total_steps = db.Column(db.Integer, server_default=db.text("0"))
created_by_role = db.Column(db.String(255), nullable=False)
created_by = db.Column(StringUUID, nullable=False)
created_at = db.Column(db.DateTime, nullable=False, server_default=db.text("CURRENT_TIMESTAMP(0)"))
finished_at = db.Column(db.DateTime)
@property
def created_by_account(self):
created_by_role = CreatedByRole(self.created_by_role)
return db.session.get(Account, self.created_by) if created_by_role == CreatedByRole.ACCOUNT else None
@property
def created_by_end_user(self):
from models.model import EndUser
created_by_role = CreatedByRole(self.created_by_role)
return db.session.get(EndUser, self.created_by) if created_by_role == CreatedByRole.END_USER else None
@property
def graph_dict(self):
return json.loads(self.graph) if self.graph else {}
@property
def inputs_dict(self) -> Mapping[str, Any]:
return json.loads(self.inputs) if self.inputs else {}
@property
def outputs_dict(self) -> Mapping[str, Any]:
return json.loads(self.outputs) if self.outputs else {}
@property
def message(self) -> Optional["Message"]:
from models.model import Message
return (
db.session.query(Message).filter(Message.app_id == self.app_id, Message.workflow_run_id == self.id).first()
)
@property
def workflow(self):
return db.session.query(Workflow).filter(Workflow.id == self.workflow_id).first()
def to_dict(self):
return {
"id": self.id,
"tenant_id": self.tenant_id,
"app_id": self.app_id,
"sequence_number": self.sequence_number,
"workflow_id": self.workflow_id,
"type": self.type,
"triggered_from": self.triggered_from,
"version": self.version,
"graph": self.graph_dict,
"inputs": self.inputs_dict,
"status": self.status,
"outputs": self.outputs_dict,
"error": self.error,
"elapsed_time": self.elapsed_time,
"total_tokens": self.total_tokens,
"total_steps": self.total_steps,
"created_by_role": self.created_by_role,
"created_by": self.created_by,
"created_at": self.created_at,
"finished_at": self.finished_at,
}
@classmethod
def from_dict(cls, data: dict) -> "WorkflowRun":
return cls(
id=data.get("id"),
tenant_id=data.get("tenant_id"),
app_id=data.get("app_id"),
sequence_number=data.get("sequence_number"),
workflow_id=data.get("workflow_id"),
type=data.get("type"),
triggered_from=data.get("triggered_from"),
version=data.get("version"),
graph=json.dumps(data.get("graph")),
inputs=json.dumps(data.get("inputs")),
status=data.get("status"),
outputs=json.dumps(data.get("outputs")),
error=data.get("error"),
elapsed_time=data.get("elapsed_time"),
total_tokens=data.get("total_tokens"),
total_steps=data.get("total_steps"),
created_by_role=data.get("created_by_role"),
created_by=data.get("created_by"),
created_at=data.get("created_at"),
finished_at=data.get("finished_at"),
)
class WorkflowNodeExecutionTriggeredFrom(Enum):
"""
Workflow Node Execution Triggered From Enum
"""
SINGLE_STEP = "single-step"
WORKFLOW_RUN = "workflow-run"
@classmethod
def value_of(cls, value: str) -> "WorkflowNodeExecutionTriggeredFrom":
"""
Get value of given mode.
:param value: mode value
:return: mode
"""
for mode in cls:
if mode.value == value:
return mode
raise ValueError(f"invalid workflow node execution triggered from value {value}")
class WorkflowNodeExecutionStatus(Enum):
"""
Workflow Node Execution Status Enum
"""
RUNNING = "running"
SUCCEEDED = "succeeded"
FAILED = "failed"
@classmethod
def value_of(cls, value: str) -> "WorkflowNodeExecutionStatus":
"""
Get value of given mode.
:param value: mode value
:return: mode
"""
for mode in cls:
if mode.value == value:
return mode
raise ValueError(f"invalid workflow node execution status value {value}")
class WorkflowNodeExecution(db.Model):
"""
Workflow Node Execution
- id (uuid) Execution ID
- tenant_id (uuid) Workspace ID
- app_id (uuid) App ID
- workflow_id (uuid) Workflow ID
- triggered_from (string) Trigger source
`single-step` for single-step debugging
`workflow-run` for workflow execution (debugging / user execution)
- workflow_run_id (uuid) `optional` Workflow run ID
Null for single-step debugging.
- index (int) Execution sequence number, used for displaying Tracing Node order
- predecessor_node_id (string) `optional` Predecessor node ID, used for displaying execution path
- node_id (string) Node ID
- node_type (string) Node type, such as `start`
- title (string) Node title
- inputs (json) All predecessor node variable content used in the node
- process_data (json) Node process data
- outputs (json) `optional` Node output variables
- status (string) Execution status, `running` / `succeeded` / `failed`
- error (string) `optional` Error reason
- elapsed_time (float) `optional` Time consumption (s)
- execution_metadata (text) Metadata
- total_tokens (int) `optional` Total tokens used
- total_price (decimal) `optional` Total cost
- currency (string) `optional` Currency, such as USD / RMB
- created_at (timestamp) Run time
- created_by_role (string) Creator role
- `account` Console account
- `end_user` End user
- created_by (uuid) Runner ID
- finished_at (timestamp) End time
"""
__tablename__ = "workflow_node_executions"
__table_args__ = (
db.PrimaryKeyConstraint("id", name="workflow_node_execution_pkey"),
db.Index(
"workflow_node_execution_workflow_run_idx",
"tenant_id",
"app_id",
"workflow_id",
"triggered_from",
"workflow_run_id",
),
db.Index(
"workflow_node_execution_node_run_idx", "tenant_id", "app_id", "workflow_id", "triggered_from", "node_id"
),
db.Index(
"workflow_node_execution_id_idx",
"tenant_id",
"app_id",
"workflow_id",
"triggered_from",
"node_execution_id",
),
)
id = db.Column(StringUUID, server_default=db.text("uuid_generate_v4()"))
tenant_id = db.Column(StringUUID, nullable=False)
app_id = db.Column(StringUUID, nullable=False)
workflow_id = db.Column(StringUUID, nullable=False)
triggered_from = db.Column(db.String(255), nullable=False)
workflow_run_id = db.Column(StringUUID)
index = db.Column(db.Integer, nullable=False)
predecessor_node_id = db.Column(db.String(255))
node_execution_id = db.Column(db.String(255), nullable=True)
node_id = db.Column(db.String(255), nullable=False)
node_type = db.Column(db.String(255), nullable=False)
title = db.Column(db.String(255), nullable=False)
inputs = db.Column(db.Text)
process_data = db.Column(db.Text)
outputs = db.Column(db.Text)
status = db.Column(db.String(255), nullable=False)
error = db.Column(db.Text)
elapsed_time = db.Column(db.Float, nullable=False, server_default=db.text("0"))
execution_metadata = db.Column(db.Text)
created_at = db.Column(db.DateTime, nullable=False, server_default=db.text("CURRENT_TIMESTAMP(0)"))
created_by_role = db.Column(db.String(255), nullable=False)
created_by = db.Column(StringUUID, nullable=False)
finished_at = db.Column(db.DateTime)
@property
def created_by_account(self):
created_by_role = CreatedByRole(self.created_by_role)
return db.session.get(Account, self.created_by) if created_by_role == CreatedByRole.ACCOUNT else None
@property
def created_by_end_user(self):
from models.model import EndUser
created_by_role = CreatedByRole(self.created_by_role)
return db.session.get(EndUser, self.created_by) if created_by_role == CreatedByRole.END_USER else None
@property
def inputs_dict(self):
return json.loads(self.inputs) if self.inputs else None
@property
def outputs_dict(self):
return json.loads(self.outputs) if self.outputs else None
@property
def process_data_dict(self):
return json.loads(self.process_data) if self.process_data else None
@property
def execution_metadata_dict(self):
return json.loads(self.execution_metadata) if self.execution_metadata else None
@property
def extras(self):
from core.tools.tool_manager import ToolManager
extras = {}
if self.execution_metadata_dict:
from core.workflow.nodes import NodeType
if self.node_type == NodeType.TOOL.value and "tool_info" in self.execution_metadata_dict:
tool_info = self.execution_metadata_dict["tool_info"]
extras["icon"] = ToolManager.get_tool_icon(
tenant_id=self.tenant_id,
provider_type=tool_info["provider_type"],
provider_id=tool_info["provider_id"],
)
return extras
class WorkflowAppLogCreatedFrom(Enum):
"""
Workflow App Log Created From Enum
"""
SERVICE_API = "service-api"
WEB_APP = "web-app"
INSTALLED_APP = "installed-app"
@classmethod
def value_of(cls, value: str) -> "WorkflowAppLogCreatedFrom":
"""
Get value of given mode.
:param value: mode value
:return: mode
"""
for mode in cls:
if mode.value == value:
return mode
raise ValueError(f"invalid workflow app log created from value {value}")
class WorkflowAppLog(db.Model):
"""
Workflow App execution log, excluding workflow debugging records.
Attributes:
- id (uuid) run ID
- tenant_id (uuid) Workspace ID
- app_id (uuid) App ID
- workflow_id (uuid) Associated Workflow ID
- workflow_run_id (uuid) Associated Workflow Run ID
- created_from (string) Creation source
`service-api` App Execution OpenAPI
`web-app` WebApp
`installed-app` Installed App
- created_by_role (string) Creator role
- `account` Console account
- `end_user` End user
- created_by (uuid) Creator ID, depends on the user table according to created_by_role
- created_at (timestamp) Creation time
"""
__tablename__ = "workflow_app_logs"
__table_args__ = (
db.PrimaryKeyConstraint("id", name="workflow_app_log_pkey"),
db.Index("workflow_app_log_app_idx", "tenant_id", "app_id"),
)
id = db.Column(StringUUID, server_default=db.text("uuid_generate_v4()"))
tenant_id = db.Column(StringUUID, nullable=False)
app_id = db.Column(StringUUID, nullable=False)
workflow_id = db.Column(StringUUID, nullable=False)
workflow_run_id = db.Column(StringUUID, nullable=False)
created_from = db.Column(db.String(255), nullable=False)
created_by_role = db.Column(db.String(255), nullable=False)
created_by = db.Column(StringUUID, nullable=False)
created_at = db.Column(db.DateTime, nullable=False, server_default=db.text("CURRENT_TIMESTAMP(0)"))
@property
def workflow_run(self):
return db.session.get(WorkflowRun, self.workflow_run_id)
@property
def created_by_account(self):
created_by_role = CreatedByRole(self.created_by_role)
return db.session.get(Account, self.created_by) if created_by_role == CreatedByRole.ACCOUNT else None
@property
def created_by_end_user(self):
from models.model import EndUser
created_by_role = CreatedByRole(self.created_by_role)
return db.session.get(EndUser, self.created_by) if created_by_role == CreatedByRole.END_USER else None
class ConversationVariable(db.Model):
__tablename__ = "workflow_conversation_variables"
id: Mapped[str] = db.Column(StringUUID, primary_key=True)
conversation_id: Mapped[str] = db.Column(StringUUID, nullable=False, primary_key=True)
app_id: Mapped[str] = db.Column(StringUUID, nullable=False, index=True)
data = db.Column(db.Text, nullable=False)
created_at = db.Column(db.DateTime, nullable=False, index=True, server_default=db.text("CURRENT_TIMESTAMP(0)"))
updated_at = db.Column(
db.DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
)
def __init__(self, *, id: str, app_id: str, conversation_id: str, data: str) -> None:
self.id = id
self.app_id = app_id
self.conversation_id = conversation_id
self.data = data
@classmethod
def from_variable(cls, *, app_id: str, conversation_id: str, variable: Variable) -> "ConversationVariable":
obj = cls(
id=variable.id,
app_id=app_id,
conversation_id=conversation_id,
data=variable.model_dump_json(),
)
return obj
def to_variable(self) -> Variable:
mapping = json.loads(self.data)
return variable_factory.build_variable_from_mapping(mapping)