leedoming's picture
Update app.py
7917a23 verified
import streamlit as st
import open_clip
import torch
import requests
from PIL import Image
from io import BytesIO
import time
import json
import numpy as np
from ultralytics import YOLO
import cv2
import chromadb
# Load CLIP model and tokenizer
@st.cache_resource
def load_clip_model():
    """Create the Marqo fashionSigLIP model, its eval transform and tokenizer.

    The result is cached through ``st.cache_resource``.

    Returns:
        A ``(model, preprocess_val, tokenizer, device)`` tuple. The model has
        already been moved to ``device`` (CUDA when available, otherwise CPU).
    """
    hub_id = 'hf-hub:Marqo/marqo-fashionSigLIP'
    # The factory returns three values; only the first (model) and the last
    # (validation-time preprocessing) are handed back to callers.
    created = open_clip.create_model_and_transforms(hub_id)
    model, eval_transform = created[0], created[2]
    tokenizer = open_clip.get_tokenizer(hub_id)
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    model.to(device)
    return model, eval_transform, tokenizer, device


clip_model, preprocess_val, tokenizer, device = load_clip_model()
# Load YOLOv8 model
@st.cache_resource
def load_yolo_model():
    """Load the YOLO detector from the local ``./best.pt`` weights file.

    The result is cached through ``st.cache_resource``.
    """
    weights_path = "./best.pt"
    detector = YOLO(weights_path)
    return detector


yolo_model = load_yolo_model()
# Load ChromaDB
@st.cache_resource
def load_chromadb():
    """Open the persistent ChromaDB store and return the clothing collection.

    The store lives in ``./chromadb_new``; the collection must already exist
    (``get_collection`` is used, not a create call).
    """
    store = chromadb.PersistentClient(path="./chromadb_new")
    return store.get_collection(name="clothes_items_musinsa_sumin")


collection = load_chromadb()
# Helper functions
def load_image_from_url(url, max_retries=3):
    """Download an image and return it as an RGB ``PIL.Image``.

    Args:
        url: Address of the image to fetch.
        max_retries: Total number of attempts before giving up.

    Returns:
        The decoded RGB image, or ``None`` when every attempt failed (or when
        ``max_retries`` is not positive).
    """
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return Image.open(BytesIO(response.content)).convert('RGB')
        except (requests.RequestException, Image.UnidentifiedImageError, OSError):
            # OSError is included because decoding truncated or corrupt image
            # data fails with a plain OSError during convert(); without it a
            # bad download would crash the caller instead of yielding None.
            if attempt < max_retries - 1:
                time.sleep(1)  # brief pause before the next attempt
    return None
# Revised get_image_embedding function
def get_image_embedding(image):
    """Encode one image with the CLIP model into an L2-normalised vector.

    Args:
        image: Image accepted by the ``preprocess_val`` transform.

    Returns:
        The embedding as a plain Python list (converted from the NumPy array).
    """
    batch = preprocess_val(image).unsqueeze(0).to(device)  # add a batch axis
    with torch.no_grad():
        features = clip_model.encode_image(batch)
        features /= features.norm(dim=-1, keepdim=True)
    vector = features.cpu().numpy().squeeze()  # drop the batch axis again
    return vector.tolist()
def get_text_embedding(text):
    """Encode a text query with the CLIP model into an L2-normalised vector.

    Args:
        text: The query string.

    Returns:
        A NumPy array that still carries the leading batch axis (unlike
        ``get_image_embedding``, nothing is squeezed or converted to a list).
    """
    tokens = tokenizer([text]).to(device)
    with torch.no_grad():
        features = clip_model.encode_text(tokens)
        features /= features.norm(dim=-1, keepdim=True)
    on_cpu = features.cpu()
    return on_cpu.numpy()
