CSVBot-DeepSeek / app.py
hmrizal's picture
first-deepseek-coder-1.3b-instruct
afb99d9 verified
import gradio as gr
import os
import uuid
import threading
import pandas as pd
import torch
from langchain.document_loaders import CSVLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.llms import HuggingFacePipeline
from langchain.chains import LLMChain
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from langchain.prompts import PromptTemplate
# Global model cache
MODEL_CACHE = {
"model": None,
"tokenizer": None,
"init_lock": threading.Lock()
}
# Create directories for user data
os.makedirs("user_data", exist_ok=True)
def initialize_model_once():
"""Initialize the model once and cache it"""
with MODEL_CACHE["init_lock"]:
if MODEL_CACHE["model"] is None:
# Use a smaller model for CPU environment
model_name = "deepseek-ai/deepseek-coder-1.3b-instruct"
# Load tokenizer and model with CPU-friendly configuration
MODEL_CACHE["tokenizer"] = AutoTokenizer.from_pretrained(model_name)
MODEL_CACHE["model"] = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float32, # Use float32 for CPU
device_map="auto",
low_cpu_mem_usage=True, # Optimize for low memory
trust_remote_code=True
)
return MODEL_CACHE["tokenizer"], MODEL_CACHE["model"]
def create_llm_pipeline():
"""Create a new pipeline using the cached model"""
tokenizer, model = initialize_model_once()
# Create a CPU-friendly pipeline
pipe = pipeline(
"text-generation",
model=model,
tokenizer=tokenizer,
max_new_tokens=256, # Reduced for faster responses
temperature=0.3,
top_p=0.9,
top_k=30,
repetition_penalty=1.2,
return_full_text=False,
)
# Wrap pipeline in HuggingFacePipeline for LangChain compatibility
return HuggingFacePipeline(pipeline=pipe)
def create_conversational_chain(db, file_path):
llm = create_llm_pipeline()
# Load the file into pandas to enable code execution for data analysis
df = pd.read_csv(file_path)
# Create improved prompt template that focuses on direct answers, not code
template = """
Berikut ini adalah informasi tentang file CSV:
Kolom-kolom dalam file: {columns}
Beberapa baris pertama:
{sample_data}
Konteks tambahan dari vector database:
{context}
Pertanyaan: {question}
INSTRUKSI PENTING:
1. Jangan tampilkan kode Python, berikan jawaban langsung dalam Bahasa Indonesia.
2. Jika pertanyaan terkait statistik data (rata-rata, maksimum dll), lakukan perhitungan dan berikan hasilnya.
3. Jawaban harus singkat, jelas dan akurat berdasarkan data yang ada.
4. Gunakan format yang sesuai untuk angka (desimal 2 digit untuk nilai non-integer).
5. Jangan menyebutkan proses perhitungan, fokus pada hasil akhir.
Jawaban:
"""
PROMPT = PromptTemplate(
template=template,
input_variables=["columns", "sample_data", "context", "question"]
)
# Create retriever
retriever = db.as_retriever(search_kwargs={"k": 3}) # Reduced k for better performance
# Process query with better error handling
def process_query(query, chat_history):
try:
# Get information from dataframe for context
columns_str = ", ".join(df.columns.tolist())
sample_data = df.head(2).to_string() # Reduced to 2 rows for performance
# Get context from vector database
docs = retriever.get_relevant_documents(query)
context = "\n\n".join([doc.page_content for doc in docs])
# Dynamically calculate answers for common statistical queries
def preprocess_query():
query_lower = query.lower()
result = None
# Handle statistical queries directly
if "rata-rata" in query_lower or "mean" in query_lower or "average" in query_lower:
for col in df.columns:
if col.lower() in query_lower and pd.api.types.is_numeric_dtype(df[col]):
try:
result = f"Rata-rata {col} adalah {df[col].mean():.2f}"
except:
pass
elif "maksimum" in query_lower or "max" in query_lower or "tertinggi" in query_lower:
for col in df.columns:
if col.lower() in query_lower and pd.api.types.is_numeric_dtype(df[col]):
try:
result = f"Nilai maksimum {col} adalah {df[col].max():.2f}"
except:
pass
elif "minimum" in query_lower or "min" in query_lower or "terendah" in query_lower:
for col in df.columns:
if col.lower() in query_lower and pd.api.types.is_numeric_dtype(df[col]):
try:
result = f"Nilai minimum {col} adalah {df[col].min():.2f}"
except:
pass
elif "total" in query_lower or "jumlah" in query_lower or "sum" in query_lower:
for col in df.columns:
if col.lower() in query_lower and pd.api.types.is_numeric_dtype(df[col]):
try:
result = f"Total {col} adalah {df[col].sum():.2f}"
except:
pass
elif "baris" in query_lower or "jumlah data" in query_lower or "row" in query_lower:
result = f"Jumlah baris data adalah {len(df)}"
elif "kolom" in query_lower or "field" in query_lower:
if "nama" in query_lower or "list" in query_lower or "sebutkan" in query_lower:
result = f"Kolom dalam data: {', '.join(df.columns.tolist())}"
return result
# Try direct calculation first
direct_answer = preprocess_query()
if direct_answer:
return {"answer": direct_answer}
# If no direct calculation, use the LLM
chain = LLMChain(llm=llm, prompt=PROMPT)
raw_result = chain.run(
columns=columns_str,
sample_data=sample_data,
context=context,
question=query
)
# Clean the result
cleaned_result = raw_result.strip()
# If result is empty after cleaning, use a fallback
if not cleaned_result:
return {"answer": "Tidak dapat memproses jawaban. Silakan coba pertanyaan lain."}
return {"answer": cleaned_result}
except Exception as e:
import traceback
print(f"Error in process_query: {str(e)}")
print(traceback.format_exc())
return {"answer": f"Terjadi kesalahan saat memproses pertanyaan: {str(e)}"}
return process_query
class ChatBot:
def __init__(self, session_id):
self.session_id = session_id
self.chat_history = []
self.chain = None
self.user_dir = f"user_data/{session_id}"
self.csv_file_path = None
os.makedirs(self.user_dir, exist_ok=True)
def process_file(self, file):
if file is None:
return "Mohon upload file CSV terlebih dahulu."
try:
# Handle file from Gradio
file_path = file.name if hasattr(file, 'name') else str(file)
self.csv_file_path = file_path
# Copy to user directory
user_file_path = f"{self.user_dir}/uploaded.csv"
# Verify the CSV can be loaded
try:
df = pd.read_csv(file_path)
print(f"CSV verified: {df.shape[0]} rows, {len(df.columns)} columns")
# Save a copy in user directory
df.to_csv(user_file_path, index=False)
self.csv_file_path = user_file_path
except Exception as e:
return f"Error membaca CSV: {str(e)}"
# Load document with reduced chunk size for better memory usage
try:
loader = CSVLoader(file_path=file_path, encoding="utf-8", csv_args={
'delimiter': ','})
data = loader.load()
print(f"Documents loaded: {len(data)}")
except Exception as e:
return f"Error loading documents: {str(e)}"
# Create vector database with optimized settings
try:
db_path = f"{self.user_dir}/db_faiss"
# Use CPU-friendly embeddings with smaller dimensions
embeddings = HuggingFaceEmbeddings(
model_name='sentence-transformers/all-MiniLM-L6-v2',
model_kwargs={'device': 'cpu'}
)
db = FAISS.from_documents(data, embeddings)
db.save_local(db_path)
print(f"Vector database created at {db_path}")
except Exception as e:
return f"Error creating vector database: {str(e)}"
# Create custom chain
try:
self.chain = create_conversational_chain(db, self.csv_file_path)
print("Chain created successfully")
except Exception as e:
return f"Error creating chain: {str(e)}"
# Add basic file info to chat history for context
file_info = f"CSV berhasil dimuat dengan {df.shape[0]} baris dan {len(df.columns)} kolom. Kolom: {', '.join(df.columns.tolist())}"
self.chat_history.append(("System", file_info))
return "File CSV berhasil diproses! Anda dapat mulai chat dengan model untuk analisis data."
except Exception as e:
import traceback
print(traceback.format_exc())
return f"Error pemrosesan file: {str(e)}"
def chat(self, message, history):
if self.chain is None:
return "Mohon upload file CSV terlebih dahulu."
try:
# Process the question with the chain
result = self.chain(message, self.chat_history)
# Get the answer with fallback
answer = result.get("answer", "Maaf, tidak dapat menghasilkan jawaban. Silakan coba pertanyaan lain.")
# Ensure we never return empty
if not answer or answer.strip() == "":
answer = "Maaf, tidak dapat menghasilkan jawaban yang sesuai. Silakan coba pertanyaan lain."
# Update internal chat history
self.chat_history.append((message, answer))
# Return just the answer for Gradio
return answer
except Exception as e:
import traceback
print(traceback.format_exc())
return f"Error: {str(e)}"
# UI Code
def create_gradio_interface():
with gr.Blocks(title="Chat with CSV using DeepSeek") as interface:
session_id = gr.State(lambda: str(uuid.uuid4()))
chatbot_state = gr.State(lambda: None)
gr.HTML("<h1 style='text-align: center;'>Chat with CSV using DeepSeek</h1>")
gr.HTML("<h3 style='text-align: center;'>Asisten analisis CSV untuk berbagai kebutuhan</h3>")
with gr.Row():
with gr.Column(scale=1):
file_input = gr.File(
label="Upload CSV Anda",
file_types=[".csv"]
)
process_button = gr.Button("Proses CSV")
with gr.Accordion("Informasi Model", open=False):
gr.Markdown("""
**Fitur**:
- Tanya jawab berbasis data
- Analisis statistik otomatis
- Support berbagai format CSV
- Manajemen sesi per pengguna
""")
with gr.Column(scale=2):
chatbot_interface = gr.Chatbot(
label="Riwayat Chat",
height=400
)
message_input = gr.Textbox(
label="Ketik pesan Anda",
placeholder="Tanyakan tentang data CSV Anda...",
lines=2
)
submit_button = gr.Button("Kirim")
clear_button = gr.Button("Bersihkan Chat")
# Process file handler
def handle_process_file(file, sess_id):
chatbot = ChatBot(sess_id)
result = chatbot.process_file(file)
return chatbot, [(None, result)]
process_button.click(
fn=handle_process_file,
inputs=[file_input, session_id],
outputs=[chatbot_state, chatbot_interface]
)
# Chat handlers
def user_message_submitted(message, history, chatbot, sess_id):
history = history + [(message, None)]
return history, "", chatbot, sess_id
def bot_response(history, chatbot, sess_id):
if chatbot is None:
chatbot = ChatBot(sess_id)
history[-1] = (history[-1][0], "Mohon upload file CSV terlebih dahulu.")
return chatbot, history
user_message = history[-1][0]
response = chatbot.chat(user_message, history[:-1])
history[-1] = (user_message, response)
return chatbot, history
submit_button.click(
fn=user_message_submitted,
inputs=[message_input, chatbot_interface, chatbot_state, session_id],
outputs=[chatbot_interface, message_input, chatbot_state, session_id]
).then(
fn=bot_response,
inputs=[chatbot_interface, chatbot_state, session_id],
outputs=[chatbot_state, chatbot_interface]
)
message_input.submit(
fn=user_message_submitted,
inputs=[message_input, chatbot_interface, chatbot_state, session_id],
outputs=[chatbot_interface, message_input, chatbot_state, session_id]
).then(
fn=bot_response,
inputs=[chatbot_interface, chatbot_state, session_id],
outputs=[chatbot_state, chatbot_interface]
)
# Clear chat handler
def handle_clear_chat(chatbot):
if chatbot is not None:
chatbot.chat_history = []
return chatbot, []
clear_button.click(
fn=handle_clear_chat,
inputs=[chatbot_state],
outputs=[chatbot_state, chatbot_interface]
)
return interface
# Launch the interface
demo = create_gradio_interface()
demo.launch(share=True)