Severian committed
Commit c5a4eb2 · verified · 1 Parent(s): 5dd3a67

Update api.py

Files changed (1)
  1. api.py +126 -48
api.py CHANGED
@@ -119,6 +119,7 @@ class AgentProcessor:
             parser = SSEParser()
             citations = []
             metadata = {}
+            tool_outputs = []

             try:
                 async with self.client.stream(
@@ -145,72 +146,93 @@ class AgentProcessor:
                             data = line.split("data:", 1)[1].strip()
                             parsed = json.loads(data)

+                            # Enhanced mermaid diagram handling
+                            if parsed.get("observation"):
+                                try:
+                                    observation = parsed["observation"]
+                                    if isinstance(observation, str):
+                                        if "mermaid_diagram" in observation:
+                                            try:
+                                                # Clean and extract diagram content
+                                                cleaned_content = parser.clean_mermaid_content(
+                                                    observation
+                                                )
+
+                                                # Create tool output without extra wrapping
+                                                tool_output = {
+                                                    "type": "mermaid_diagram",
+                                                    "content": cleaned_content
+                                                }
+                                                tool_outputs.append(tool_output)
+
+                                                # Send clean event
+                                                yield (
+                                                    "event: tool_output\n"
+                                                    f"data: {json.dumps(tool_output)}\n\n"
+                                                )
+                                            except Exception as e:
+                                                self.logger.error(
+                                                    f"Failed to process mermaid diagram: {e}"
+                                                )
+                                except Exception as e:
+                                    self.logger.error(
+                                        f"Error processing observation: {e}"
+                                    )
+
                             if parsed.get("event") == "message_end":
                                 citations = parsed.get("retriever_resources", [])
                                 metadata = parsed.get("metadata", {})
+                                metadata["tool_outputs"] = tool_outputs
                                 self.logger.debug(
                                     f"Message end event:\n"
                                     f"Citations: {citations}\n"
-                                    f"Metadata: {metadata}"
+                                    f"Metadata: {metadata}\n"
+                                    f"Tool outputs: {tool_outputs}"
                                 )

                             formatted = self.format_terminal_output(
                                 parsed,
                                 citations=citations,
-                                metadata=metadata
+                                metadata=metadata,
+                                tool_outputs=tool_outputs
                             )
                             if formatted:
                                 self.logger.info(formatted)
+                                yield f"data: {formatted}\n\n"
+
                         except Exception as e:
                             await self.log_error(
                                 e,
                                 {"line": line, "event": "parse_data"}
                             )
-
-                        buffer += line + "\n"
-
-                        if line.startswith("data:") or buffer.strip().endswith("}"):
-                            try:
-                                processed_response = parser.parse_sse_event(buffer)
-                                if processed_response and isinstance(processed_response, dict):
-                                    cleaned_response = self.clean_response(processed_response)
-                                    if cleaned_response:
-                                        xml_content = cleaned_response.get("content", "")
-                                        yield f"data: {xml_content}\n\n"
-                            except Exception as parse_error:
-                                await self.log_error(
-                                    parse_error,
-                                    {"buffer": buffer, "event": "process_buffer"}
-                                )
-                                error_xml = (
-                                    f"<agent_response>"
-                                    f"<error>{str(parse_error)}</error>"
-                                    f"</agent_response>"
-                                )
-                                yield f"data: {error_xml}\n\n"
-                            finally:
-                                buffer = ""
-
-            except httpx.ConnectError as e:
-                await self.log_error(e, {"event": "connection_error"})
-                error_xml = (
-                    f"<agent_response>"
-                    f"<error>Connection error: {str(e)}</error>"
-                    f"</agent_response>"
-                )
-                yield f"data: {error_xml}\n\n"
+
+                        buffer += line + "\n"
+
+                        if line.startswith("data:") or buffer.strip().endswith("}"):
+                            try:
+                                processed_response = parser.parse_sse_event(buffer)
+                                if processed_response and isinstance(processed_response, dict):
+                                    cleaned_response = self.clean_response(processed_response)
+                                    if cleaned_response:
+                                        xml_content = cleaned_response.get("content", "")
+                                        yield f"data: {xml_content}\n\n"
+                            except Exception as parse_error:
+                                await self.log_error(
+                                    parse_error,
+                                    {"buffer": buffer, "event": "process_buffer"}
+                                )
+                                error_xml = (
+                                    f"<agent_response>"
+                                    f"<error>{str(parse_error)}</error>"
+                                    f"</agent_response>"
+                                )
+                                yield f"data: {error_xml}\n\n"
+                            finally:
+                                buffer = ""
+
             except Exception as e:
-                await self.log_error(e, {"event": "stream_error"})
-                error_xml = (
-                    f"<agent_response>"
-                    f"<error>Streaming error: {str(e)}</error>"
-                    f"</agent_response>"
-                )
-                yield f"data: {error_xml}\n\n"
-            finally:
-                end_time = datetime.now()
-                duration = (end_time - start_time).total_seconds()
-                self.logger.info(f"Request completed in {duration:.2f} seconds")
+                self.logger.error(f"Stream processing error: {str(e)}")
+                yield f"data: {{'error': '{str(e)}'}}\n\n"

         return StreamingResponse(
             event_generator(),
@@ -227,7 +249,8 @@ class AgentProcessor:
         self,
         response: Dict,
         citations: List[Dict] = None,
-        metadata: Dict = None
+        metadata: Dict = None,
+        tool_outputs: List[Dict] = None
     ) -> Optional[str]:
         """Format response for terminal output"""
         event_type = response.get("event")
@@ -239,7 +262,8 @@ class AgentProcessor:
             thought,
             observation,
             citations=citations,
-            metadata=metadata
+            metadata=metadata,
+            tool_outputs=tool_outputs
         )
         return terminal_output

@@ -266,6 +290,60 @@ class AgentProcessor:
         if event_type == "agent_thought":
            thought = response.get("thought", "")
            observation = response.get("observation", "")
+            tool = response.get("tool", "")
+
+            # Handle mermaid diagram observations
+            if tool == "mermaid_diagram" and observation:
+                try:
+                    # First check if observation is error message
+                    if isinstance(observation, str):
+                        obs_data = json.loads(observation)
+                        if "mermaid_diagram" in obs_data:
+                            if obs_data["mermaid_diagram"].startswith("tool invoke error"):
+                                self.logger.warning(
+                                    f"Mermaid diagram tool error: {obs_data['mermaid_diagram']}"
+                                )
+                                return None
+
+                    # Handle successful mermaid diagram
+                    if isinstance(observation, dict):
+                        mermaid_data = observation.get("mermaid_diagram", "")
+                    else:
+                        obs_data = json.loads(observation)
+                        mermaid_data = obs_data.get("mermaid_diagram", "")
+
+                    if mermaid_data:
+                        # Handle nested JSON structure
+                        if isinstance(mermaid_data, str):
+                            mermaid_data = json.loads(mermaid_data)
+
+                        # Extract diagram from either format
+                        if isinstance(mermaid_data, dict):
+                            diagram = mermaid_data.get("mermaid_diagram", "")
+                        else:
+                            diagram = mermaid_data
+
+                        # Clean up the diagram code
+                        if isinstance(diagram, str):
+                            if "tool response:" in diagram:
+                                diagram = diagram.split("tool response:")[0]
+                            if diagram.startswith('{"mermaid_diagram": "'):
+                                diagram = json.loads(diagram)["mermaid_diagram"]
+                            if diagram.startswith("```mermaid\n"):
+                                diagram = diagram[10:]
+                            if diagram.endswith("\n```"):
+                                diagram = diagram[:-4]
+
+                        return {
+                            "type": "mermaid_diagram",
+                            "content": diagram.strip()
+                        }
+                except (json.JSONDecodeError, KeyError) as e:
+                    self.logger.error(f"Failed to parse mermaid diagram data: {str(e)}")
+                    self.logger.debug(f"Raw observation: {observation}")
+                    return None
+
+            # Handle regular thought
            _, xml_output = self.formatter.format_thought(thought, observation)
            return {
                "type": "thought",
 
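Note on the new handler: event_generator calls parser.clean_mermaid_content(observation), a helper on SSEParser that is not part of this diff. The sketch below is only an illustration of what such a helper might do, inferred from how it is called here and from the unwrapping that format_terminal_output performs (tool-response suffix, JSON wrapper, ```mermaid fence); it is not the repository's implementation, and the standalone function form is an assumption.

# Illustrative sketch only; in the repo this is assumed to be a method on SSEParser.
import json

def clean_mermaid_content(observation: str) -> str:
    """Best-effort extraction of raw Mermaid source from a tool observation string."""
    content = observation
    # Drop a trailing "tool response:" suffix, mirroring the cleanup in format_terminal_output.
    if "tool response:" in content:
        content = content.split("tool response:")[0]
    # Unwrap a JSON payload of the form {"mermaid_diagram": "..."} when present.
    try:
        data = json.loads(content)
        if isinstance(data, dict) and isinstance(data.get("mermaid_diagram"), str):
            content = data["mermaid_diagram"]
    except json.JSONDecodeError:
        pass
    # Strip a ```mermaid ... ``` fence if the tool returned fenced Markdown.
    content = content.strip()
    if content.startswith("```mermaid"):
        content = content[len("```mermaid"):]
    if content.endswith("```"):
        content = content[:-3]
    return content.strip()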
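For reference, the stream that event_generator now yields mixes plain "data:" lines (formatted terminal/XML output and error payloads) with named "event: tool_output" events whose data is JSON. A minimal consumer sketch using httpx (which the pre-change code in api.py already references) is shown below; the endpoint URL and request payload are assumptions for illustration, not part of this commit.

# Illustrative consumer; URL and payload are assumed, not defined by this commit.
import asyncio
import json
import httpx

async def consume_stream(url: str = "http://localhost:8000/chat") -> None:
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url, json={"query": "Draw me a flowchart"}) as response:
            current_event = None
            async for line in response.aiter_lines():
                if line.startswith("event:"):
                    # Named SSE event, e.g. "tool_output" for mermaid diagrams.
                    current_event = line.split("event:", 1)[1].strip()
                elif line.startswith("data:"):
                    payload = line.split("data:", 1)[1].strip()
                    if current_event == "tool_output":
                        tool_output = json.loads(payload)
                        print("mermaid diagram:", tool_output.get("content", ""))
                    else:
                        print("chunk:", payload)
                    current_event = None

if __name__ == "__main__":
    asyncio.run(consume_stream())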