"""Gradio app that answers natural-language questions about an uploaded CSV.

A question is either answered directly from cached CSV metadata or turned
into pandas code by the Phi-4-mini instruct model; that code is then executed
against the uploaded DataFrame and the result is shown in the chat.
"""

import ast
import os
import re
import threading
import traceback
import uuid

import gradio as gr
import numpy as np
import pandas as pd
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

# Global model cache; "init_lock" serializes the one-time model load.
MODEL_CACHE = {
    "model": None,
    "tokenizer": None,
    "init_lock": threading.Lock(),
}

# Create directories for user data
os.makedirs("user_data", exist_ok=True)

MODEL_NAME = "microsoft/Phi-4-mini-instruct"

# Kept at column 0 so Markdown never sees the list as an indented code block.
EXAMPLE_QUESTIONS_MD = """
- "Berapa jumlah data yang memiliki nilai Glucose di atas 150?"
- "Hitung nilai rata-rata setiap kolom numerik"
- "Berapa banyak data untuk setiap kelompok dalam kolom Outcome?"
- "Berapa jumlah baris dalam dataset ini?"
- "Berapa jumlah kolom dalam dataset ini?"
"""


def initialize_model_once():
    """Load the Phi-4-mini model and tokenizer on first use.

    The load happens under ``MODEL_CACHE["init_lock"]`` so concurrent callers
    do not load the model twice.

    Returns:
        The cached ``(model, tokenizer)`` pair.
    """
    with MODEL_CACHE["init_lock"]:
        if MODEL_CACHE["model"] is None:
            MODEL_CACHE["tokenizer"] = AutoTokenizer.from_pretrained(MODEL_NAME)
            MODEL_CACHE["model"] = AutoModelForCausalLM.from_pretrained(
                MODEL_NAME,
                torch_dtype=torch.float16,
                device_map="auto",
            )
    return MODEL_CACHE["model"], MODEL_CACHE["tokenizer"]


def _extract_code(generated_text):
    """Pull the Python code out of a model completion.

    Two shapes are handled:
    * the completion contains its own complete "```python ... ```" block;
    * the completion is the continuation of a fence the prompt already
      opened, i.e. code followed by a closing "```" (and possibly chatter).

    Returns:
        The extracted code, stripped; an empty string if nothing usable
        was generated.
    """
    fenced = re.search(r"```python\s*(.*?)\s*```", generated_text, re.DOTALL)
    if fenced:
        return fenced.group(1).strip()
    # No complete fenced block: take the first non-empty piece between fence
    # markers so a trailing "```" never reaches exec().
    for segment in generated_text.split("```"):
        if segment.strip():
            return segment.strip()
    return ""


def generate_pandas_code(prompt, max_new_tokens=512):
    """Generate Python code for ``prompt`` using the Phi-4-mini model.

    Args:
        prompt: Full text prompt sent to the model.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The generated code as a string (may be empty).
    """
    model, tokenizer = initialize_model_once()
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=True,
            temperature=0.2,
            top_p=0.9,
        )
    # Drop the prompt by token count; slicing the decoded string by character
    # length breaks whenever decoding does not round-trip the prompt exactly.
    prompt_length = inputs.input_ids.shape[1]
    generated_text = tokenizer.decode(
        outputs[0][prompt_length:], skip_special_tokens=True
    )
    return _extract_code(generated_text)


def _ensure_result_assignment(code):
    """Make sure ``code`` stores its answer in a variable named ``result``.

    If the code never assigns ``result`` and its last statement is a bare
    expression, that expression is rewritten as ``result = <expr>``. Code that
    cannot be parsed, or that ends in a non-expression statement, is returned
    unchanged.
    """
    try:
        tree = ast.parse(code)
    except SyntaxError:
        return code
    assigns_result = any(
        isinstance(node, ast.Name)
        and node.id == "result"
        and isinstance(node.ctx, ast.Store)
        for node in ast.walk(tree)
    )
    if assigns_result or not tree.body:
        return code
    last_stmt = tree.body[-1]
    # Only rewrite an expression that starts its own line; col_offset is a
    # byte offset, so anything else is left alone rather than risk a bad cut.
    if isinstance(last_stmt, ast.Expr) and last_stmt.col_offset == 0:
        lines = code.splitlines()
        index = last_stmt.lineno - 1
        lines[index] = "result = " + lines[index]
        return "\n".join(lines)
    return code


