# Streamlit fashion search app.
#
# Flow: the user supplies an image URL, a YOLO model detects clothing items,
# the user picks one detection, and the cropped region is embedded with a
# CLIP-style model and matched against a ChromaDB collection. A sidebar offers
# the same nearest-neighbour search driven by a text query.

import json
import time
from io import BytesIO

import streamlit as st
import open_clip
import torch
import requests
from PIL import Image
import numpy as np
from ultralytics import YOLO
import cv2
import chromadb

CLIP_MODEL_NAME = 'hf-hub:Marqo/marqo-fashionSigLIP'

# YOLO class names that are kept as searchable clothing detections.
CLOTHING_CATEGORIES = frozenset({
    'sunglass', 'hat', 'jacket', 'shirt', 'pants',
    'shorts', 'skirt', 'dress', 'bag', 'shoe',
})


# Load CLIP model and tokenizer
@st.cache_resource
def load_clip_model():
    """Load the CLIP model, its validation transform, tokenizer and device.

    Returns:
        Tuple ``(model, preprocess_val, tokenizer, device)``. The model is
        moved to CUDA when available, otherwise CPU.
    """
    model, _preprocess_train, preprocess_val = open_clip.create_model_and_transforms(CLIP_MODEL_NAME)
    tokenizer = open_clip.get_tokenizer(CLIP_MODEL_NAME)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    # The model is only used for inference here; switch off train-mode
    # behaviour (dropout, stochastic depth, batch-norm updates).
    model.eval()
    return model, preprocess_val, tokenizer, device


clip_model, preprocess_val, tokenizer, device = load_clip_model()


# Load YOLOv8 model
@st.cache_resource
def load_yolo_model():
    """Load the YOLO detector from ``./best.pt``."""
    return YOLO("./best.pt")


yolo_model = load_yolo_model()


# Load ChromaDB
@st.cache_resource
def load_chromadb():
    """Open the persistent ChromaDB store and return the clothing collection."""
    client = chromadb.PersistentClient(path="./chromadb_new")
    return client.get_collection(name="clothes_items_musinsa_sumin")


collection = load_chromadb()


# Helper functions
def load_image_from_url(url, max_retries=3):
    """Download ``url`` and return it as an RGB PIL image.

    Args:
        url: Image URL to fetch.
        max_retries: Maximum number of attempts; waits one second between
            attempts.

    Returns:
        The decoded RGB image, or ``None`` if every attempt failed.
    """
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return Image.open(BytesIO(response.content)).convert('RGB')
        except (requests.RequestException, OSError):
            # OSError covers PIL's UnidentifiedImageError as well as
            # truncated/corrupt image data raised while decoding.
            if attempt < max_retries - 1:
                time.sleep(1)
    return None


def get_image_embedding(image):
    """Return the L2-normalised image embedding of ``image`` as a Python list."""
    image_tensor = preprocess_val(image).unsqueeze(0).to(device)
    with torch.no_grad():
        image_features = clip_model.encode_image(image_tensor)
        image_features /= image_features.norm(dim=-1, keepdim=True)
    # Convert the numpy array to a plain Python list.
    return image_features.cpu().numpy().squeeze().tolist()


def get_text_embedding(text):
    """Return the L2-normalised text embedding of ``text`` as a numpy array.

    The array keeps the leading batch dimension of size one.
    """
    text_tokens = tokenizer([text]).to(device)
    with torch.no_grad():
        text_features = clip_model.encode_text(text_tokens)
        text_features /= text_features.norm(dim=-1, keepdim=True)
    return text_features.cpu().numpy()


def _parse_additional_images(raw):
    """Normalise an ``additional_images`` metadata value to a list of URLs.

    Accepts a list/tuple, a JSON array string, or a comma-separated string.
    NOTE(review): how the collection actually stores this field is not
    visible here -- confirm the format against the ingestion code.
    """
    if not raw:
        return []
    if isinstance(raw, (list, tuple)):
        return [url for url in raw if url]
    if not isinstance(raw, str):
        return []
    text = raw.strip()
    if text.startswith('['):
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, list):
            return [url for url in parsed if isinstance(url, str) and url]
    return [part.strip() for part in text.split(',') if part.strip()]


def get_average_embedding(main_image_url, additional_image_urls):
    """Average the image embeddings of an item's main and additional images.

    Args:
        main_image_url: URL of the item's main image.
        additional_image_urls: Additional image URLs, as a list or in any
            form accepted by ``_parse_additional_images``.

    Returns:
        A 1-D numpy array holding the element-wise mean of the embeddings of
        all images that could be loaded, or ``None`` if none could.
    """
    urls = [main_image_url, *_parse_additional_images(additional_image_urls)]
    embeddings = []
    for url in urls:
        if not url:
            continue
        image = load_image_from_url(url)
        if image is not None:
            embeddings.append(get_image_embedding(image))
    if not embeddings:
        return None
    # NOTE(review): the mean of unit vectors is not itself unit length; if the
    # collection's distance metric is norm-sensitive, consider re-normalising.
    return np.mean(embeddings, axis=0)