def get_average_embedding(main_image_url, additional_image_urls):
    """Average the embeddings of an item's main and additional images.

    Images that cannot be loaded are skipped.

    Args:
        main_image_url: URL of the item's main image.
        additional_image_urls: Iterable of further image URLs.
            NOTE(review): assumed to be a sequence of URL strings; if the
            stored metadata holds a single string this loop would iterate
            its characters - TODO confirm against the collection schema.

    Returns:
        The element-wise mean of all embeddings as computed by ``np.mean``,
        or ``None`` when no image could be loaded.
    """
    vectors = []

    def _collect(image_url):
        # Load one image and, if that worked, record its embedding.
        picture = load_image_from_url(image_url)
        if picture:
            vectors.append(get_image_embedding(picture))

    # Main image first, then every additional image.
    _collect(main_image_url)
    for extra_url in additional_image_urls:
        _collect(extra_url)

    if not vectors:
        return None
    return np.mean(vectors, axis=0)
def update_collection_embeddings():
    """Recompute every item's embedding as the average over its images.

    Items are processed in batches of 100. Items whose embedding cannot be
    produced are reported in the Streamlit UI and skipped; a failed batch
    write is reported together with diagnostics about the first embedding.

    NOTE(review): a second function with this same name is defined further
    down in this file. Python keeps the later definition, so this batched
    variant is shadowed unless one of the two is renamed or removed.
    """
    # Fetch ids and metadata with a single call instead of two full reads.
    records = collection.get(include=['metadatas'])
    all_ids = records['ids']
    all_metadata = records['metadatas']
    if not all_ids:
        return

    batch_size = 100  # number of items processed per write
    total = len(all_ids)
    progress_bar = st.progress(0.0)  # one bar, updated after each batch

    for start in range(0, total, batch_size):
        batch_ids = all_ids[start:start + batch_size]
        batch_metadata = all_metadata[start:start + batch_size]
        batch_embeddings = []
        valid_ids = []

        for item_id, metadata in zip(batch_ids, batch_metadata):
            try:
                avg_embedding = get_average_embedding(
                    metadata['image_url'],
                    metadata.get('additional_images', []),
                )
            except Exception as e:
                st.error(f"Error processing item {item_id}: {str(e)}")
                continue
            if avg_embedding is None:
                st.warning(f"Failed to generate embedding for item {item_id}")
                continue
            # Store plain lists, matching what the other definition of this
            # function sends to the collection.
            batch_embeddings.append(np.asarray(avg_embedding).tolist())
            valid_ids.append(item_id)

        if valid_ids:
            try:
                collection.update(
                    ids=valid_ids,
                    embeddings=batch_embeddings,
                )
                st.success(f"Updated embeddings for {len(valid_ids)} items")
            except Exception as e:
                st.error(f"Error updating embeddings: {str(e)}")
                st.error(f"First embedding type: {type(batch_embeddings[0])}")
                st.error(f"First embedding length: {len(batch_embeddings[0])}")
                # Only the first 10 elements are printed.
                st.error(f"First embedding: {batch_embeddings[0][:10]}...")

        # Show progress; clamp because the last batch may be partial and the
        # raw ratio would then exceed 1.0.
        progress_bar.progress(min((start + batch_size) / total, 1.0))
def find_similar_images(query_embedding, collection, top_k=5):
    """Query the collection for the items nearest to an embedding.

    Args:
        query_embedding: The query vector. May be a NumPy array (as returned
            by ``get_text_embedding``), any other object offering
            ``squeeze().tolist()``, or a plain list (as returned by
            ``get_image_embedding``). A leading batch axis is squeezed away.
        collection: Object exposing a ChromaDB-style ``query`` method.
        top_k: Number of neighbours to request.

    Returns:
        A list of ``{'info': metadata, 'similarity': 1 - distance}`` dicts in
        the order returned by the collection.
        NOTE(review): ``1 - distance`` is only a true similarity if the
        collection uses a distance for which that holds (e.g. cosine) -
        TODO confirm the collection's distance setting.
    """
    # Plain lists have no .squeeze(); wrap them so both list and array
    # embeddings are accepted.
    if not hasattr(query_embedding, 'squeeze'):
        query_embedding = np.asarray(query_embedding)
    query_vector = query_embedding.squeeze().tolist()

    results = collection.query(
        query_embeddings=[query_vector],
        n_results=top_k,
        include=["metadatas", "distances"],
    )
    # Only one query vector was sent, so only the first result row is used.
    return [
        {'info': metadata, 'similarity': 1 - distance}
        for metadata, distance in zip(results['metadatas'][0], results['distances'][0])
    ]