def _format_result(result):
    """Render an execution result as text, truncating long tabular output."""
    if isinstance(result, pd.DataFrame):
        if len(result) > 5:
            return result.head(5).to_string() + f"\n\n[Total {len(result)} baris]"
        return result.to_string()
    if isinstance(result, (pd.Series, np.ndarray)):
        if len(result) > 10:
            return str(result[:10]) + f"\n\n[Total {len(result)} item]"
        return str(result)
    if hasattr(result, "__len__") and not isinstance(result, (str, int, float)):
        result_str = str(result)
        if len(result) > 0:
            result_str += f"\n\n[Total {len(result)} item]"
        return result_str
    return str(result)


class ChatBot:
    """Per-session state: the uploaded DataFrame, its metadata and chat log."""

    def __init__(self, session_id):
        """Create the session and its ``user_data/<session_id>`` directory."""
        self.session_id = session_id
        self.csv_info = None
        self.df = None
        self.chat_history = []
        self.user_dir = f"user_data/{session_id}"
        os.makedirs(self.user_dir, exist_ok=True)

    def process_file(self, file):
        """Load an uploaded CSV into the session.

        Args:
            file: Object with a ``name`` attribute holding the path, or a
                value whose ``str()`` is the path.

        Returns:
            A user-facing status message (success or error text).
        """
        if file is None:
            return "Mohon upload file CSV terlebih dahulu."

        try:
            # Handle file from Gradio
            file_path = file.name if hasattr(file, "name") else str(file)
            file_name = os.path.basename(file_path)

            # Load and save CSV directly with pandas
            try:
                df = pd.read_csv(file_path)
                user_file_path = f"{self.user_dir}/uploaded.csv"
                df.to_csv(user_file_path, index=False)
            except Exception as e:
                return f"Error membaca CSV: {str(e)}"

            # Commit state only after both steps succeeded, so df and
            # csv_info can never disagree.
            self.df = df
            self.csv_info = {
                "filename": file_name,
                "rows": df.shape[0],
                "columns": df.shape[1],
                "column_names": df.columns.tolist(),
            }
            print(f"CSV verified: {df.shape[0]} rows, {len(df.columns)} columns")

            # Add file info to chat history
            file_info = (
                f"CSV berhasil dimuat: {file_name} dengan {self.df.shape[0]} baris "
                f"dan {len(self.df.columns)} kolom. "
                f"Kolom: {', '.join(self.df.columns.tolist())}"
            )
            self.chat_history.append(("System", file_info))

            return (
                f"File CSV '{file_name}' berhasil diproses! "
                "Anda dapat mulai mengajukan pertanyaan tentang data."
            )
        except Exception as e:
            print(traceback.format_exc())
            return f"Error pemrosesan file: {str(e)}"

    def execute_query(self, code):
        """Execute generated pandas code against the session DataFrame.

        SECURITY NOTE(review): this runs model-generated code with ``exec``
        and full builtins. There is no sandbox and no timeout; do not expose
        this to untrusted users without isolating the execution.

        Args:
            code: Python source that may use ``df``, ``pd`` and ``np``.

        Returns:
            The value of ``result`` if the code set it; otherwise the last
            non-reserved variable the code created (if that is not ``None``);
            otherwise the session DataFrame.

        Raises:
            RuntimeError: If the code raises while executing.
        """
        # One namespace for globals and locals: with two separate dicts the
        # code runs as if in a class body, and nested scopes (lambdas,
        # functions, comprehensions before 3.12) cannot see ``df``.
        namespace = {"df": self.df, "pd": pd, "np": np}
        try:
            exec(code, namespace)
        except Exception as e:
            raise RuntimeError(f"Gagal menjalankan kode: {str(e)}") from e

        if "result" in namespace:
            return namespace["result"]

        # If no result variable, fall back to the last variable created.
        reserved = {"df", "pd", "np", "__builtins__"}
        last_var = None
        for var_name, var_value in namespace.items():
            if var_name not in reserved:
                last_var = var_value
        if last_var is not None:
            return last_var
        return self.df  # Return the dataframe as default

    def _answer_metadata_question(self, message_lower):
        """Answer simple metadata questions without the model.

        Returns:
            The answer text, or ``None`` if the message is not one of the
            recognised metadata questions.
        """
        if "nama file" in message_lower:
            return f"Nama file CSV adalah: {self.csv_info['filename']}"
        if "nama kolom" in message_lower:
            return f"Kolom dalam CSV: {', '.join(self.csv_info['column_names'])}"
        if "jumlah baris" in message_lower or "berapa baris" in message_lower:
            return f"Jumlah baris dalam CSV: {self.csv_info['rows']}"
        if "jumlah kolom" in message_lower or "berapa kolom" in message_lower:
            return f"Jumlah kolom dalam CSV: {self.csv_info['columns']}"
        return None

    def _build_prompt(self, message):
        """Build the model prompt: schema, a 5-row sample and the question."""
        sample_str = self.df.head(5).to_string()
        data_types = {col: str(dtype) for col, dtype in self.df.dtypes.items()}
        lines = [
            "",
            "You are a data analyst that translates natural language questions "
            "into Python pandas code.",
            "",
            "DataFrame information:",
            f"- Column names: {', '.join(self.csv_info['column_names'])}",
            f"- Data types: {data_types}",
            f"- Number of rows: {self.csv_info['rows']}",
            "- Sample data:",
            f"{sample_str}",
            "",
            f"User question: {message}",
            "",
            "Write a short Python code using pandas to answer the user's question.",
            "The code must use the 'df' variable as the DataFrame name.",
            "The code should assign the final result to a variable named 'result'.",
            "Only return the Python code without any explanation.",
            "",
            # The fence is left open so the model continues with code.
            "```python",
            "",
        ]
        return "\n".join(lines)

    def chat(self, message, history):
        """Answer one user message.

        Args:
            message: The user's question.
            history: Prior chat turns; accepted for interface compatibility
                and not used by this method.

        Returns:
            The response text shown to the user (answer or error message).
        """
        if self.df is None:
            return "Mohon upload file CSV terlebih dahulu."

        try:
            # Handle common metadata questions directly to save resources
            message_lower = message.lower()
            metadata_answer = self._answer_metadata_question(message_lower)
            if metadata_answer is not None:
                return metadata_answer

            prompt = self._build_prompt(message)

            # Generate code with Phi-4
            try:
                code = generate_pandas_code(prompt)
                if not code:
                    raise ValueError("model returned no code")
                code = _ensure_result_assignment(code)
            except Exception as e:
                print(f"Error generating code: {str(e)}")
                # Fallback for basic questions
                if "rata-rata" in message_lower or "mean" in message_lower:
                    code = "result = df.describe()"
                elif "jumlah" in message_lower or "count" in message_lower:
                    code = "result = df.count()"
                else:
                    return (
                        "Maaf, saya tidak dapat menghasilkan kode untuk "
                        f"pertanyaan ini. Error: {str(e)}"
                    )

            # Execute the code and get result
            try:
                print(f"Executing code: {code}")
                result = self.execute_query(code)

                # Check if result is relevant to the question
                if result is None or (
                    isinstance(result, pd.DataFrame) and result.empty
                ):
                    return (
                        "Maaf, kita tidak bisa mendapatkan informasi terkait "
                        "pertanyaan anda di dalam file CSV anda."
                    )

                result_str = _format_result(result)
                response = (
                    f"Hasil analisis:\n\n{result_str}\n\n"
                    f"Kode yang dijalankan:\n```python\n{code}\n```"
                )
                self.chat_history.append((message, response))
                return response
            except Exception as e:
                return (
                    f"Error saat menganalisis data: {str(e)}\n\n"
                    f"Kode yang dicoba:\n```python\n{code}\n```"
                )
        except Exception as e:
            print(traceback.format_exc())
            return f"Error: {str(e)}"