def update_collection_embeddings(batch_size=100):
    """Recompute and store the averaged image embedding of every item.

    Items are processed in batches of ``batch_size``. A failure on one item
    or one batch is reported in the UI and does not abort the run.
    """
    batch_size = max(1, int(batch_size))
    records = collection.get(include=['metadatas'])
    all_ids = records['ids']
    all_metadata = records['metadatas']
    total = len(all_ids)
    if total == 0:
        return

    progress_bar = st.progress(0.0)
    for start in range(0, total, batch_size):
        batch_ids = all_ids[start:start + batch_size]
        batch_metadata = all_metadata[start:start + batch_size]
        valid_ids = []
        batch_embeddings = []

        for item_id, metadata in zip(batch_ids, batch_metadata):
            metadata = metadata or {}
            try:
                avg_embedding = get_average_embedding(
                    metadata.get('image_url'),
                    metadata.get('additional_images', []),
                )
            except Exception as exc:  # best-effort: report and move on
                st.error(f"Error processing item {item_id}: {str(exc)}")
                continue
            if avg_embedding is None:
                st.warning(f"Failed to generate embedding for item {item_id}")
                continue
            valid_ids.append(item_id)
            # Store plain lists of floats rather than numpy arrays.
            batch_embeddings.append(np.asarray(avg_embedding).tolist())

        if valid_ids:
            try:
                collection.update(ids=valid_ids, embeddings=batch_embeddings)
                st.success(f"Updated embeddings for {len(valid_ids)} items")
            except Exception as exc:  # best-effort: report and move on
                st.error(f"Error updating embeddings: {str(exc)}")

        # Show progress; clamp because the last batch may be partial.
        progress_bar.progress(min((start + batch_size) / total, 1.0))


def find_similar_images(query_embedding, collection, top_k=5):
    """Return the ``top_k`` nearest items to ``query_embedding``.

    Args:
        query_embedding: A single embedding as a list or numpy array; a
            leading batch dimension of size one is flattened away.
        collection: ChromaDB collection to query.
        top_k: Number of results to return.

    Returns:
        A list of dicts with keys ``'info'`` (item metadata) and
        ``'similarity'`` (``1 - distance``).
    """
    # get_image_embedding returns a list and get_text_embedding an array of
    # shape (1, dim); np.asarray accepts both.
    query_vector = np.asarray(query_embedding).reshape(-1).tolist()
    results = collection.query(
        query_embeddings=[query_vector],
        n_results=top_k,
        include=["metadatas", "distances"],
    )
    return [
        {'info': metadata, 'similarity': 1 - distance}  # distance -> similarity
        for metadata, distance in zip(results['metadatas'][0], results['distances'][0])
    ]


def detect_clothing(image):
    """Run YOLO on ``image`` and return the detections in known categories.

    Returns:
        A list of dicts with keys ``'category'``, ``'bbox'``
        (``[x1, y1, x2, y2]`` as ints) and ``'confidence'``.
    """
    results = yolo_model(image)
    detections = results[0].boxes.data.cpu().numpy()
    categories = []
    for x1, y1, x2, y2, conf, cls in detections:
        category = yolo_model.names[int(cls)]
        if category in CLOTHING_CATEGORIES:
            categories.append({
                'category': category,
                'bbox': [int(x1), int(y1), int(x2), int(y2)],
                'confidence': float(conf),
            })
    return categories


def crop_image(image, bbox):
    """Crop ``image`` to ``bbox`` given as ``[x1, y1, x2, y2]``."""
    return image.crop((bbox[0], bbox[1], bbox[2], bbox[3]))


def _detection_label(detection):
    """Return the display label used for a detection in the UI."""
    return f"{detection['category']} (Confidence: {detection['confidence']:.2f})"


def _write_item_details(target, item):
    """Write one search result's text details to a Streamlit container."""
    info = item['info']
    target.write(f"Name: {info['name']}")
    target.write(f"Brand: {info['brand']}")
    category = info.get('category')
    if category:
        target.write(f"Category: {category}")
    target.write(f"Price: {info['price']}")
    target.write(f"Discount: {info['discount']}%")
    target.write(f"Similarity: {item['similarity']:.2f}")