def update_collection_embeddings():
    """Recompute and store the averaged image embedding of every item.

    Each item is written back individually; items for which no image could
    be loaded are left unchanged.

    NOTE(review): this is the second definition of this name in the file and
    therefore the one that is actually in effect; the earlier batched variant
    is shadowed by it.
    """
    # One fetch provides both ids and metadata (no need to read twice).
    records = collection.get(include=['metadatas'])
    for item_id, metadata in zip(records['ids'], records['metadatas']):
        avg_embedding = get_average_embedding(
            metadata['image_url'],
            metadata.get('additional_images', []),
        )
        if avg_embedding is None:
            continue
        collection.update(
            ids=[item_id],
            embeddings=[avg_embedding.tolist()],
        )
def detect_clothing(image):
    """Run the YOLO model on an image and list the clothing detections.

    Only the first result returned by the model is inspected, and only
    detections whose class name is one of the supported categories are kept.

    Returns:
        A list of dicts with keys ``'category'`` (class name), ``'bbox'``
        (``[x1, y1, x2, y2]`` as ints) and ``'confidence'``.
    """
    supported = ('sunglass', 'hat', 'jacket', 'shirt', 'pants', 'shorts', 'skirt', 'dress', 'bag', 'shoe')
    rows = yolo_model(image)[0].boxes.data.cpu().numpy()
    found = []
    for x1, y1, x2, y2, conf, cls in rows:
        label = yolo_model.names[int(cls)]
        if label not in supported:
            continue
        found.append({
            'category': label,
            'bbox': [int(x1), int(y1), int(x2), int(y2)],
            'confidence': conf,
        })
    return found
def crop_image(image, bbox):
    """Return ``image.crop`` applied to the first four entries of ``bbox``."""
    box = tuple(bbox[position] for position in range(4))
    return image.crop(box)
