from fastapi import FastAPI
import os
import pymupdf  # PyMuPDF
from pptx import Presentation
from sentence_transformers import SentenceTransformer
import torch
from transformers import CLIPProcessor, CLIPModel
from PIL import Image
import chromadb
import numpy as np
from sklearn.decomposition import PCA

app = FastAPI()

# Initialize ChromaDB
client = chromadb.PersistentClient(path="/data/chroma_db")
collection = client.get_or_create_collection(name="knowledge_base")

# File Paths
pdf_file = "Sutures and Suturing techniques.pdf"
pptx_file = "impalnt 1.pptx"

# Initialize Embedding Models
text_model = SentenceTransformer('all-MiniLM-L6-v2')
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# Image Storage Folder
IMAGE_FOLDER = "/data/extracted_images"
os.makedirs(IMAGE_FOLDER, exist_ok=True)

# Extract Text from PDF
def extract_text_from_pdf(pdf_path):
    try:
        doc = pymupdf.open(pdf_path)
        text = " ".join(page.get_text() for page in doc)
        return text.strip() if text else None
    except Exception as e:
        print(f"Error extracting text from PDF: {e}")
        return None

# Extract Text from PPTX
def extract_text_from_pptx(pptx_path):
    try:
        prs = Presentation(pptx_path)
        text = " ".join(
            shape.text for slide in prs.slides for shape in slide.shapes if hasattr(shape, "text")
        )
        return text.strip() if text else None
    except Exception as e:
        print(f"Error extracting text from PPTX: {e}")
        return None

# Extract Images from PDF
def extract_images_from_pdf(pdf_path):
    try:
        doc = pymupdf.open(pdf_path)
        images = []
        for i, page in enumerate(doc):
            for img_index, img in enumerate(page.get_images(full=True)):
                xref = img[0]
                image = doc.extract_image(xref)
                img_path = f"{IMAGE_FOLDER}/pdf_image_{i}_{img_index}.{image['ext']}"
                with open(img_path, "wb") as f:
                    f.write(image["image"])
                images.append(img_path)
        return images
    except Exception as e:
        print(f"Error extracting images from PDF: {e}")
        return []

# Extract Images from PPTX
def extract_images_from_pptx(pptx_path):
    try:
        images = []
        prs = Presentation(pptx_path)
        for i, slide in enumerate(prs.slides):
            for j, shape in enumerate(slide.shapes):
                if shape.shape_type == 13:  # 13 = MSO_SHAPE_TYPE.PICTURE
                    # Include the shape index so multiple pictures on one slide don't overwrite each other
                    img_path = f"{IMAGE_FOLDER}/pptx_image_{i}_{j}.{shape.image.ext}"
                    with open(img_path, "wb") as f:
                        f.write(shape.image.blob)
                    images.append(img_path)
        return images
    except Exception as e:
        print(f"Error extracting images from PPTX: {e}")
        return []

# Convert Text to Embeddings
def get_text_embedding(text):
    return text_model.encode(text).tolist()

# Global PCA instance (currently unused: get_image_embedding truncates instead, and store_data fits its own PCA)
pca = PCA(n_components=384)

def get_image_embedding(image_path):
    try:
        # Load the image
        image = Image.open(image_path)
        inputs = processor(images=image, return_tensors="pt")
        # Extract image embeddings
        with torch.no_grad():
            image_embedding = model.get_image_features(**inputs).numpy().flatten()
        # Print the actual embedding dimension
        print(f"Image embedding shape: {image_embedding.shape}")
        """ # CASE 1: Embedding is already 384-dimensional ✅
        if len(image_embedding) == 384:
            return image_embedding.tolist()
        # CASE 2: Embedding is larger than 384 (e.g., 512) → Apply PCA ✅
        elif len(image_embedding) > 384:
            pca = PCA(n_components=384, svd_solver='auto')  # Auto solver for stability
            image_embedding = pca.fit_transform(image_embedding.reshape(1, -1)).flatten()
            print(f"Reduced image embedding shape: {image_embedding.shape}")
        # CASE 3: Embedding is smaller than 384 → Apply Padding ❌
        else:
            padding = np.zeros(384 - len(image_embedding))  # Create padding vector
            image_embedding = np.concatenate((image_embedding, padding))  # Append padding"""
        # Truncate CLIP's 512-dim output to 384 dimensions to match the text embedding size
        image_embedding = image_embedding[:384]
        # Print the final embedding shape
        print(f"Final Image embedding shape: {image_embedding.shape}")
        return image_embedding.tolist()
    except Exception as e:
        print(f"❌ Error generating image embedding: {e}")
        return None

# Store Data in ChromaDB
def store_data(texts, image_paths):
    for i, text in enumerate(texts):
        if text:
            text_embedding = get_text_embedding(text)
            if len(text_embedding) == 384:
                collection.add(ids=[f"text_{i}"], embeddings=[text_embedding], documents=[text])

    # Compute each image embedding once, keeping only the images that actually produced one
    embedded_images, all_embeddings = [], []
    for img_path in image_paths:
        embedding = get_image_embedding(img_path)
        if embedding is not None:
            embedded_images.append(img_path)
            all_embeddings.append(embedding)

    if all_embeddings:
        all_embeddings = np.array(all_embeddings)
        # Apply PCA only if necessary (embeddings are already truncated to 384 dims)
        if all_embeddings.shape[1] != 384:
            pca = PCA(n_components=384)
            all_embeddings = pca.fit_transform(all_embeddings)
        for j, img_path in enumerate(embedded_images):
            collection.add(ids=[f"image_{j}"], embeddings=[all_embeddings[j].tolist()], documents=[img_path])

    print("Data stored successfully!")

# Process and Store from Files
def process_and_store(pdf_path=None, pptx_path=None):
    texts, images = [], []
    if pdf_path:
        pdf_text = extract_text_from_pdf(pdf_path)
        if pdf_text:
            texts.append(pdf_text)
        images.extend(extract_images_from_pdf(pdf_path))
    if pptx_path:
        pptx_text = extract_text_from_pptx(pptx_path)
        if pptx_text:
            texts.append(pptx_text)
        images.extend(extract_images_from_pptx(pptx_path))
    store_data(texts, images)

# FastAPI Endpoints
@app.get("/")
def greet_json():
    # Run Data Processing
    process_and_store(pdf_path=pdf_file, pptx_path=pptx_file)
    return {"Document store": "created!"}
@app.get("/retrieval")
def retrieval(query: str):
try:
query_embedding = get_text_embedding(query)
results = collection.query(query_embeddings=[query_embedding], n_results=5)
#return {"results": results.get("documents", [])}
# Set a similarity threshold (adjust as needed)
SIMILARITY_THRESHOLD = 0.7
# Extract documents and similarity scores
documents = results.get("documents", [[]])[0] # Ensure we get the first list
distances = results.get("distances", [[]])[0] # Ensure we get the first list
# Filter results based on similarity threshold
filtered_results = [
doc for doc, score in zip(documents, distances) if score >= SIMILARITY_THRESHOLD
]
# Return filtered results or indicate no match found
if filtered_results:
return {"results": filtered_results}
else:
return {"results": "No relevant match found in ChromaDB."}
except Exception as e:
return {"error": str(e)}

import pandas as pd
from io import StringIO
import os
import base64

@app.get("/save_file_dify")
def save_file_dify(csv_data: str):
    # Split into lines
    lines = csv_data.split("\n")
    # Find the max number of columns
    max_cols = max(line.count(",") + 1 for line in lines if line.strip())
    # Normalize all rows to have the same number of columns
    fixed_lines = [line + "," * (max_cols - line.count(",") - 1) for line in lines]
    # Reconstruct CSV string
    fixed_csv_data = "\n".join(fixed_lines)
    # Convert CSV string to DataFrame
    df = pd.read_csv(StringIO(fixed_csv_data))
    # Save in Dify dataset and return download link
    download_link = get_download_link_dify(df)
    return download_link

def get_download_link_dify(df):
    # Code to save the file in the Dify framework
    import requests

    # API Configuration
    BASE_URL = "http://redmindgpt.redmindtechnologies.com:81/v1"
    DATASET_ID = "084ae979-d101-414b-8854-9bbf5d3a442e"
    API_KEY = "dataset-feqz5KrqHkFRdWbh2DInt58L"
    dataset_name = 'output_dataset'

    # Endpoint URL
    url = f"{BASE_URL}/datasets/{DATASET_ID}/document/create-by-file"
    print(url)

    # Headers
    headers = {
        "Authorization": f"Bearer {API_KEY}"
    }

    # Data payload (form data as a plain text string)
    data_payload = {
        "data": """
        {
            "indexing_technique": "high_quality",
            "process_rule": {
                "rules": {
                    "pre_processing_rules": [
                        {"id": "remove_extra_spaces", "enabled": true},
                        {"id": "remove_urls_emails", "enabled": true}
                    ],
                    "segmentation": {
                        "separator": "###",
                        "max_tokens": 500
                    }
                },
                "mode": "custom"
            }
        }
        """
    }

    # Convert DataFrame to binary (in-memory)
    file_buffer = dataframe_to_binary(df)
    files = {
        "file": ("output.xlsx", file_buffer, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
    }

    # Send the POST request
    response = requests.post(url, headers=headers, data=data_payload, files=files)
    print(response)
    data = response.json()
    document_id = data['document']['id']

    # Code to get download_url
    url = f"http://redmindgpt.redmindtechnologies.com:81/v1/datasets/{DATASET_ID}/documents/{document_id}/upload-file"
    response = requests.get(url, headers=headers)
    print(response)
    download_url = response.json().get("download_url")
    download_url = download_url.replace("download/", "")
    return download_url

def dataframe_to_binary(df):
    import io

    # Create a BytesIO stream
    output = io.BytesIO()
    # Write the DataFrame to this in-memory buffer as an Excel file
    df.to_excel(output, index=False, engine="openpyxl")
    # Move the cursor to the beginning of the stream
    output.seek(0)
    return output
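
# Optional local entry point (a minimal sketch: assumes uvicorn is installed and port 7860 is free).
# On Hugging Face Spaces the server is typically started by the platform, so this is only for local runs.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)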