"""Elasticsearch-backed helpers for products, customers, orders and carts."""

import os

from elasticsearch import Elasticsearch, NotFoundError
from elasticsearch_dsl import Search

from models.product import Product
from models.customer import Customer
from models.order import Order

cloud_id = os.getenv("ELASTICSEARCH_CLOUD_ID")
api_key = os.getenv("ELASTICSEARCH_API_KEY")

# NOTE(review): `ConnectionError` here is the Python builtin (nothing imports
# the elasticsearch exception of the same name). If this branch is ever taken,
# `es` stays undefined and every helper below fails with NameError.
try:
    es = Elasticsearch(cloud_id=cloud_id, api_key=api_key)
except ConnectionError as e:
    print("Error:", e)


def search_elasticsearch(index, query):
    """Run a multi-match text search and return the raw hits.

    Args:
        index: Name of the index to search.
        query: Text matched against the ``title``, ``description``,
            ``category`` and ``tags`` fields.

    Returns:
        The ``hits.hits`` list from the search response.
    """
    response = es.search(index=index, body={
        "query": {
            "multi_match": {
                "query": query,
                "fields": ["title", "description", "category", "tags"],
            }
        }
    })
    return response['hits']['hits']


def search_products_by_keywords(encoded_query):
    """Run a kNN search on ``ImageEmbedding`` in the ``fashion`` index.

    Args:
        encoded_query: Passed through as the kNN ``query_vector``;
            presumably an embedding of the user's keywords (confirm
            against the caller).

    Returns:
        The ``hits.hits`` list (k=4 neighbours out of 500 candidates), each
        restricted to the product display fields listed in ``_source``.
    """
    body_query = {
        "_source": [
            'Gender', 'Category', 'SubCategory', 'ProductType',
            'Colour', 'Usage', 'ProductTitle', 'ImageURL',
        ],
        "knn": {
            "field": "ImageEmbedding",
            "query_vector": encoded_query,
            "k": 4,
            "num_candidates": 500,
        },
    }
    response = es.search(index='fashion', body=body_query)
    return response['hits']['hits']


def search_products_by_filters(filters):
    """Search the ``fashion`` index using ``filters`` as a bool filter clause.

    Args:
        filters: Value placed verbatim under ``query.bool.filter``.

    Returns:
        The ``hits.hits`` list from the search response.
    """
    response = es.search(index='fashion', body={
        "query": {
            "bool": {
                "filter": filters,
            }
        }
    })
    return response['hits']['hits']


def get_product_details(product_id):
    """Return the ``_source`` of the ``fashion`` document with this id."""
    response = es.get(index='fashion', id=product_id)
    return response['_source']


def index_product(product: Product):
    """Index ``product`` into ``fashion`` under ``product.product_id``."""
    es.index(index="fashion", id=product.product_id, body=product.dict())


def index_customer(customer: Customer):
    """Index ``customer`` into ``customers`` under ``customer.customer_id``."""
    es.index(index="customers", id=customer.customer_id, body=customer.dict())


def get_customer(customer_id: str):
    """Return the ``_source`` of the ``customers`` document with this id."""
    response = es.get(index="customers", id=customer_id)
    return response['_source']


def update_customer(customer_id: str, updates: dict):
    """Apply a partial update (``{"doc": updates}``) to a customer document."""
    es.update(index="customers", id=customer_id, body={"doc": updates})


def index_order(order: Order):
    """Index ``order`` into ``orders`` under ``order.order_id``."""
    es.index(index="orders", id=order.order_id, body=order.dict())


def track_order(order_id: str):
    """Return the ``_source`` of the ``orders`` document with this id."""
    response = es.get(index="orders", id=order_id)
    return response['_source']


def get_order_details(order_id):
    """Look up an order and its detail rows by the ``Order ID`` field.

    Runs one ``match`` query against ``orders`` and one against
    ``order_details``.

    Returns:
        ``{"order": <first order hit>, "details": [<detail hits>]}`` when
        both queries return at least one hit; otherwise the string
        ``"Order not found"``.
    """
    s_order = Search(using=es, index='orders').query(
        'match', **{'Order ID': order_id})
    s_details = Search(using=es, index='order_details').query(
        'match', **{'Order ID': order_id})

    order_response = s_order.execute()
    details_response = s_details.execute()

    if (order_response.hits.total.value > 0
            and details_response.hits.total.value > 0):
        order = order_response.hits[0].to_dict()
        details = [detail.to_dict() for detail in details_response.hits]
        return {
            "order": order,
            "details": details,
        }
    else:
        return "Order not found"


def add_to_cart(customer_id: str, product_id: str, quantity: int):
    """Append an item to the customer's cart, creating the cart if needed.

    Args:
        customer_id: Id of the cart document in the ``carts`` index.
        product_id: Product to add.
        quantity: Quantity recorded for the new line item.

    Returns:
        The cart document as written back to Elasticsearch.
    """
    try:
        cart = es.get(index="carts", id=customer_id)['_source']
    except NotFoundError:
        # First item for this customer: there is no cart document yet, so
        # start from an empty one instead of failing the whole operation.
        cart = {}

    cart.setdefault('items', []).append(
        {"product_id": product_id, "quantity": quantity})
    es.index(index="carts", id=customer_id, body=cart)
    return cart


def view_cart(customer_id: str):
    """Return the ``_source`` of the customer's ``carts`` document."""
    response = es.get(index="carts", id=customer_id)
    return response['_source']


def remove_from_cart(customer_id: str, product_id: str):
    """Remove every line item with ``product_id`` from the customer's cart.

    Returns:
        The cart document as written back to Elasticsearch.
    """
    cart = es.get(index="carts", id=customer_id)['_source']
    # A cart document may lack an 'items' key (add_to_cart allows for that),
    # so treat a missing key as an empty item list rather than raising.
    cart['items'] = [
        item for item in cart.get('items', [])
        if item['product_id'] != product_id
    ]
    es.index(index="carts", id=customer_id, body=cart)
    return cart


def checkout(customer_id: str):
    """Turn the customer's cart into a ``processing`` order and delete the cart.

    The order is indexed into ``orders`` without an explicit id, then the
    cart document is deleted.

    Returns:
        The order dict (``customer_id``, ``items``, ``status``).

    Raises:
        KeyError: If the cart document has no ``items`` key.
    """
    cart = es.get(index="carts", id=customer_id)['_source']
    order = {
        "customer_id": customer_id,
        "items": cart['items'],
        "status": "processing",
    }
    es.index(index="orders", body=order)
    es.delete(index="carts", id=customer_id)
    return order