# UI code
def create_gradio_interface():
    """Build the Gradio Blocks UI and wire up its event handlers.

    Returns:
        The constructed ``gr.Blocks`` interface (not yet launched).
    """
    with gr.Blocks(title="CSV Data Analyzer") as interface:
        session_id = gr.State(lambda: str(uuid.uuid4()))
        chatbot_state = gr.State(lambda: None)

        # NOTE(review): the heading markup was lost in the stored source;
        # the <h1>/<h3> tags are a reconstruction around the surviving text.
        gr.HTML("<h1>CSV Data Analyzer</h1>")
        gr.HTML("<h3>Ajukan pertanyaan tentang data CSV Anda</h3>")

        with gr.Row():
            with gr.Column(scale=1):
                file_input = gr.File(
                    label="Upload CSV Anda",
                    file_types=[".csv"],
                )
                process_button = gr.Button("Proses CSV")

                with gr.Accordion("Contoh Pertanyaan", open=False):
                    gr.Markdown(EXAMPLE_QUESTIONS_MD)

            with gr.Column(scale=2):
                chatbot_interface = gr.Chatbot(
                    label="Riwayat Chat",
                    height=400,
                )
                message_input = gr.Textbox(
                    label="Ketik pertanyaan Anda",
                    placeholder=(
                        "Contoh: Berapa jumlah data yang memiliki nilai "
                        "Glucose di atas 150?"
                    ),
                    lines=2,
                )
                submit_button = gr.Button("Kirim")
                clear_button = gr.Button("Bersihkan Chat")

        # Handler functions
        def handle_process_file(file, sess_id):
            """Start a fresh ChatBot for the session and load the CSV."""
            chatbot = ChatBot(sess_id)
            result = chatbot.process_file(file)
            return chatbot, [(None, result)]

        def user_message_submitted(message, history, chatbot, sess_id):
            """Append the pending user turn and clear the input box."""
            history = history + [(message, None)]
            return history, "", chatbot, sess_id

        def bot_response(history, chatbot, sess_id):
            """Fill in the bot's reply for the last (pending) turn."""
            if chatbot is None:
                chatbot = ChatBot(sess_id)
                history[-1] = (
                    history[-1][0],
                    "Mohon upload file CSV terlebih dahulu.",
                )
                return chatbot, history

            user_message = history[-1][0]
            response = chatbot.chat(user_message, history[:-1])
            history[-1] = (user_message, response)
            return chatbot, history

        def handle_clear_chat(chatbot):
            """Reset both the stored and the displayed chat history."""
            if chatbot is not None:
                chatbot.chat_history = []
            return chatbot, []

        process_button.click(
            fn=handle_process_file,
            inputs=[file_input, session_id],
            outputs=[chatbot_state, chatbot_interface],
        )

        # The button click and pressing Enter in the textbox run the same
        # two-step chain: record the message, then produce the reply.
        for register in (submit_button.click, message_input.submit):
            register(
                fn=user_message_submitted,
                inputs=[message_input, chatbot_interface, chatbot_state, session_id],
                outputs=[chatbot_interface, message_input, chatbot_state, session_id],
            ).then(
                fn=bot_response,
                inputs=[chatbot_interface, chatbot_state, session_id],
                outputs=[chatbot_state, chatbot_interface],
            )

        clear_button.click(
            fn=handle_clear_chat,
            inputs=[chatbot_state],
            outputs=[chatbot_state, chatbot_interface],
        )

    return interface


# Launch the interface
if __name__ == "__main__":
    demo = create_gradio_interface()
    demo.launch(share=True)