# Initialise session state: each key gets its default only if it is missing.
for _state_key, _state_default in (
    ('step', 'input'),
    ('query_image_url', ''),
    ('detections', []),
    ('selected_category', None),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
# Streamlit app
st.title("Advanced Fashion Search App")

# Update the collection embeddings (only once, on first run).
# NOTE(review): the flag lives in st.session_state, so "once" looks like once
# per session rather than once per deployment - confirm this is intended,
# since the update walks the whole collection.
if 'embeddings_updated' not in st.session_state:
    with st.spinner("Updating collection embeddings... This may take a while."):
        update_collection_embeddings()
    st.session_state['embeddings_updated'] = True
# Step-by-step flow: 'input' -> 'select_category' -> 'show_results'.
def _rerun_app():
    """Restart the script run so a step change is rendered immediately."""
    # st.rerun is the current API; older Streamlit releases only provide
    # st.experimental_rerun.
    restart = getattr(st, "rerun", None) or st.experimental_rerun
    restart()


def _detection_label(detection):
    """Build the label shown for (and used to identify) a detection."""
    return f"{detection['category']} (Confidence: {detection['confidence']:.2f})"


if st.session_state.step == 'input':
    st.session_state.query_image_url = st.text_input("Enter image URL:", st.session_state.query_image_url)
    if st.button("Detect Clothing"):
        if st.session_state.query_image_url:
            query_image = load_image_from_url(st.session_state.query_image_url)
            if query_image is not None:
                st.session_state.query_image = query_image
                st.session_state.detections = detect_clothing(query_image)
                if st.session_state.detections:
                    st.session_state.step = 'select_category'
                    # Without a rerun the new step would only appear after
                    # the next user interaction.
                    _rerun_app()
                else:
                    st.warning("No clothing items detected in the image.")
            else:
                st.error("Failed to load the image. Please try another URL.")
        else:
            st.warning("Please enter an image URL.")

elif st.session_state.step == 'select_category':
    st.image(st.session_state.query_image, caption="Query Image", use_column_width=True)
    st.subheader("Detected Clothing Items:")

    for detection in st.session_state.detections:
        col1, col2 = st.columns([1, 3])
        with col1:
            st.write(_detection_label(detection))
        with col2:
            cropped_image = crop_image(st.session_state.query_image, detection['bbox'])
            st.image(cropped_image, caption=detection['category'], use_column_width=True)

    options = [_detection_label(d) for d in st.session_state.detections]
    selected_option = st.selectbox("Select a category to search:", options)

    if st.button("Search Similar Items"):
        st.session_state.selected_category = selected_option
        st.session_state.step = 'show_results'
        _rerun_app()

elif st.session_state.step == 'show_results':
    st.image(st.session_state.query_image, caption="Query Image", use_column_width=True)

    # The selection is stored as its label; find the matching detection.
    # NOTE(review): two detections with the same category and rounded
    # confidence share a label, in which case the first one is used.
    selected_detection = next(
        (d for d in st.session_state.detections
         if _detection_label(d) == st.session_state.selected_category),
        None,
    )

    if selected_detection is None:
        # Previously an unmatched label raised StopIteration.
        st.error("The selected item could not be found. Please start a new search.")
    else:
        cropped_image = crop_image(st.session_state.query_image, selected_detection['bbox'])
        st.image(cropped_image, caption="Cropped Image", use_column_width=True)

        # get_image_embedding returns a plain list, while find_similar_images
        # calls .squeeze() on its argument, so hand over an array.
        query_embedding = np.asarray(get_image_embedding(cropped_image))
        similar_images = find_similar_images(query_embedding, collection)

        st.subheader("Similar Items:")
        for img in similar_images:
            col1, col2 = st.columns(2)
            with col1:
                st.image(img['info']['image_url'], use_column_width=True)
            with col2:
                st.write(f"Name: {img['info']['name']}")
                st.write(f"Brand: {img['info']['brand']}")
                category = img['info'].get('category')
                if category:
                    st.write(f"Category: {category}")
                st.write(f"Price: {img['info']['price']}")
                st.write(f"Discount: {img['info']['discount']}%")
                st.write(f"Similarity: {img['similarity']:.2f}")

                # Show additional images
                additional_images = img['info'].get('additional_images', [])
                if additional_images:
                    st.write("Additional Images:")
                    for add_img_url in additional_images[:3]:  # show at most 3
                        st.image(add_img_url, width=100)

    if st.button("Start New Search"):
        st.session_state.step = 'input'
        st.session_state.query_image_url = ''
        st.session_state.detections = []
        st.session_state.selected_category = None
        _rerun_app()
# Text search (rendered in the sidebar, independent of the current step)
st.sidebar.title("Text Search")
query_text = st.sidebar.text_input("Enter search text:")
if st.sidebar.button("Search by Text"):
    if not query_text:
        st.sidebar.warning("Please enter a search text.")
    else:
        text_embedding = get_text_embedding(query_text)
        similar_images = find_similar_images(text_embedding, collection)
        st.sidebar.subheader("Similar Items:")
        for img in similar_images:
            info = img['info']
            st.sidebar.image(info['image_url'], use_column_width=True)
            st.sidebar.write(f"Name: {info['name']}")
            st.sidebar.write(f"Brand: {info['brand']}")
            category = info.get('category')
            if category:
                # The category line is shown only when a value is present.
                st.sidebar.write(f"Category: {category}")
            st.sidebar.write(f"Price: {info['price']}")
            st.sidebar.write(f"Discount: {info['discount']}%")
            st.sidebar.write(f"Similarity: {img['similarity']:.2f}")