|
from collections.abc import Generator, Mapping |
|
from typing import Any, Union |
|
|
|
from openai._exceptions import RateLimitError |
|
|
|
from configs import dify_config |
|
from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator |
|
from core.app.apps.agent_chat.app_generator import AgentChatAppGenerator |
|
from core.app.apps.chat.app_generator import ChatAppGenerator |
|
from core.app.apps.completion.app_generator import CompletionAppGenerator |
|
from core.app.apps.workflow.app_generator import WorkflowAppGenerator |
|
from core.app.entities.app_invoke_entities import InvokeFrom |
|
from core.app.features.rate_limiting import RateLimit |
|
from models.model import Account, App, AppMode, EndUser |
|
from models.workflow import Workflow |
|
from services.errors.llm import InvokeRateLimitError |
|
from services.workflow_service import WorkflowService |
|
|
|
|
|
class AppGenerateService:
    """Route app invocations to the generator that matches the app's mode."""

    @classmethod
    def generate(
        cls,
        app_model: App,
        user: Union[Account, EndUser],
        args: Mapping[str, Any],
        invoke_from: InvokeFrom,
        streaming: bool = True,
    ):
        """
        App Content Generate

        Enters the app's rate limit, dispatches to the generator selected by
        ``app_model.mode`` and returns the result of ``rate_limit.generate``.

        :param app_model: app model
        :param user: user
        :param args: args
        :param invoke_from: invoke from
        :param streaming: streaming
        :return: whatever ``rate_limit.generate`` returns for the generator output
        :raises ValueError: if the app mode is not supported, or the required
            workflow is missing (see ``_get_workflow``)
        :raises InvokeRateLimitError: if a ``RateLimitError`` is raised while generating
        """
        max_active_request = cls._get_max_active_requests(app_model)
        rate_limit = RateLimit(app_model.id, max_active_request)
        request_id = RateLimit.gen_request_key()

        # Becomes True only after a streaming response has been handed back to
        # the caller. Until then this method is responsible for releasing the
        # request, otherwise a failure in streaming mode would never call
        # ``rate_limit.exit``.
        handed_off = False
        try:
            request_id = rate_limit.enter(request_id)

            # NOTE: branch order matters -- the agent check (mode or ``is_agent``)
            # must run before the plain chat check.
            if app_model.mode == AppMode.COMPLETION.value:
                response = CompletionAppGenerator().generate(
                    app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming
                )
            elif app_model.mode == AppMode.AGENT_CHAT.value or app_model.is_agent:
                response = AgentChatAppGenerator().generate(
                    app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming
                )
            elif app_model.mode == AppMode.CHAT.value:
                response = ChatAppGenerator().generate(
                    app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming
                )
            elif app_model.mode == AppMode.ADVANCED_CHAT.value:
                workflow = cls._get_workflow(app_model, invoke_from)
                response = AdvancedChatAppGenerator().generate(
                    app_model=app_model,
                    workflow=workflow,
                    user=user,
                    args=args,
                    invoke_from=invoke_from,
                    stream=streaming,
                )
            elif app_model.mode == AppMode.WORKFLOW.value:
                workflow = cls._get_workflow(app_model, invoke_from)
                response = WorkflowAppGenerator().generate(
                    app_model=app_model,
                    workflow=workflow,
                    user=user,
                    args=args,
                    invoke_from=invoke_from,
                    stream=streaming,
                )
            else:
                raise ValueError(f"Invalid app mode {app_model.mode}")

            limited_response = rate_limit.generate(response, request_id)
            # NOTE(review): in streaming mode the object returned by
            # ``rate_limit.generate`` is presumably what releases the request
            # once the stream is consumed -- confirm against RateLimit.
            handed_off = streaming
            return limited_response
        except RateLimitError as e:
            raise InvokeRateLimitError(str(e)) from e
        finally:
            # Non-streaming calls always release here (as before); streaming
            # calls release here only when they failed before the hand-off.
            if not handed_off:
                rate_limit.exit(request_id)

    @staticmethod
    def _get_max_active_requests(app_model: App) -> int:
        """
        Get the maximum number of active requests for an app

        :param app_model: app model
        :return: ``app_model.max_active_requests``, or
            ``int(dify_config.APP_MAX_ACTIVE_REQUESTS)`` when that value is ``None``
        """
        max_active_requests = app_model.max_active_requests
        if max_active_requests is None:
            max_active_requests = int(dify_config.APP_MAX_ACTIVE_REQUESTS)
        return max_active_requests

    @classmethod
    def generate_single_iteration(cls, app_model: App, user: Account, node_id: str, args: Any, streaming: bool = True):
        """
        Generate a single iteration of a workflow node

        Always uses the workflow resolved for ``InvokeFrom.DEBUGGER``.

        :param app_model: app model (advanced-chat or workflow mode)
        :param user: user
        :param node_id: node id
        :param args: args
        :param streaming: streaming
        :return: the result of the generator's ``single_iteration_generate``
        :raises ValueError: if the app mode is neither advanced-chat nor workflow,
            or the workflow is missing (see ``_get_workflow``)
        """
        if app_model.mode == AppMode.ADVANCED_CHAT.value:
            workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
            return AdvancedChatAppGenerator().single_iteration_generate(
                app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming
            )
        elif app_model.mode == AppMode.WORKFLOW.value:
            workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
            return WorkflowAppGenerator().single_iteration_generate(
                app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming
            )
        else:
            raise ValueError(f"Invalid app mode {app_model.mode}")

    @classmethod
    def generate_more_like_this(
        cls,
        app_model: App,
        user: Union[Account, EndUser],
        message_id: str,
        invoke_from: InvokeFrom,
        streaming: bool = True,
    ) -> Union[dict, Generator]:
        """
        Generate more like this

        Delegates to ``CompletionAppGenerator.generate_more_like_this``.

        :param app_model: app model
        :param user: user
        :param message_id: message id
        :param invoke_from: invoke from
        :param streaming: streaming
        :return: the completion generator's response
        """
        return CompletionAppGenerator().generate_more_like_this(
            app_model=app_model, message_id=message_id, user=user, invoke_from=invoke_from, stream=streaming
        )

    @classmethod
    def _get_workflow(cls, app_model: App, invoke_from: InvokeFrom) -> Workflow:
        """
        Get workflow

        :param app_model: app model
        :param invoke_from: invoke from
        :return: the draft workflow when invoked from the debugger, otherwise
            the published workflow
        :raises ValueError: if the selected workflow does not exist
        """
        workflow_service = WorkflowService()
        if invoke_from == InvokeFrom.DEBUGGER:
            # debugger runs use the draft workflow
            workflow = workflow_service.get_draft_workflow(app_model=app_model)

            if not workflow:
                raise ValueError("Workflow not initialized")
        else:
            # every other invocation source uses the published workflow
            workflow = workflow_service.get_published_workflow(app_model=app_model)

            if not workflow:
                raise ValueError("Workflow not published")

        return workflow
|
|