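"""Autonomous Agent System.

A Streamlit app that runs an iterative planning -> development -> testing
pipeline and tracks code quality, test coverage, and security metrics.
"""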
import asyncio
import logging
import os
from datetime import datetime
from enum import Enum
from io import StringIO
from typing import Any, Dict, List, Optional

import streamlit as st

# The analysis toolchain is optional at import time; fail fast with a
# clear message if any piece is missing.
try:
    import radon.complexity
    from pylint.lint import Run
    from pylint.reporters import JSONReporter
    from coverage import Coverage
    from bandit.core import config as bandit_config
    from bandit.core import manager
except ImportError as e:
    st.error(f"Missing dependency: {str(e)}")
    st.stop()
class PipelineStage(Enum):
    """Pipeline stages for the development process."""
    PLANNING = 1
    DEVELOPMENT = 2
    TESTING = 3

# Configure basic logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
class AutonomousAgentApp:
    def __init__(self):
        self.autonomous_agent = None
        self.interface = None
        self._initialize_components()

    def _initialize_components(self):
        """Initialize components in the correct order."""
        self.autonomous_agent = AutonomousAgent(self)
        self.interface = StreamlitInterface(self.autonomous_agent)

    def run(self):
        """Main entry point for the application."""
        self.interface.render_main_interface()
class CodeMetricsAnalyzer:
    """Analyzes code metrics using various tools."""
    def __init__(self):
        self.metrics_history = []

    def analyze_code_quality(self, file_path: str) -> Dict[str, Any]:
        """Analyzes code quality using multiple metrics."""
        try:
            # Pylint analysis
            pylint_score = self._run_pylint(file_path)
            # Complexity analysis
            complexity_score = self._analyze_complexity(file_path)
            # Test coverage analysis
            coverage_score = self._analyze_test_coverage(file_path)
            # Security analysis
            security_score = self._analyze_security(file_path)
            # Calculate overall quality score
            quality_score = self._calculate_overall_score(
                pylint_score,
                complexity_score,
                coverage_score,
                security_score
            )
            metrics = {
                "quality_score": quality_score,
                "pylint_score": pylint_score,
                "complexity_score": complexity_score,
                "coverage_score": coverage_score,
                "security_score": security_score,
                "timestamp": datetime.now()
            }
            self.metrics_history.append(metrics)
            return metrics
        except Exception as e:
            logging.error(f"Error analyzing code metrics: {str(e)}")
            return {
                "error": str(e),
                "quality_score": 0.0,
                "timestamp": datetime.now()
            }
    def _run_pylint(self, file_path: str) -> float:
        """Runs pylint analysis."""
        try:
            # JSONReporter has no score field; the global score lives on the
            # linter stats (pylint >= 2.12 exposes it as global_note). The
            # StringIO output keeps the JSON report out of stdout, and
            # exit=False replaces the deprecated do_exit flag.
            run = Run([file_path], reporter=JSONReporter(StringIO()), exit=False)
            score = run.linter.stats.global_note
            # Pylint scores top out at 10 (and can be negative); clamp to 0-1.
            return max(0.0, min(1.0, float(score) / 10.0))
        except Exception as e:
            logging.error(f"Pylint analysis error: {str(e)}")
            return 0.0
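    # _analyze_test_coverage is called by analyze_code_quality above but was
    # missing from the class. A minimal sketch: it reads previously collected
    # coverage.py data if a .coverage file exists; a full implementation would
    # run the test suite first to produce that data.
    def _analyze_test_coverage(self, file_path: str) -> float:
        """Analyzes test coverage for a file (0-1 scale)."""
        try:
            cov = Coverage()
            cov.load()  # loads an existing .coverage data file
            _, statements, _, missing, _ = cov.analysis2(file_path)
            if not statements:
                return 0.0
            covered = len(statements) - len(missing)
            return covered / len(statements)
        except Exception as e:
            logging.error(f"Coverage analysis error: {str(e)}")
            return 0.0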
    def _analyze_complexity(self, file_path: str) -> float:
        """Analyzes code complexity."""
        try:
            with open(file_path, 'r') as file:
                code = file.read()
            # Calculate cyclomatic complexity
            complexity = radon.complexity.cc_visit(code)
            avg_complexity = sum(item.complexity for item in complexity) / len(complexity) if complexity else 0
            # Normalize complexity score (0-1 scale, lower is better)
            normalized_score = 1.0 - min(avg_complexity / 10.0, 1.0)
            return normalized_score
        except Exception as e:
            logging.error(f"Complexity analysis error: {str(e)}")
            return 0.0
    def _analyze_security(self, file_path: str) -> float:
        """Analyzes code security using bandit."""
        try:
            # BanditManager requires a config object and an aggregation type.
            mgr = manager.BanditManager(bandit_config.BanditConfig(), 'file')
            mgr.discover_files([file_path])
            mgr.run_tests()
            issues = mgr.get_issue_list()
            # Bandit reports severity as a string ("LOW"/"MEDIUM"/"HIGH");
            # map it to a numeric weight (an assumed 1-3 scale) so the
            # arithmetic below works.
            severity_weight = {'LOW': 1, 'MEDIUM': 2, 'HIGH': 3}
            total_issues = len(issues)
            max_severity = max(
                (severity_weight.get(issue.severity, 0) for issue in issues),
                default=0
            )
            # Normalize security score (0-1 scale, higher is better)
            security_score = 1.0 - (total_issues * max_severity) / 10.0
            return max(0.0, min(1.0, security_score))
        except Exception as e:
            logging.error(f"Security analysis error: {str(e)}")
            return 0.0
    def _calculate_overall_score(self, pylint_score: float, complexity_score: float,
                                 coverage_score: float, security_score: float) -> float:
        """Calculates overall code quality score."""
        weights = {
            'pylint': 0.3,
            'complexity': 0.2,
            'coverage': 0.25,
            'security': 0.25
        }
        overall_score = (
            weights['pylint'] * pylint_score +
            weights['complexity'] * complexity_score +
            weights['coverage'] * coverage_score +
            weights['security'] * security_score
        )
        return max(0.0, min(1.0, overall_score))

    def get_metrics_history(self) -> List[Dict[str, Any]]:
        """Returns the history of metrics measurements."""
        return self.metrics_history

    def get_trend_analysis(self) -> Dict[str, Any]:
        """Analyzes trends in metrics over time."""
        if not self.metrics_history:
            return {"status": "No metrics history available"}
        trends = {
            "quality_score": self._calculate_trend([m["quality_score"] for m in self.metrics_history]),
            "coverage_score": self._calculate_trend([m["coverage_score"] for m in self.metrics_history]),
            "security_score": self._calculate_trend([m["security_score"] for m in self.metrics_history])
        }
        return trends

    def _calculate_trend(self, values: List[float]) -> Dict[str, Any]:
        """Calculates trend statistics for a metric."""
        if not values:
            return {"trend": "unknown", "change": 0.0}
        recent_values = values[-3:]  # Look at the last 3 measurements
        if len(recent_values) < 2:
            return {"trend": "insufficient data", "change": 0.0}
        change = recent_values[-1] - recent_values[0]
        trend = "improving" if change > 0 else "declining" if change < 0 else "stable"
        return {
            "trend": trend,
            "change": change,
            "current": recent_values[-1],
            "previous": recent_values[0]
        }
class WorkspaceManager:
    """Manages the workspace for the Autonomous Agent System."""
    def __init__(self, workspace_dir: str):
        self.workspace_dir = workspace_dir
        # Ensure the workspace directory exists before any file operations.
        os.makedirs(self.workspace_dir, exist_ok=True)

    def get_workspace_tree(self) -> Dict[str, Any]:
        """Get the structure of the workspace."""
        # Placeholder implementation
        return {"workspace": "tree_structure"}

    def create_file(self, filename: str, content: str) -> str:
        """Create a new file in the workspace."""
        file_path = os.path.join(self.workspace_dir, filename)
        with open(file_path, 'w') as file:
            file.write(content)
        return f"File '{filename}' created successfully."

    def delete_file(self, filename: str) -> str:
        """Delete a file from the workspace."""
        file_path = os.path.join(self.workspace_dir, filename)
        if os.path.exists(file_path):
            os.remove(file_path)
            return f"File '{filename}' deleted successfully."
        return f"File '{filename}' not found."
class ToolManager:
    """Manages tools for the autonomous agent system."""
    def __init__(self):
        self.tools = {}

    def add_tool(self, tool_name, tool_config):
        """Add a tool to the tool manager."""
        self.tools[tool_name] = tool_config

    def get_tool(self, tool_name):
        """Get a tool from the tool manager."""
        return self.tools.get(tool_name)

    def remove_tool(self, tool_name):
        """Remove a tool from the tool manager."""
        if tool_name in self.tools:
            del self.tools[tool_name]
class QualityMetrics:
    """Advanced quality metrics tracking and analysis."""
    def __init__(self):
        self.metrics_analyzer = CodeMetricsAnalyzer()
        self.code_quality_score = 0.0
        self.test_coverage = 0.0
        # Numeric so it can be compared against the threshold below (it was
        # previously the string "unknown", which broke float() conversion).
        self.security_score = 0.0
        self.performance_score = 0.0
        self.history = []
        self.thresholds = {
            "code_quality": 0.85,
            "test_coverage": 0.90,
            "security": 0.85,
            "performance": 0.80
        }
class ToolRepository:
    """Repository for managing tools and their configurations."""
    def __init__(self):
        self.tools = {}
        self.default_tools = {
            'code_analyzer': {
                'name': 'Code Analyzer',
                'type': 'analysis',
                'config': {'enabled': True}
            },
            'test_runner': {
                'name': 'Test Runner',
                'type': 'testing',
                'config': {'enabled': True}
            },
            'security_scanner': {
                'name': 'Security Scanner',
                'type': 'security',
                'config': {'enabled': True}
            }
        }
        self._initialize_default_tools()

    def _initialize_default_tools(self):
        """Initialize the repository with default tools."""
        self.tools.update(self.default_tools)

    def get_tool(self, tool_name: str) -> Optional[Dict]:
        """Get a tool by name."""
        return self.tools.get(tool_name)

    def add_tool(self, tool_name: str, tool_config: Dict):
        """Add a new tool to the repository."""
        self.tools[tool_name] = tool_config

    def remove_tool(self, tool_name: str):
        """Remove a tool from the repository."""
        if tool_name in self.tools:
            del self.tools[tool_name]

    def list_tools(self) -> List[str]:
        """List all available tools."""
        return list(self.tools.keys())

    def get_tools_by_type(self, tool_type: str) -> List[Dict]:
        """Get all tools of a specific type."""
        return [
            tool for tool in self.tools.values()
            if tool.get('type') == tool_type
        ]
class AutonomousAgent:
    """Autonomous agent for the system."""
    def __init__(self, app):
        self.app = app
        # Initialize components in order
        self.workspace_manager = WorkspaceManager(workspace_dir=os.getenv('WORKSPACE_DIR', 'workspace'))
        self.tool_manager = ToolManager()  # Simplified initialization
        self.tools_repository = ToolRepository()
        self.pipeline = DevelopmentPipeline(
            workspace_manager=self.workspace_manager,
            tool_manager=self.tool_manager
        )
        self.refinement_loop = RefinementLoop(pipeline=self.pipeline)
        self.chat_system = ChatSystem(self)

    def run(self):
        """Run the Streamlit application."""
        # The interface lives on the owning app, not on the agent.
        self.app.interface.render_main_interface()

    def _initialize_tool_repository(self) -> ToolRepository:
        """Initialize the tool repository."""
        repository = ToolRepository()
        # Add any additional tool configurations here
        repository.add_tool('custom_analyzer', {
            'name': 'Custom Code Analyzer',
            'type': 'analysis',
            'config': {
                'enabled': True,
                'custom_rules': []
            }
        })
        return repository

    def _setup_tool_manager(self) -> ToolManager:
        """Set up the tool manager with configuration."""
        return ToolManager()

    def _initialize_pipeline(self) -> 'DevelopmentPipeline':
        """Initialize the development pipeline."""
        return DevelopmentPipeline(
            workspace_manager=self.workspace_manager,
            tool_manager=self.tool_manager
        )

    def get_tool(self, tool_name: str) -> Optional[Dict]:
        """Get a tool configuration by name."""
        return self.tools_repository.get_tool(tool_name)

    def add_tool(self, tool_name: str, tool_config: Dict):
        """Add a new tool to the repository."""
        self.tools_repository.add_tool(tool_name, tool_config)

    def remove_tool(self, tool_name: str):
        """Remove a tool from the repository."""
        self.tools_repository.remove_tool(tool_name)

    def list_available_tools(self) -> List[str]:
        """List all available tools."""
        return self.tools_repository.list_tools()

    def get_tools_by_type(self, tool_type: str) -> List[Dict]:
        """Get all tools of a specific type."""
        return self.tools_repository.get_tools_by_type(tool_type)
class DevelopmentPipeline:
    def __init__(self, workspace_manager, tool_manager):
        self.workspace_manager = workspace_manager
        self.tool_manager = tool_manager
        self.current_stage = None
        self.stage_results = {}
        self.metrics = {}
        # Initialize logger properly
        self.logger = logging.getLogger('development_pipeline')
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)
    async def execute_stage(self, stage: PipelineStage, input_data: Dict) -> Dict[str, Any]:
        """Execute a pipeline stage and return results."""
        self.logger.info(f"Executing pipeline stage: {stage.name}")
        # Track the active stage so the dashboard can display it.
        self.current_stage = stage
        try:
            if stage == PipelineStage.PLANNING:
                result = await self._handle_planning(input_data)
            elif stage == PipelineStage.DEVELOPMENT:
                result = await self._handle_development(input_data)
            elif stage == PipelineStage.TESTING:
                result = await self._handle_testing(input_data)
            else:
                raise ValueError(f"Unknown pipeline stage: {stage}")
            # Record the result so the UI can inspect per-stage output.
            self.stage_results[stage.name.lower()] = result
            return result
        except Exception as e:
            self.logger.error(f"Error in {stage.name} stage: {str(e)}")
            return {"status": "error", "error": str(e)}
    async def _handle_planning(self, input_data: Dict) -> Dict:
        """Handle planning stage execution."""
        self.logger.info("Handling planning stage")
        try:
            task = input_data.get("task", "")
            if not task:
                raise ValueError("No task provided for planning")
            # Step 1: Analyze the task and break it into subtasks
            subtasks = self._break_down_task(task)
            # Step 2: Generate a development plan
            development_plan = {
                "task": task,
                "subtasks": subtasks,
                "milestones": self._define_milestones(subtasks),
                "timeline": self._estimate_timeline(subtasks)
            }
            # Step 3: Create initial project artifacts (e.g., requirements.txt)
            self.workspace_manager.create_file("requirements.txt", self._generate_requirements(subtasks))
            return {
                "status": "success",
                "result": {"plan": development_plan},
                "artifacts": ["requirements.txt"]
            }
        except Exception as e:
            self.logger.error(f"Error in planning stage: {str(e)}")
            return {"status": "error", "error": str(e)}

    def _break_down_task(self, task: str) -> List[str]:
        """Break down a task into smaller subtasks."""
        return [f"Subtask {i+1}: {part}" for i, part in enumerate(task.split(","))]

    def _define_milestones(self, subtasks: List[str]) -> List[str]:
        """Define milestones based on subtasks."""
        return [f"Complete {subtask}" for subtask in subtasks]

    def _estimate_timeline(self, subtasks: List[str]) -> Dict[str, int]:
        """Estimate a timeline for the subtasks."""
        return {subtask: 1 for subtask in subtasks}

    def _generate_requirements(self, subtasks: List[str]) -> str:
        """Generate a requirements document based on subtasks."""
        return "\n".join(f"Requirement: {subtask}" for subtask in subtasks)
    async def _handle_development(self, input_data: Dict) -> Dict:
        """Handle development stage execution."""
        self.logger.info("Handling development stage")
        try:
            # The planning stage's "result" dict is passed in directly, so
            # the plan sits at the top level of input_data.
            plan = input_data.get("plan", {})
            if not plan:
                raise ValueError("No development plan provided")
            # Step 1: Generate boilerplate code
            self.workspace_manager.create_file("main.py", self._generate_boilerplate_code(plan))
            # Step 2: Implement functionality for each subtask
            for subtask in plan.get("subtasks", []):
                self._implement_subtask(subtask)
            return {
                "status": "success",
                "result": {"code": "print('Hello World')"},
                "artifacts": ["main.py"]
            }
        except Exception as e:
            self.logger.error(f"Error in development stage: {str(e)}")
            return {"status": "error", "error": str(e)}
    def _generate_boilerplate_code(self, plan: Dict) -> str:
        """Generate boilerplate code based on the development plan."""
        subtask_comments = "\n".join(f"# {subtask}" for subtask in plan.get('subtasks', []))
        return (
            f"# Project: {plan.get('task', 'Untitled')}\n"
            "# Subtasks:\n"
            f"{subtask_comments}\n\n"
            "def main():\n"
            "    print('Hello World')\n\n"
            "if __name__ == '__main__':\n"
            "    main()\n"
        )
    def _implement_subtask(self, subtask: str) -> None:
        """Implement functionality for a subtask."""
        with open(os.path.join(self.workspace_manager.workspace_dir, "main.py"), "a") as file:
            file.write(f"\n# Implementation for {subtask}\n")

    async def _handle_testing(self, input_data: Dict) -> Dict:
        """Handle testing stage execution."""
        self.logger.info("Handling testing stage")
        try:
            code_path = os.path.join(self.workspace_manager.workspace_dir, "main.py")
            if not os.path.exists(code_path):
                raise FileNotFoundError("No code found for testing")
            # Step 1: Run unit tests
            test_results = self._run_unit_tests(code_path)
            # Step 2: Generate a test report
            test_report = self._generate_test_report(test_results)
            self.workspace_manager.create_file("test_report.html", test_report)
            return {
                "status": "success",
                "result": {"test_results": test_results},
                "artifacts": ["test_report.html"]
            }
        except Exception as e:
            self.logger.error(f"Error in testing stage: {str(e)}")
            return {"status": "error", "error": str(e)}
    def _run_unit_tests(self, code_path: str) -> Dict[str, Any]:
        """Run unit tests on the code.

        Placeholder: returns canned results rather than invoking a real
        test runner.
        """
        return {
            "tests_run": 5,
            "tests_passed": 5,
            "tests_failed": 0,
            "coverage": "100%"
        }
    def _generate_test_report(self, test_results: Dict) -> str:
        """Generate an HTML test report."""
        return f"""
        <html>
            <head><title>Test Report</title></head>
            <body>
                <h1>Test Report</h1>
                <ul>
                    <li>Tests Run: {test_results.get('tests_run', 0)}</li>
                    <li>Tests Passed: {test_results.get('tests_passed', 0)}</li>
                    <li>Tests Failed: {test_results.get('tests_failed', 0)}</li>
                    <li>Coverage: {test_results.get('coverage', '0%')}</li>
                </ul>
            </body>
        </html>
        """
class RefinementLoop:
    """Manages the iterative refinement process."""
    def __init__(self, pipeline):
        self.pipeline = pipeline
        self.max_iterations = 10
        self.quality_metrics = QualityMetrics()
        self.logger = logging.getLogger(__name__)
        self.current_iteration = 0
        self.history = []

    async def run_refinement_cycle(self, task: str) -> Dict[str, Any]:
        """Run a complete refinement cycle for the given task."""
        self.logger.info(f"Starting refinement cycle for task: {task}")
        self.current_iteration = 0
        try:
            while self.current_iteration < self.max_iterations:
                self.logger.info(f"Starting iteration {self.current_iteration + 1}")
                # Execute pipeline stages, bailing out if any stage errors
                # (an error result carries no "result" key to pass onward).
                planning_result = await self.pipeline.execute_stage(
                    PipelineStage.PLANNING,
                    {"task": task}
                )
                if planning_result.get("status") != "success":
                    return planning_result
                development_result = await self.pipeline.execute_stage(
                    PipelineStage.DEVELOPMENT,
                    planning_result["result"]
                )
                if development_result.get("status") != "success":
                    return development_result
                testing_result = await self.pipeline.execute_stage(
                    PipelineStage.TESTING,
                    development_result["result"]
                )
                if testing_result.get("status") != "success":
                    return testing_result
                # Analyze results
                quality_analysis = self._analyze_quality(testing_result["result"])
                # Record iteration history
                self.history.append({
                    "iteration": self.current_iteration,
                    "quality_metrics": quality_analysis,
                    "timestamp": datetime.now()
                })
                # Check if quality requirements are met
                if self._meets_quality_requirements(quality_analysis):
                    self.logger.info("Quality requirements met. Refinement cycle complete.")
                    return self._prepare_final_result(quality_analysis)
                self.current_iteration += 1
            return {
                "status": "max_iterations_reached",
                "iterations_completed": self.current_iteration,
                "final_quality": quality_analysis
            }
        except Exception as e:
            self.logger.error(f"Error in refinement cycle: {str(e)}")
            return {"status": "error", "error": str(e)}
    def _analyze_quality(self, result: Dict[str, Any]) -> Dict[str, float]:
        """Analyze the quality metrics of the current iteration."""
        return {
            "code_quality": self.quality_metrics.code_quality_score,
            "test_coverage": self.quality_metrics.test_coverage,
            "security_score": float(self.quality_metrics.security_score)
        }

    def _meets_quality_requirements(self, quality_analysis: Dict[str, float]) -> bool:
        """Check if the current quality metrics meet the requirements."""
        thresholds = self.quality_metrics.thresholds
        return (
            quality_analysis["code_quality"] >= thresholds["code_quality"] and
            quality_analysis["test_coverage"] >= thresholds["test_coverage"] and
            quality_analysis["security_score"] >= thresholds["security"]
        )

    def _prepare_final_result(self, quality_analysis: Dict[str, float]) -> Dict[str, Any]:
        """Prepare the final result of the refinement cycle."""
        return {
            "status": "success",
            "iterations_completed": self.current_iteration,
            "final_quality": quality_analysis,
            "history": self.history
        }

    def get_refinement_history(self) -> List[Dict[str, Any]]:
        """Get the history of refinement iterations."""
        return self.history

class ChatSystem:
    """Manages chat interaction between users and the autonomous system."""
    def __init__(self, agent):
        self.agent = agent
        self.chat_history = []
        self.active_tasks = {}
        self.command_handlers = {
            '/task': self.handle_task_command,
            '/status': self.handle_status_command,
            '/stop': self.handle_stop_command,
            '/help': self.handle_help_command,
            '/modify': self.handle_modify_command
        }
        self.logger = logging.getLogger(__name__)

    def render_chat_interface(self):
        """Render the chat interface in the Streamlit sidebar."""
        with st.sidebar:
            st.markdown("---")
            st.subheader("System Chat")
            if st.button("Clear Chat History"):
                self.clear_chat_history()
            chat_container = st.container()
            with chat_container:
                for message in self.chat_history:
                    self._render_message(message)
            user_input = st.text_input("Type message/command...", key="chat_input")
            if st.button("Send", key="send_message"):
                self.process_user_input(user_input)
    def handle_task_command(self, input_data: Dict):
        """Handle the /task command."""
        self.logger.info("Handling task command")
        raw = input_data.get("task", input_data.get("input", ""))
        # Strip the leading "/task" so only the task description is passed on.
        task = raw[len("/task"):].strip() if raw.startswith("/task") else raw
        # Streamlit callbacks are synchronous and there is no running event
        # loop here, so drive the coroutine to completion with asyncio.run
        # (the refinement loop lives on the agent, not the app).
        asyncio.run(self.agent.refinement_loop.run_refinement_cycle(task))
        return "Task command initiated"

    def handle_status_command(self, input_data: Dict):
        """Handle the /status command."""
        self.logger.info("Handling status command")
        return {
            "status": "success",
            "history": self.agent.refinement_loop.get_refinement_history()
        }
    def handle_stop_command(self, input_data: Dict):
        """Handle the /stop command."""
        self.logger.info("Handling stop command")
        # Add logic to stop current task
        return "Stop command handled"

    def handle_help_command(self, input_data: Dict):
        """Handle the /help command."""
        self.logger.info("Handling help command")
        return """
        Available commands:
        /task <task_name> - Run the autonomous agent with the given task
        /status - Get the current status of the refinement cycle
        /stop - Stop the current task
        /help - Show this help message
        /modify - Modify current task parameters
        """

    def handle_modify_command(self, input_data: Dict):
        """Handle the /modify command."""
        self.logger.info("Handling modify command")
        return "Modify command handled"

    def clear_chat_history(self):
        """Clear the chat history."""
        self.logger.info("Clearing chat history")
        self.chat_history.clear()
        return "Chat history cleared"

    def _render_message(self, message: str):
        """Render a chat message."""
        st.write(message)
    def process_user_input(self, user_input: str):
        """Process user input."""
        self.logger.info("Processing user input")
        # Guard against empty input, which would make split()[0] raise.
        if not user_input.strip():
            return
        command = user_input.strip().split()[0]
        handler = self.command_handlers.get(command)
        if handler:
            result = handler({"input": user_input})
        else:
            result = "Unknown command. Type /help for a list of commands."
        self.chat_history.append(f"User: {user_input}")
        self.chat_history.append(f"System: {result}")
class StreamlitInterface:
    """Handles the Streamlit user interface for the Autonomous Agent System."""
    def __init__(self, agent):
        """Initialize the Streamlit interface."""
        self.agent = agent
        self.metrics_history = []
        self.activity_log = []

    def render_main_interface(self):
        """Render the main Streamlit interface."""
        st.title("Autonomous Agent System")
        # Initialize session state if it does not exist yet
        if 'current_page' not in st.session_state:
            st.session_state.current_page = 'Dashboard'
        # Sidebar for navigation
        self.render_sidebar()
        # Main content area
        selected_page = st.session_state.current_page
        if selected_page == 'Dashboard':
            self.render_dashboard()
        elif selected_page == 'Pipeline':
            self.render_pipeline_view()
        elif selected_page == 'Metrics':
            self.render_metrics_view()
        elif selected_page == 'Settings':
            self.render_settings()

    def render_sidebar(self):
        """Render the sidebar navigation."""
        with st.sidebar:
            st.title("Navigation")
            pages = ['Dashboard', 'Pipeline', 'Metrics', 'Settings']
            selected_page = st.radio("Select Page", pages, key='nav_radio')
            st.session_state.current_page = selected_page

    def render_dashboard(self):
        """Render the dashboard view."""
        st.header("System Dashboard")
        # System Status
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Pipeline Status", "Active")
        with col2:
            current_stage = getattr(self.agent.pipeline, 'current_stage', None)
            stage_name = current_stage.name if current_stage else "None"
            st.metric("Current Stage", stage_name)
        with col3:
            st.metric("Tasks Completed", "0")
        # Recent Activity
        st.subheader("Recent Activity")
        activity_data = self.get_recent_activity()
        if activity_data:
            st.table(activity_data)
        else:
            st.info("No recent activity")
    def render_pipeline_view(self):
        """Render the pipeline view."""
        st.header("Development Pipeline")
        # Pipeline Stages
        stages = {
            'PLANNING': "Planning",
            'DEVELOPMENT': "Development",
            'TESTING': "Testing"
        }
        for stage_key, stage_name in stages.items():
            with st.expander(f"{stage_name} Stage"):
                current_stage = getattr(self.agent.pipeline, 'current_stage', None)
                if current_stage and stage_key == current_stage.name:
                    st.info("Current Stage")
                self.render_stage_details(stage_key)

    def render_metrics_view(self):
        """Render the metrics view."""
        st.header("System Metrics")
        # Code Quality Metrics
        st.subheader("Code Quality Metrics")
        metrics = self.get_metrics()
        if metrics:
            for file, file_metrics in metrics.items():
                with st.expander(f"Metrics for {file}"):
                    st.json(file_metrics)
        else:
            st.info("No metrics available yet")

    def render_settings(self):
        """Render the settings view."""
        st.header("System Settings")
        # General Settings
        st.subheader("General Settings")
        workspace_dir = st.text_input(
            "Workspace Directory",
            value=self.agent.workspace_manager.workspace_dir
        )
        # Pipeline Settings
        st.subheader("Pipeline Settings")
        quality_threshold = st.slider(
            "Quality Threshold",
            min_value=0.0,
            max_value=1.0,
            value=0.8,
            step=0.1
        )
        if st.button("Save Settings"):
            self.save_settings(workspace_dir, quality_threshold)
    def get_recent_activity(self) -> List[Dict]:
        """Get recent activity data for display."""
        return [
            {
                "timestamp": activity["timestamp"].strftime("%Y-%m-%d %H:%M:%S"),
                "action": activity["action"],
                "status": activity["status"],
                "details": activity["details"]
            }
            for activity in self.activity_log[-10:]  # Show the last 10 activities
        ]

    def get_metrics(self) -> Dict:
        """Get current metrics data."""
        return getattr(self.agent.pipeline, 'metrics', {})
    def render_stage_details(self, stage: str):
        """Render details for a specific pipeline stage."""
        # Stage results live in the pipeline's stage_results dict, keyed by
        # lowercase stage name (see DevelopmentPipeline.execute_stage).
        stage_results = self.agent.pipeline.stage_results.get(stage.lower(), {})
        if not stage_results:
            st.write("No data available for this stage yet.")
            return
        if stage == 'PLANNING':
            self._render_planning_details(stage_results)
        elif stage == 'DEVELOPMENT':
            self._render_development_details(stage_results)
        elif stage == 'TESTING':
            self._render_testing_details(stage_results)
    def _render_planning_details(self, results: Dict):
        """Render planning stage details."""
        st.subheader("Tasks")
        tasks = results.get('tasks', [])
        for task in tasks:
            st.write(f"- {task}")

    def _render_development_details(self, results: Dict):
        """Render development stage details."""
        st.subheader("Development Progress")
        progress = results.get('progress', 0)
        st.progress(progress)
        files = results.get('files', [])
        if files:
            st.subheader("Modified Files")
            for file in files:
                st.write(f"- {file}")

    def _render_testing_details(self, results: Dict):
        """Render testing stage details."""
        st.subheader("Test Results")
        tests = results.get('tests', {})
        if tests:
            passed = tests.get('passed', 0)
            failed = tests.get('failed', 0)
            st.write(f"Passed: {passed}")
            st.write(f"Failed: {failed}")
        else:
            st.write("No test results available")

    def save_settings(self, workspace_dir: str, quality_threshold: float):
        """Save the system settings."""
        try:
            self.agent.workspace_manager.workspace_dir = workspace_dir
            # Add more settings as needed
            self.log_activity({
                "timestamp": datetime.now(),
                "action": "Settings Update",
                "status": "Success",
                "details": "Settings updated successfully"
            })
            st.success("Settings saved successfully!")
        except Exception as e:
            st.error(f"Error saving settings: {str(e)}")

    def log_activity(self, activity: Dict):
        """Log an activity in the system."""
        self.activity_log.append(activity)
        if len(self.activity_log) > 100:  # Keep only the last 100 activities
            self.activity_log = self.activity_log[-100:]

def main():
    autonomous_agent_app = AutonomousAgentApp()
    autonomous_agent_app.run()

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        st.error(f"Application Error: {str(e)}")
        logging.error(f"Application Error: {str(e)}", exc_info=True)