"""Data models and a parser for server-sent-event (SSE) chat stream payloads."""

import json
import re
from dataclasses import asdict, dataclass, is_dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Union

from logger_config import setup_logger
from dify_client_python.dify_client.models.stream import (
    StreamEvent,
    StreamResponse,
    build_chat_stream_response,
)

logger = setup_logger()


class EventType(Enum):
    """Event type identifiers; each value is the event's string name."""

    AGENT_THOUGHT = "agent_thought"
    AGENT_MESSAGE = "agent_message"
    MESSAGE_END = "message_end"
    PING = "ping"


@dataclass
class ToolCall:
    """A single tool invocation: its name, input, output and labels."""

    tool_name: str
    tool_input: Dict[str, Any]
    tool_output: Optional[str]
    tool_labels: Dict[str, Dict[str, str]]


@dataclass
class Citation:
    """A cited segment, identified by dataset, document and segment ids."""

    dataset_id: str
    dataset_name: str
    document_id: str
    document_name: str
    segment_id: str
    score: float
    content: str


@dataclass
class ProcessedResponse:
    """A processed stream event with its tool calls, citations and metadata."""

    event_type: EventType
    task_id: str
    message_id: str
    conversation_id: str
    content: str
    tool_calls: List[ToolCall]
    citations: List[Citation]
    metadata: Dict[str, Any]
    created_at: int


class EnumEncoder(json.JSONEncoder):
    """JSON encoder that also handles enums, ``.dict()`` objects and dataclasses."""

    def default(self, obj):
        """Return a JSON-serialisable stand-in for *obj*.

        Enums become their ``value``; objects exposing a ``dict`` attribute
        are serialised through ``obj.dict()``; dataclass instances are
        converted with ``dataclasses.asdict``.  Anything else is delegated to
        the base class, which raises ``TypeError``.
        """
        if isinstance(obj, Enum):
            return obj.value
        if hasattr(obj, "dict"):
            return obj.dict()
        # is_dataclass() is also true for dataclass *types*, which asdict()
        # rejects, so only instances are converted here.
        if is_dataclass(obj) and not isinstance(obj, type):
            return asdict(obj)
        return super().default(obj)


class SSEParser:
    """Parse raw SSE event text into dictionaries and tidy tool output."""

    def __init__(self):
        self.logger = setup_logger("sse_parser")

    def parse_sse_event(self, data: str) -> Optional[Dict]:
        """Parse SSE event data and return the cleaned dictionary.

        Args:
            data: Raw event text.  It may be a bare JSON document or text in
                which the JSON payload follows a ``data:`` marker.

        Returns:
            The decoded JSON object.  When its ``"observation"`` entry embeds
            an ``llm_result`` string, that entry is replaced by the cleaned
            result.  ``None`` is returned when the text cannot be decoded or
            the decoded JSON is not an object.
        """
        self.logger.debug("Parsing SSE event")
        try:
            payload = data.strip()
            # Only cut at "data:" when the text is not already a JSON
            # document; otherwise a "data:" inside a JSON string value would
            # truncate the payload.
            if not payload.startswith(("{", "[")) and "data:" in payload:
                payload = payload.split("data:", 1)[1].strip()
            parsed_data = json.loads(payload)
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON decode error: {e}")
            return None
        except Exception as e:
            self.logger.error(f"Parse error: {e}")
            return None

        if not isinstance(parsed_data, dict):
            # Honour the Optional[Dict] contract for list/scalar payloads.
            self.logger.warning(
                f"Parse error: expected a JSON object, got {type(parsed_data).__name__}"
            )
            return None

        cleaned = self._extract_llm_result(parsed_data.get("observation"))
        if cleaned is not None:
            parsed_data["observation"] = cleaned
        return parsed_data

    def _extract_llm_result(self, observation: Any) -> Optional[str]:
        """Return the cleaned ``llm_result`` embedded in *observation*, if any.

        The observation is expected to be a JSON object whose string values
        may themselves be JSON documents carrying an ``"llm_result"`` key.
        Extraction is best-effort: anything that does not match that shape is
        skipped and ``None`` is returned so the caller keeps the original
        observation.  When several entries match, the last one wins.
        """
        if not observation or not isinstance(observation, str):
            return None
        try:
            tool_data = json.loads(observation)
        except (ValueError, RecursionError):
            return None
        if not isinstance(tool_data, dict):
            return None

        cleaned = None
        for value in tool_data.values():
            if not (isinstance(value, str) and "llm_result" in value):
                continue
            try:
                tool_result = json.loads(value)["llm_result"]
            except (ValueError, KeyError, TypeError, RecursionError):
                # One malformed entry must not prevent the others from
                # being examined.
                continue
            if isinstance(tool_result, str):
                cleaned = self.clean_tool_output(tool_result)
        return cleaned

    def clean_tool_output(self, output: str) -> str:
        """Clean tool output by removing markdown and other formatting.

        Fenced code blocks are removed together with their contents, the
        characters ``*``, ``_``, `````, and ``#`` are dropped, surrounding
        whitespace is stripped, and runs of three or more newlines are
        collapsed to a single blank line.
        """
        # Remove markdown code blocks (fences and everything between them)
        output = re.sub(r'```.*?```', '', output, flags=re.DOTALL)
        # Remove other markdown formatting characters
        output = re.sub(r'[*_`#]', '', output)
        # Clean up whitespace
        output = re.sub(r'\n{3,}', '\n\n', output.strip())
        return output