#!/usr/bin/env python # coding=utf-8 # Copyright 2024 The HuggingFace Inc. team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import mimetypes import os import re import shutil from typing import Optional import datetime import pytz from smolagents.agent_types import AgentAudio, AgentImage, AgentText, handle_agent_output_types from smolagents.agents import ActionStep, MultiStepAgent from smolagents.memory import MemoryStep from smolagents.utils import _is_package_available def get_current_time_in_timezone(timezone: str) -> str: """A tool that fetches the current local time in a specified timezone. Args: timezone: A string representing a valid timezone (e.g., 'America/New_York'). """ try: # Create timezone object tz = pytz.timezone(timezone) # Get current time in that timezone local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S") return f"The current local time in {timezone} is: {local_time}" except Exception as e: return f"Error fetching time for timezone '{timezone}': {str(e)}" def pull_messages_from_step( step_log: MemoryStep, ): """Extract ChatMessage objects from agent steps with proper nesting""" import gradio as gr if isinstance(step_log, ActionStep): # Output the step number step_number = f"Step {step_log.step_number}" if step_log.step_number is not None else "" yield gr.ChatMessage(role="assistant", content=f"**{step_number}**") # First yield the thought/reasoning from the LLM if hasattr(step_log, "model_output") and step_log.model_output is not None: # Clean up the LLM output model_output = step_log.model_output.strip() # Remove any trailing and extra backticks, handling multiple possible formats model_output = re.sub(r"```\s*", "```", model_output) # handles ``` model_output = re.sub(r"\s*```", "```", model_output) # handles ``` model_output = re.sub(r"```\s*\n\s*", "```", model_output) # handles ```\n model_output = model_output.strip() yield gr.ChatMessage(role="assistant", content=model_output) # For tool calls, create a parent message if hasattr(step_log, "tool_calls") and step_log.tool_calls is not None: first_tool_call = step_log.tool_calls[0] used_code = first_tool_call.name == "python_interpreter" parent_id = f"call_{len(step_log.tool_calls)}" # Tool call becomes the parent message with timing info # First we will handle arguments based on type args = first_tool_call.arguments if isinstance(args, dict): content = str(args.get("answer", str(args))) else: content = str(args).strip() if used_code: # Clean up the content by removing any end code tags content = re.sub(r"```.*?\n", "", content) # Remove existing code blocks content = re.sub(r"\s*\s*", "", content) # Remove end_code tags content = content.strip() if not content.startswith("```python"): content = f"```python\n{content}\n```" parent_message_tool = gr.ChatMessage( role="assistant", content=content, metadata={ "title": f"🛠️ Used tool {first_tool_call.name}", "id": parent_id, "status": "pending", }, ) yield parent_message_tool # Nesting execution logs under the tool call if they exist if hasattr(step_log, "observations") and ( step_log.observations is not None and step_log.observations.strip() ): # Only yield execution logs if there's actual content log_content = step_log.observations.strip() if log_content: log_content = re.sub(r"^Execution logs:\s*", "", log_content) yield gr.ChatMessage( role="assistant", content=f"{log_content}", metadata={"title": "📝 Execution Logs", "parent_id": parent_id, "status": "done"}, ) # Nesting any errors under the tool call if hasattr(step_log, "error") and step_log.error is not None: yield gr.ChatMessage( role="assistant", content=str(step_log.error), metadata={"title": "💥 Error", "parent_id": parent_id, "status": "done"}, ) # Update parent message metadata to done status without yielding a new message parent_message_tool.metadata["status"] = "done" # Handle standalone errors but not from tool calls elif hasattr(step_log, "error") and step_log.error is not None: yield gr.ChatMessage(role="assistant", content=str(step_log.error), metadata={"title": "💥 Error"}) # Calculate duration and token information step_footnote = f"{step_number}" if hasattr(step_log, "input_token_count") and hasattr(step_log, "output_token_count"): token_str = ( f" | Input-tokens:{step_log.input_token_count:,} | Output-tokens:{step_log.output_token_count:,}" ) step_footnote += token_str if hasattr(step_log, "duration"): step_duration = f" | Duration: {round(float(step_log.duration), 2)}" if step_log.duration else None step_footnote += step_duration step_footnote = f"""{step_footnote} """ yield gr.ChatMessage(role="assistant", content=f"{step_footnote}") yield gr.ChatMessage(role="assistant", content="-----") def stream_to_gradio( agent, task: str, reset_agent_memory: bool = False, additional_args: Optional[dict] = None, ): """Runs an agent with the given task and streams the messages from the agent as gradio ChatMessages.""" if not _is_package_available("gradio"): raise ModuleNotFoundError( "Please install 'gradio' extra to use the GradioUI: `pip install 'smolagents[gradio]'`" ) import gradio as gr total_input_tokens = 0 total_output_tokens = 0 for step_log in agent.run(task, stream=True, reset=reset_agent_memory, additional_args=additional_args): # Track tokens if model provides them if hasattr(agent.model, "last_input_token_count"): total_input_tokens += agent.model.last_input_token_count total_output_tokens += agent.model.last_output_token_count if isinstance(step_log, ActionStep): step_log.input_token_count = agent.model.last_input_token_count step_log.output_token_count = agent.model.last_output_token_count for message in pull_messages_from_step( step_log, ): yield message final_answer = step_log # Last log is the run's final_answer final_answer = handle_agent_output_types(final_answer) if isinstance(final_answer, AgentText): yield gr.ChatMessage( role="assistant", content=f"**Final answer:**\n{final_answer.to_string()}\n", ) elif isinstance(final_answer, AgentImage): yield gr.ChatMessage( role="assistant", content={"path": final_answer.to_string(), "mime_type": "image/png"}, ) elif isinstance(final_answer, AgentAudio): yield gr.ChatMessage( role="assistant", content={"path": final_answer.to_string(), "mime_type": "audio/wav"}, ) else: yield gr.ChatMessage(role="assistant", content=f"**Final answer:** {str(final_answer)}") class GradioUI: """A one-line interface to launch your agent in Gradio""" def __init__(self, agent: MultiStepAgent, file_upload_folder: str | None = None): if not _is_package_available("gradio"): raise ModuleNotFoundError( "Please install 'gradio' extra to use the GradioUI: `pip install 'smolagents[gradio]'`" ) self.agent = agent self.file_upload_folder = file_upload_folder if self.file_upload_folder is not None: if not os.path.exists(file_upload_folder): os.mkdir(file_upload_folder) def interact_with_agent(self, prompt, messages): import gradio as gr messages.append(gr.ChatMessage(role="user", content=prompt)) yield messages for msg in stream_to_gradio(self.agent, task=prompt, reset_agent_memory=False): messages.append(msg) yield messages yield messages def upload_file( self, file, file_uploads_log, allowed_file_types=[ "application/pdf", "application/vnd.openxmlformats-officedocument.wordprocessingml.document", "text/plain", ], ): """ Handle file uploads, default allowed types are .pdf, .docx, and .txt """ import gradio as gr if file is None: return gr.Textbox("No file uploaded", visible=True), file_uploads_log try: mime_type, _ = mimetypes.guess_type(file.name) except Exception as e: return gr.Textbox(f"Error: {e}", visible=True), file_uploads_log if mime_type not in allowed_file_types: return gr.Textbox("File type disallowed", visible=True), file_uploads_log # Sanitize file name original_name = os.path.basename(file.name) sanitized_name = re.sub( r"[^\w\-.]", "_", original_name ) # Replace any non-alphanumeric, non-dash, or non-dot characters with underscores type_to_ext = {} for ext, t in mimetypes.types_map.items(): if t not in type_to_ext: type_to_ext[t] = ext # Ensure the extension correlates to the mime type sanitized_name = sanitized_name.split(".")[:-1] sanitized_name.append("" + type_to_ext[mime_type]) sanitized_name = "".join(sanitized_name) # Save the uploaded file to the specified folder file_path = os.path.join(self.file_upload_folder, os.path.basename(sanitized_name)) shutil.copy(file.name, file_path) return gr.Textbox(f"File uploaded: {file_path}", visible=True), file_uploads_log + [file_path] def log_user_message(self, text_input, file_uploads_log): return ( text_input + ( f"\nYou have been provided with these files, which might be helpful or not: {file_uploads_log}" if len(file_uploads_log) > 0 else "" ), "", ) def agent_get_tools(self): return self.agent.tools def agent_reset(self): self.agent.memory.reset() self.agent.monitor.reset() def agent_set_steps(self, steps): self.agent.max_steps = steps def launch(self, **kwargs): import gradio as gr with gr.Blocks(fill_height=True, theme='crcdng/cyber', css_paths=["cyberpunk.css", "scrolling_text.css"]) as demo: title_html=( """

