from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from fastapi.encoders import jsonable_encoder # Define the FastAPI app app = FastAPI(docs_url="/") # Add the CORS middleware to the app app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) key = "AIzaSyCEiSxvAfXHAXNE2Q5b95vBpwjlbjl5GO8" @app.get("/search") async def search( query: str, add_chatgpt_results: bool = False, n_results: int = 10, ): """ Get the results from the Google Books API, OpenAlex, and optionally OpenAI. """ import time import requests start_time = time.time() # Initialize the lists to store the results titles = [] authors = [] publishers = [] descriptions = [] images = [] def gbooks_search(query, n_results=30): """ Access the Google Books API and return the results. """ # Set the API endpoint and query parameters url = "https://www.googleapis.com/books/v1/volumes" params = { "q": str(query), "printType": "books", "maxResults": n_results, "key": key, } # Send a GET request to the API with the specified parameters response = requests.get(url, params=params) # Parse the response JSON and append the results data = response.json() # Initialize the lists to store the results titles = [] authors = [] publishers = [] descriptions = [] images = [] for item in data["items"]: volume_info = item["volumeInfo"] try: titles.append(f"{volume_info['title']}: {volume_info['subtitle']}") except KeyError: titles.append(volume_info["title"]) try: descriptions.append(volume_info["description"]) except KeyError: descriptions.append("Null") try: publishers.append(volume_info["publisher"]) except KeyError: publishers.append("Null") try: authors.append(volume_info["authors"][0]) except KeyError: authors.append("Null") try: images.append(volume_info["imageLinks"]["thumbnail"]) except KeyError: images.append( "https://bookstoreromanceday.org/wp-content/uploads/2020/08/book-cover-placeholder.png" ) return titles, authors, publishers, descriptions, images # Run the gbooks_search function ( titles_placeholder, authors_placeholder, publishers_placeholder, descriptions_placeholder, images_placeholder, ) = gbooks_search(query, n_results=n_results) # Append the results to the lists [titles.append(title) for title in titles_placeholder] [authors.append(author) for author in authors_placeholder] [publishers.append(publisher) for publisher in publishers_placeholder] [descriptions.append(description) for description in descriptions_placeholder] [images.append(image) for image in images_placeholder] # Get the time since the start first_checkpoint = time.time() first_checkpoint_time = int(first_checkpoint - start_time) def openalex_search(query, n_results=10): """ Run a search on OpenAlex and return the results. """ import pyalex from pyalex import Works # Add email to the config pyalex.config.email = "ber2mir@gmail.com" # Define a pager object with the same query pager = Works().search(str(query)).paginate(per_page=n_results, n_max=n_results) # Generate a list of the results openalex_results = list(pager) # Initialize the lists to store the results titles = [] authors = [] publishers = [] descriptions = [] images = [] # Get the titles, descriptions, and publishers and append them to the lists try: for result in openalex_results[0]: try: titles.append(result["title"]) except KeyError: titles.append("Null") try: descriptions.append(result["abstract"]) except KeyError: descriptions.append("Null") try: publishers.append(result["host_venue"]["publisher"]) except KeyError: publishers.append("Null") try: authors.append(result["authorships"][0]["author"]["display_name"]) except KeyError: authors.append("Null") images.append( "https://bookstoreromanceday.org/wp-content/uploads/2020/08/book-cover-placeholder.png" ) except IndexError: titles.append("Null") descriptions.append("Null") publishers.append("Null") authors.append("Null") images.append( "https://bookstoreromanceday.org/wp-content/uploads/2020/08/book-cover-placeholder.png" ) return titles, authors, publishers, descriptions, images # Run the openalex_search function ( titles_placeholder, authors_placeholder, publishers_placeholder, descriptions_placeholder, images_placeholder, ) = openalex_search(query, n_results=n_results) # Append the results to the lists [titles.append(title) for title in titles_placeholder] [authors.append(author) for author in authors_placeholder] [publishers.append(publisher) for publisher in publishers_placeholder] [descriptions.append(description) for description in descriptions_placeholder] [images.append(image) for image in images_placeholder] # Calculate the elapsed time between the first and second checkpoints second_checkpoint = time.time() second_checkpoint_time = int(second_checkpoint - first_checkpoint) def openai_search(query, n_results=10): """ Create a query to the OpenAI ChatGPT API and return the results. """ import openai # Initialize the lists to store the results titles = [] authors = [] publishers = [] descriptions = [] images = [] # Set the OpenAI API key openai.api_key = "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" # Create ChatGPT query chatgpt_response = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=[ { "role": "system", "content": "You are a librarian. You are helping a patron find a book.", }, { "role": "user", "content": f"Recommend me {n_results} books about {query}. Your response should be like: 'title: , author: <author>, publisher: <publisher>, summary: <summary>'", }, ], ) # Split the response into a list of results chatgpt_results = chatgpt_response["choices"][0]["message"]["content"].split( "\n" )[2::2] # Define a function to parse the results def parse_result( result, ordered_keys=["Title", "Author", "Publisher", "Summary"] ): # Create a dict to store the key-value pairs parsed_result = {} for key in ordered_keys: # Split the result string by the key and append the value to the list if key != ordered_keys[-1]: parsed_result[key] = result.split(f"{key}: ")[1].split(",")[0] else: parsed_result[key] = result.split(f"{key}: ")[1] return parsed_result ordered_keys = ["Title", "Author", "Publisher", "Summary"] for result in chatgpt_results: try: # Parse the result parsed_result = parse_result(result, ordered_keys=ordered_keys) # Append the parsed result to the lists titles.append(parsed_result["Title"]) authors.append(parsed_result["Author"]) publishers.append(parsed_result["Publisher"]) descriptions.append(parsed_result["Summary"]) images.append( "https://bookstoreromanceday.org/wp-content/uploads/2020/08/book-cover-placeholder.png" ) # In case the OpenAI API hits the limit except IndexError: break return titles, authors, publishers, descriptions, images if add_chatgpt_results: # Run the openai_search function ( titles_placeholder, authors_placeholder, publishers_placeholder, descriptions_placeholder, images_placeholder, ) = openai_search(query) # Append the results to the lists [titles.append(title) for title in titles_placeholder] [authors.append(author) for author in authors_placeholder] [publishers.append(publisher) for publisher in publishers_placeholder] [descriptions.append(description) for description in descriptions_placeholder] [images.append(image) for image in images_placeholder] # Calculate the elapsed time between the second and third checkpoints third_checkpoint = time.time() third_checkpoint_time = int(third_checkpoint - second_checkpoint) results = [ { "title": title, "author": author, "publisher": publisher, "description": description, "image": image, } for title, author, publisher, description, image in zip( titles, authors, publishers, descriptions, images ) ] response = {"results": results} return response @app.post("/classify") async def classify(data: dict, runtime: str = "normal"): """ Create classifier pipeline and return the results. """ titles = [book["title"] for book in data["results"]] descriptions = [book["description"] for book in data["results"]] publishers = [book["publisher"] for book in data["results"]] # Combine title, description, and publisher into a single string combined_data = [ f"The book's title is {title}. It is published by {publisher}. This book is about {description}" for title, description, publisher in zip(titles, descriptions, publishers) ] from transformers import ( AutoTokenizer, AutoModelForSequenceClassification, pipeline, ) from optimum.onnxruntime import ORTModelForSequenceClassification from optimum.bettertransformer import BetterTransformer if runtime == "normal": # Define the zero-shot classifier tokenizer = AutoTokenizer.from_pretrained( "sileod/deberta-v3-base-tasksource-nli" ) model = AutoModelForSequenceClassification.from_pretrained( "sileod/deberta-v3-base-tasksource-nli" ) elif runtime == "onnxruntime": tokenizer = AutoTokenizer.from_pretrained( "optimum/distilbert-base-uncased-mnli" ) model = ORTModelForSequenceClassification.from_pretrained( "optimum/distilbert-base-uncased-mnli" ) classifier_pipe = pipeline( "zero-shot-classification", model=model, tokenizer=tokenizer, hypothesis_template="This book is {}.", batch_size=1, device=-1, multi_label=False, ) # Define the candidate labels level = [ "Introductory", "Advanced", ] audience = ["Academic", "Not Academic", "Manual"] classes = [ { "audience": classifier_pipe(doc, audience)["labels"][0], "level": classifier_pipe(doc, level)["scores"][0], } for doc in combined_data ] return classes @app.post("/find_similar") async def find_similar(data: dict, runtime: str = "normal", top_k: int = 5): """ Calculate the similarity between the books and return the top_k results. """ from sentence_transformers import SentenceTransformer from sentence_transformers import util titles = [book["title"] for book in data["results"]] descriptions = [book["description"] for book in data["results"]] publishers = [book["publisher"] for book in data["results"]] # Combine title, description, and publisher into a single string combined_data = [ f"The book's title is {title}. It is published by {publisher}. This book is about {description}" for title, description, publisher in zip(titles, descriptions, publishers) ] sentence_transformer = SentenceTransformer("all-MiniLM-L6-v2") book_embeddings = sentence_transformer.encode(combined_data, convert_to_tensor=True) # Make sure that the top_k value is not greater than the number of books top_k = len(combined_data) if top_k > len(combined_data) else top_k similar_books = [] for i in range(len(combined_data)): # Get the embedding for the ith book current_embedding = book_embeddings[i] # Calculate the similarity between the ith book and the rest of the books similarity_sorted = util.semantic_search( current_embedding, book_embeddings, top_k=top_k ) # Append the results to the list similar_books.append( { "sorted_by_similarity": similarity_sorted[0][1:], } ) response = {"results": similar_books} return response @app.post("/summarize") async def summarize(descriptions: list, runtime="normal"): """ Summarize the descriptions and return the results. """ from transformers import ( AutoTokenizer, AutoModelForSeq2SeqLM, pipeline, ) from optimum.onnxruntime import ORTModelForSeq2SeqLM from optimum.bettertransformer import BetterTransformer # Define the summarizer model and tokenizer if runtime == "normal": tokenizer = AutoTokenizer.from_pretrained("lidiya/bart-base-samsum") model = AutoModelForSeq2SeqLM.from_pretrained("lidiya/bart-base-samsum") model = BetterTransformer.transform(model) elif runtime == "onnxruntime": tokenizer = AutoTokenizer.from_pretrained("optimum/t5-small") model = ORTModelForSeq2SeqLM.from_pretrained("optimum/t5-small") # Create the summarizer pipeline summarizer_pipe = pipeline("summarization", model=model, tokenizer=tokenizer) # Summarize the descriptions summaries = [ summarizer_pipe(description) if (len(description) > 0 and description != "Null") else [{"summary_text": "No summary text is available."}] for description in descriptions ] return summaries def classify(combined_data, runtime="normal"): """ Create classifier pipeline and return the results. """ from transformers import ( AutoTokenizer, AutoModelForSequenceClassification, pipeline, ) from optimum.onnxruntime import ORTModelForSequenceClassification from optimum.bettertransformer import BetterTransformer if runtime == "normal": # Define the zero-shot classifier tokenizer = AutoTokenizer.from_pretrained( "sileod/deberta-v3-base-tasksource-nli" ) model = AutoModelForSequenceClassification.from_pretrained( "sileod/deberta-v3-base-tasksource-nli" ) elif runtime == "onnxruntime": tokenizer = AutoTokenizer.from_pretrained( "optimum/distilbert-base-uncased-mnli" ) model = ORTModelForSequenceClassification.from_pretrained( "optimum/distilbert-base-uncased-mnli" ) classifier_pipe = pipeline( "zero-shot-classification", model=model, tokenizer=tokenizer, hypothesis_template="This book is {}.", batch_size=1, device=-1, multi_label=False, ) # Define the candidate labels level = [ "Introductory", "Advanced", ] audience = ["Academic", "Not Academic", "Manual"] classes = [ { "audience": classifier_pipe(doc, audience), "level": classifier_pipe(doc, level), } for doc in combined_data ] return classes # If true then run the similarity, summarize, and classify functions if classification: classes = classify(combined_data, runtime="normal") else: classes = [ {"labels": ["No labels available."], "scores": [0]} for i in range(len(combined_data)) ] # Calculate the elapsed time between the third and fourth checkpoints fourth_checkpoint = time.time() classification_time = int(fourth_checkpoint - third_checkpoint) if summarization: summaries = summarize(descriptions, runtime="normal") else: summaries = [ [{"summary_text": description}] if (len(description) > 0) else [{"summary_text": "No summary text is available."}] for description in descriptions ] # Calculate the elapsed time between the fourth and fifth checkpoints fifth_checkpoint = time.time() summarization_time = int(fifth_checkpoint - fourth_checkpoint) if similarity: similar_books = find_similar(combined_data) else: similar_books = [ {"sorted_by_similarity": ["No similar books available."]} for i in range(len(combined_data)) ] # Calculate the elapsed time between the fifth and sixth checkpoints sixth_checkpoint = time.time() similarity_time = int(sixth_checkpoint - fifth_checkpoint) # Calculate the total elapsed time end_time = time.time() runtime = f"{end_time - start_time:.2f} seconds" # Create a list of dictionaries to store the results results = [] for i in range(len(titles)): results.append( { "id": i, "title": titles[i], "author": authors[i], "publisher": publishers[i], "image_link": images[i], "audience": classes[i]["audience"]["labels"][0], "audience_confidence": classes[i]["audience"]["scores"][0], "level": classes[i]["level"]["labels"][0], "level_confidence": classes[i]["level"]["scores"][0], "summary": summaries[i][0]["summary_text"], "similar_books": similar_books[i]["sorted_by_similarity"], "runtime": { "total": runtime, "classification": classification_time, "summarization": summarization_time, "similarity": similarity_time, }, } ) return results