AI-server / app.py
chitkenkhoi's picture
first
7936e19
raw
history blame
9.19 kB
import os
import torch
import numpy as np
import pandas as pd
from sentence_transformers import util, SentenceTransformer
import redis
import json
from typing import Dict, List
import google.generativeai as genai
from flask import Flask, request, jsonify, Response
import requests
from io import StringIO
# Initialize Flask app
app = Flask(__name__)
# Redis configuration
r = redis.Redis(
host='redis-12878.c1.ap-southeast-1-1.ec2.redns.redis-cloud.com',
port=12878,
db=0,
password="qKl6znBvULaveJhkjIjMr7RCwluJjjbH",
decode_responses=True
)
# Device configuration
device = "cuda" if torch.cuda.is_available() else "cpu"
# Load CSV from Google Drive
def load_csv_from_drive():
file_id = "1vU23pGS-kkpkUFNDl8BmuBUc2Am0966p"
url = f"https://drive.google.com/uc?id={file_id}"
response = requests.get(url)
csv_content = StringIO(response.text)
df = pd.read_csv(csv_content)[['text', 'embeddings']]
# Process embeddings
df["embeddings"] = df["embeddings"].apply(
lambda x: np.fromstring(x.strip("[]"), sep=",", dtype=np.float32)
)
return df
# Load data and initialize models
text_chunks_and_embedding_df = load_csv_from_drive()
pages_and_chunks = text_chunks_and_embedding_df.to_dict(orient="records")
embeddings = torch.tensor(
np.vstack(text_chunks_and_embedding_df["embeddings"].values),
dtype=torch.float32
).to(device)
# Initialize embedding model
embedding_model = SentenceTransformer(
model_name_or_path="keepitreal/vietnamese-sbert",
device=device
)
def store_conversation(conversation_id: str, q: str, a: str) -> None:
conversation_element = {
'q': q,
'a': a,
}
conversation_json = json.dumps(conversation_element)
r.lpush(f'conversation_{conversation_id}', conversation_json)
current_length = r.llen(f'conversation_{conversation_id}')
if current_length > 2:
r.rpop(f'conversation_{conversation_id}')
def retrieve_conversation(conversation_id):
conversation = r.lrange(f'conversation_{conversation_id}', 0, -1)
return [json.loads(c) for c in conversation]
def combine_vectors_method2(vector_weight_pairs):
weight_norm = np.sqrt(sum(weight**2 for _, weight in vector_weight_pairs))
combined_vector = np.zeros_like(vector_weight_pairs[0][0])
for vector, weight in vector_weight_pairs:
normalized_weight = weight / weight_norm
combined_vector += vector * normalized_weight
return combined_vector
def get_weighted_query(current_question: str, parsed_conversation: List[Dict]) -> np.ndarray:
current_vector = embedding_model.encode(current_question)
weighted_parts = [(current_vector, 1.0)]
if parsed_conversation:
context_string = " ".join(
f"{chat['q']} {chat['a']}" for chat in parsed_conversation
)
context_vector = embedding_model.encode(context_string)
similarity = util.pytorch_cos_sim(current_vector, context_vector)[0][0].item()
weight = 1.0 if similarity > 0.4 else 0.5
weighted_parts.append((context_vector, weight))
weighted_query_vector = combine_vectors_method2(weighted_parts)
weighted_query_vector = torch.from_numpy(weighted_query_vector).to(torch.float32)
# Normalize vector
norm = torch.norm(weighted_query_vector)
weighted_query_vector = weighted_query_vector / norm if norm > 0 else weighted_query_vector
return weighted_query_vector.numpy()
def retrieve_relevant_resources(query_vector, embeddings, similarity_threshold=0.5, n_resources_to_return=10):
query_embedding = torch.from_numpy(query_vector).to(torch.float32)
if len(query_embedding.shape) == 1:
query_embedding = query_embedding.unsqueeze(0)
query_embedding = query_embedding.cuda()
if embeddings.shape[1] != query_embedding.shape[1]:
query_embedding = torch.nn.functional.pad(
query_embedding,
(0, embeddings.shape[1] - query_embedding.shape[1])
)
query_embedding = torch.nn.functional.normalize(query_embedding, p=2, dim=1)
embeddings_normalized = torch.nn.functional.normalize(embeddings, p=2, dim=1)
cosine_scores = torch.matmul(query_embedding, embeddings_normalized.t())[0]
mask = cosine_scores >= similarity_threshold
filtered_scores = cosine_scores[mask]
filtered_indices = mask.nonzero().squeeze()
if len(filtered_scores) == 0:
return torch.tensor([]), torch.tensor([])
k = min(n_resources_to_return, len(filtered_scores))
scores, indices = torch.topk(filtered_scores, k=k)
final_indices = filtered_indices[indices]
return scores, final_indices
def prompt_formatter(query: str, context_items: List[Dict], history: List[Dict] = None, isFirst = False) -> str:
context = "- " + "\n- ".join([item["text"] for item in context_items])
history_str = ""
if history:
history_str = "\nLịch sử hội thoại:\n"
for qa in history:
history_str += f"Câu hỏi: {qa['q']}\n"
history_str += f"Trả lời: {qa['a']}\n"
if isFirst:
example = """
Đồng thời hãy thêm vào một dòng vào cuối câu trả lời của bạn, dòng đó sẽ là dòng nói về chủ đề mà người dùng đang hỏi.
Chủ đề nên càng ngắn gọn càng tốt (tối đa 7 từ).
Ví dụ:
Câu hỏi: "Trường đại học bách khoa thành lập vào năm nào?"
Ngữ cảnh có đề cập về trường đại học bách khoa thành lập vào năm 1957.
Trả lời: "Trường đại học bách khoa thành lập vào năm 1957. \n Chủ đề-123: Trường đại học Bách khoa"
"""
else:
example = """
Ví dụ:
Câu hỏi: "Trường đại học bách khoa thành lập vào năm nào?"
Ngữ cảnh có đề cập về trường đại học bách khoa thành lập vào năm 1957.
Trả lời: "Trường đại học bách khoa thành lập vào năm 1957."
"""
base_prompt = """Dựa trên các thông tin ngữ cảnh sau đây, hãy trả lời câu hỏi.
Hãy trích xuất các đoạn văn bản liên quan từ ngữ cảnh trước khi trả lời.
Chỉ trả lời câu hỏi, không cần giải thích quá trình suy luận.
Đảm bảo câu trả lời càng chi tiết và giải thích càng tốt.
Hãy trả lời đầy đủ, không được cắt ngắn câu trả lời.
Nếu câu trả lời quá dài, hãy chia thành các phần nhỏ và trả lời từng phần.
Nếu không có ngữ cảnh hoặc ngữ cảnh không cung cấp thông tin cần thiết hãy trả lời là "Mình không có dữ liệu về câu hỏi này" và không thêm bất cứ thứ gì.
Không được nhắc về từ "ngữ cảnh" trong câu trả lời. Tôi muốn câu trả lời của mình có đầy đủ chủ ngữ vị ngữ.
{example}
Ngữ cảnh:
{context}
Lịch sử cuộc hội thoại hiện tại:
{history}
Câu hỏi: {query}
Trả lời:"""
return base_prompt.format(context=context, history=history_str, query=query, example=example)
def ask_with_history_v3(query: str, conversation_id: str, isFirst):
parsed_conversation = retrieve_conversation(conversation_id)
weighted_query_vector = get_weighted_query(query, parsed_conversation)
threshold = 0.4
scores, indices = retrieve_relevant_resources(
query_vector=weighted_query_vector,
similarity_threshold=threshold,
embeddings=embeddings
)
scores_cpu = [score.cpu() for score in scores]
filtered_pairs = [(score, idx) for score, idx in zip(scores_cpu, indices) if score.item() >= threshold]
if filtered_pairs:
filtered_scores, filtered_indices = zip(*filtered_pairs)
context_items = [pages_and_chunks[i] for i in filtered_indices]
for i, item in enumerate(context_items):
item["score"] = filtered_scores[i]
else:
context_items = []
prompt = prompt_formatter(query=query, context_items=context_items, history=parsed_conversation, isFirst=isFirst)
genai.configure(api_key="AIzaSyDluIEKEhT1Dw2zx7SHEdmKipwBcYOmFQw")
model = genai.GenerativeModel("gemini-1.5-flash")
response = model.generate_content(prompt, stream=True)
for chunk in response:
yield chunk.text
store_conversation(conversation_id, query, response.text)
# API endpoints
@app.route('/ping', methods=['GET'])
def ping():
return jsonify("Service is running")
@app.route('/generate', methods=['POST'])
def generate_response():
query = request.json['query']
conversation_id = request.json['conversation_id']
isFirst = request.json['is_first']
def generate():
for token in ask_with_history_v3(query, conversation_id, isFirst):
yield token
return Response(generate(), mimetype='text/plain')
if __name__ == '__main__':
# Initialize data and models
app.run(host="0.0.0.0", port=7860)