Your Cyberpunk Local Time Terminal

""" ) description_html=( """

Welcome to ChronoCore-77, the bleeding-edge time terminal jacked straight into the neon veins of Night City. Whether you're dodging corpos, chasing edgerunner gigs, or just trying to sync your implant clock, I've got the local time locked and loaded. No glitches [OK a few...], no lag—just pure, precise chrono-data ripped straight from the grid. Stay sharp, choom. Time waits for no one.

""" ) banner_html=( """
Tannhäuser Gate Approved
""" ) with gr.Row(): title=gr.HTML(banner_html) with gr.Row(): timer = gr.Timer(1) gr.Textbox(render=False) time_display = gr.Textbox(label="Time", elem_classes="cyber-glitch-4") gr.Textbox(render=False) import time timer.tick(lambda: get_current_time_in_timezone(time.tzname[0]), outputs=time_display) with gr.Row(): title=gr.HTML(title_html) with gr.Row(): description=gr.HTML(description_html) stored_messages = gr.State([]) file_uploads_log = gr.State([]) chatbot = gr.Chatbot( label="Agent", type="messages", avatar_images=( "https://huggingface.co/spaces/crcdng/First_agent_template/resolve/main/agent_b.jpg", "https://huggingface.co/spaces/crcdng/First_agent_template/resolve/main/agent_a.jpg", ), resizeable=True, scale=2, elem_classes="cyber-glitch-1", ) # If an upload folder is provided, enable the upload feature if self.file_upload_folder is not None: upload_file = gr.File(label="Upload a file") upload_status = gr.Textbox(label="Upload Status", interactive=False, visible=False) upload_file.change( self.upload_file, [upload_file, file_uploads_log], [upload_status, file_uploads_log], ) with gr.Row(equal_height=True): steps_input = gr.Slider(1, 12, value=4, step=1, label="Max. Number of Steps") steps_input.change(self.agent_set_steps, steps_input, None) tools_list = gr.Dropdown(self.agent_get_tools(), interactive=True, label="Tools", info="(display only)") # tools_list.select(self.agent_get_tools, None, tools_list) reset = gr.Button(value="Reset Agent") reset.click(self.agent_reset, None, None) text_input = gr.Textbox(lines=1, label="Chat Message", elem_classes="cyber-glitch-2", max_length=1000) text_input.submit( self.log_user_message, [text_input, file_uploads_log], [stored_messages, text_input], ).then(self.interact_with_agent, [stored_messages, chatbot], [chatbot]) examples = gr.Examples( examples=[["Tell me a joke based on the current local time"],["Given the current local time, what is a fun activity to do?"],["When asked for the current local time, add 6 hours to it. What is the current local time?"], ["Find significant events that happend exactly one year ago"], ["Compute the current local time glitch coeficients"],["Generate a bold picture inspired by the current local time"]], inputs=[text_input], ) demo.launch(debug=True, share=True, ssr_mode=False, allowed_paths=["Cyberpunk.otf"], **kwargs) __all__ = ["stream_to_gradio", "GradioUI"]