def _reset_search_state():
    """Return the image-search wizard to its initial, empty state."""
    st.session_state.step = 'input'
    st.session_state.query_image_url = ''
    st.session_state.detections = []
    st.session_state.selected_category = None
    st.session_state.selected_index = None


def _rerun():
    """Restart the script so the page reflects a just-changed wizard step.

    Without this, the new step is only drawn after the next user interaction.
    """
    rerun = getattr(st, "rerun", None) or getattr(st, "experimental_rerun", None)
    if rerun is not None:
        rerun()


# Initialise session state
if 'step' not in st.session_state:
    st.session_state.step = 'input'
if 'query_image_url' not in st.session_state:
    st.session_state.query_image_url = ''
if 'detections' not in st.session_state:
    st.session_state.detections = []
if 'selected_category' not in st.session_state:
    st.session_state.selected_category = None
if 'selected_index' not in st.session_state:
    st.session_state.selected_index = None

# Streamlit app
st.title("Advanced Fashion Search App")

# Refresh the collection embeddings once per session.
# NOTE(review): this runs again for every new browser session, not once per
# server process -- confirm that is intended, as it re-downloads every image.
if 'embeddings_updated' not in st.session_state:
    with st.spinner("Updating collection embeddings... This may take a while."):
        update_collection_embeddings()
    st.session_state.embeddings_updated = True

# Step-by-step flow
if st.session_state.step == 'input':
    st.session_state.query_image_url = st.text_input("Enter image URL:", st.session_state.query_image_url)
    if st.button("Detect Clothing"):
        if not st.session_state.query_image_url:
            st.warning("Please enter an image URL.")
        else:
            query_image = load_image_from_url(st.session_state.query_image_url)
            if query_image is None:
                st.error("Failed to load the image. Please try another URL.")
            else:
                st.session_state.query_image = query_image
                st.session_state.detections = detect_clothing(query_image)
                if st.session_state.detections:
                    st.session_state.step = 'select_category'
                    _rerun()
                else:
                    st.warning("No clothing items detected in the image.")

elif st.session_state.step == 'select_category':
    st.image(st.session_state.query_image, caption="Query Image", use_column_width=True)
    st.subheader("Detected Clothing Items:")
    labels = [_detection_label(d) for d in st.session_state.detections]
    for detection, label in zip(st.session_state.detections, labels):
        col1, col2 = st.columns([1, 3])
        with col1:
            st.write(label)
        with col2:
            cropped_image = crop_image(st.session_state.query_image, detection['bbox'])
            st.image(cropped_image, caption=detection['category'], use_column_width=True)

    # Select by index so two detections with the same label stay distinct.
    selected_index = st.selectbox(
        "Select a category to search:",
        list(range(len(labels))),
        format_func=lambda idx: labels[idx],
    )
    if st.button("Search Similar Items"):
        st.session_state.selected_category = labels[selected_index]
        st.session_state.selected_index = selected_index
        st.session_state.step = 'show_results'
        _rerun()

elif st.session_state.step == 'show_results':
    detections = st.session_state.detections
    selected_index = st.session_state.selected_index
    if selected_index is None or not 0 <= selected_index < len(detections):
        st.warning("The selected item is no longer available. Please start a new search.")
    else:
        st.image(st.session_state.query_image, caption="Query Image", use_column_width=True)
        selected_detection = detections[selected_index]
        cropped_image = crop_image(st.session_state.query_image, selected_detection['bbox'])
        st.image(cropped_image, caption="Cropped Image", use_column_width=True)

        query_embedding = get_image_embedding(cropped_image)
        similar_images = find_similar_images(query_embedding, collection)

        st.subheader("Similar Items:")
        for img in similar_images:
            col1, col2 = st.columns(2)
            col1.image(img['info']['image_url'], use_column_width=True)
            _write_item_details(col2, img)
            # Show additional images (at most three).
            additional_images = _parse_additional_images(img['info'].get('additional_images', []))
            if additional_images:
                col2.write("Additional Images:")
                for add_img_url in additional_images[:3]:
                    col2.image(add_img_url, width=100)

    if st.button("Start New Search"):
        _reset_search_state()
        _rerun()

# Text search
st.sidebar.title("Text Search")
query_text = st.sidebar.text_input("Enter search text:")
if st.sidebar.button("Search by Text"):
    if query_text:
        text_embedding = get_text_embedding(query_text)
        similar_images = find_similar_images(text_embedding, collection)
        st.sidebar.subheader("Similar Items:")
        for img in similar_images:
            st.sidebar.image(img['info']['image_url'], use_column_width=True)
            _write_item_details(st.sidebar, img)
    else:
        st.sidebar.warning("Please enter a search text.")