import os

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

# Define the FastAPI app
app = FastAPI(docs_url="/")

# Add the CORS middleware to the app
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Read the Google Books API key from the environment instead of hardcoding it
# (the variable name below is a suggested convention, not part of the original code)
key = os.environ.get("GOOGLE_BOOKS_API_KEY", "")
@app.get("/search")
async def search(
    query: str,
    add_chatgpt_results: bool = False,
    n_results: int = 10,
):
    """
    Get the results from the Google Books API, OpenAlex, and optionally OpenAI.
    """
    import time
    import requests

    start_time = time.time()

    # Initialize the lists to store the results
    titles = []
    authors = []
    publishers = []
    descriptions = []
    images = []

    def gbooks_search(query, n_results=30):
        """
        Access the Google Books API and return the results.
        """
        # Set the API endpoint and query parameters
        url = "https://www.googleapis.com/books/v1/volumes"
        params = {
            "q": str(query),
            "printType": "books",
            "maxResults": n_results,
            "key": key,
        }
        # Send a GET request to the API with the specified parameters
        response = requests.get(url, params=params)
        # Parse the response JSON and append the results
        data = response.json()
        # Initialize the lists to store the results
        titles = []
        authors = []
        publishers = []
        descriptions = []
        images = []
        # "items" is missing from the response when the API finds no matches
        for item in data.get("items", []):
            volume_info = item["volumeInfo"]
            try:
                titles.append(f"{volume_info['title']}: {volume_info['subtitle']}")
            except KeyError:
                titles.append(volume_info["title"])
            try:
                descriptions.append(volume_info["description"])
            except KeyError:
                descriptions.append("Null")
            try:
                publishers.append(volume_info["publisher"])
            except KeyError:
                publishers.append("Null")
            try:
                authors.append(volume_info["authors"][0])
            except KeyError:
                authors.append("Null")
            try:
                images.append(volume_info["imageLinks"]["thumbnail"])
            except KeyError:
                images.append(
                    "https://bookstoreromanceday.org/wp-content/uploads/2020/08/book-cover-placeholder.png"
                )
        return titles, authors, publishers, descriptions, images

    # Run the gbooks_search function
    (
        titles_placeholder,
        authors_placeholder,
        publishers_placeholder,
        descriptions_placeholder,
        images_placeholder,
    ) = gbooks_search(query, n_results=n_results)

    # Append the results to the lists
    titles.extend(titles_placeholder)
    authors.extend(authors_placeholder)
    publishers.extend(publishers_placeholder)
    descriptions.extend(descriptions_placeholder)
    images.extend(images_placeholder)

    # Get the time since the start
    first_checkpoint = time.time()
    first_checkpoint_time = int(first_checkpoint - start_time)

    def openalex_search(query, n_results=10):
        """
        Run a search on OpenAlex and return the results.
        """
        import pyalex
        from pyalex import Works

        # Add an email to the config so requests go through OpenAlex's polite pool
        pyalex.config.email = "ber2mir@gmail.com"
        # Define a pager object with the same query
        pager = Works().search(str(query)).paginate(per_page=n_results, n_max=n_results)
        # Generate a list of result pages
        openalex_results = list(pager)
        # Initialize the lists to store the results
        titles = []
        authors = []
        publishers = []
        descriptions = []
        images = []
        # Get the titles, authors, publishers, and abstracts and append them to the lists
        try:
            for result in openalex_results[0]:
                try:
                    titles.append(result["title"])
                except KeyError:
                    titles.append("Null")
                try:
                    descriptions.append(result["abstract"])
                except KeyError:
                    descriptions.append("Null")
                try:
                    publishers.append(result["host_venue"]["publisher"])
                except KeyError:
                    publishers.append("Null")
                try:
                    authors.append(result["authorships"][0]["author"]["display_name"])
                except (KeyError, IndexError):
                    authors.append("Null")
                images.append(
                    "https://bookstoreromanceday.org/wp-content/uploads/2020/08/book-cover-placeholder.png"
                )
        except IndexError:
            # No result pages were returned, so fall back to a single placeholder entry
            titles.append("Null")
            descriptions.append("Null")
            publishers.append("Null")
            authors.append("Null")
            images.append(
                "https://bookstoreromanceday.org/wp-content/uploads/2020/08/book-cover-placeholder.png"
            )
        return titles, authors, publishers, descriptions, images

    # Run the openalex_search function
    (
        titles_placeholder,
        authors_placeholder,
        publishers_placeholder,
        descriptions_placeholder,
        images_placeholder,
    ) = openalex_search(query, n_results=n_results)

    # Append the results to the lists
    titles.extend(titles_placeholder)
    authors.extend(authors_placeholder)
    publishers.extend(publishers_placeholder)
    descriptions.extend(descriptions_placeholder)
    images.extend(images_placeholder)

    # Calculate the elapsed time between the first and second checkpoints
    second_checkpoint = time.time()
    second_checkpoint_time = int(second_checkpoint - first_checkpoint)

    def openai_search(query, n_results=10):
        """
        Create a query to the OpenAI ChatGPT API and return the results.
        """
        import openai

        # Initialize the lists to store the results
        titles = []
        authors = []
        publishers = []
        descriptions = []
        images = []
        # Read the OpenAI API key from the environment instead of hardcoding it
        openai.api_key = os.environ.get("OPENAI_API_KEY", "")
        # Create ChatGPT query
        chatgpt_response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[
                {
                    "role": "system",
                    "content": "You are a librarian. You are helping a patron find a book.",
                },
                {
                    "role": "user",
                    "content": f"Recommend me {n_results} books about {query}. "
                    "Your response should be like: 'title: , author: , publisher: , summary: '",
                },
            ],
        )
        # Split the response into a list of results
        chatgpt_results = chatgpt_response["choices"][0]["message"]["content"].split(
            "\n"
        )[2::2]

        # Define a function to parse the results
        def parse_result(
            result, ordered_keys=["Title", "Author", "Publisher", "Summary"]
        ):
            # Create a dict to store the key-value pairs
            parsed_result = {}
            for key in ordered_keys:
                # Split the result string by the key and keep the value
                if key != ordered_keys[-1]:
                    parsed_result[key] = result.split(f"{key}: ")[1].split(",")[0]
                else:
                    parsed_result[key] = result.split(f"{key}: ")[1]
            return parsed_result

        ordered_keys = ["Title", "Author", "Publisher", "Summary"]
        for result in chatgpt_results:
            try:
                # Parse the result
                parsed_result = parse_result(result, ordered_keys=ordered_keys)
                # Append the parsed result to the lists
                titles.append(parsed_result["Title"])
                authors.append(parsed_result["Author"])
                publishers.append(parsed_result["Publisher"])
                descriptions.append(parsed_result["Summary"])
                images.append(
                    "https://bookstoreromanceday.org/wp-content/uploads/2020/08/book-cover-placeholder.png"
                )
            # Stop if a result does not follow the expected 'key: value' format
            except IndexError:
                break
        return titles, authors, publishers, descriptions, images

    if add_chatgpt_results:
        # Run the openai_search function
        (
            titles_placeholder,
            authors_placeholder,
            publishers_placeholder,
            descriptions_placeholder,
            images_placeholder,
        ) = openai_search(query, n_results=n_results)

        # Append the results to the lists
        titles.extend(titles_placeholder)
        authors.extend(authors_placeholder)
        publishers.extend(publishers_placeholder)
        descriptions.extend(descriptions_placeholder)
        images.extend(images_placeholder)

    # Calculate the elapsed time between the second and third checkpoints
    third_checkpoint = time.time()
    third_checkpoint_time = int(third_checkpoint - second_checkpoint)

    # Assemble the combined results
    results = [
        {
            "title": title,
            "author": author,
            "publisher": publisher,
            "description": description,
            "image": image,
        }
        for title, author, publisher, description, image in zip(
            titles, authors, publishers, descriptions, images
        )
    ]
    response = {"results": results}
    return response
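
# A minimal usage sketch for /search (not part of the app itself): the host,
# port, and query values below are assumptions for illustration only.
#
#     import requests
#     resp = requests.get(
#         "http://localhost:8000/search",
#         params={"query": "machine learning", "n_results": 5},
#     )
#     books = resp.json()["results"]  # title/author/publisher/description/image dicts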
@app.post("/classify")
async def classify(data: dict, runtime: str = "normal"):
"""
Create classifier pipeline and return the results.
"""
titles = [book["title"] for book in data["results"]]
descriptions = [book["description"] for book in data["results"]]
publishers = [book["publisher"] for book in data["results"]]
# Combine title, description, and publisher into a single string
combined_data = [
f"The book's title is {title}. It is published by {publisher}. This book is about {description}"
for title, description, publisher in zip(titles, descriptions, publishers)
]
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
pipeline,
)
from optimum.onnxruntime import ORTModelForSequenceClassification
from optimum.bettertransformer import BetterTransformer
if runtime == "normal":
# Define the zero-shot classifier
tokenizer = AutoTokenizer.from_pretrained(
"sileod/deberta-v3-base-tasksource-nli"
)
model = AutoModelForSequenceClassification.from_pretrained(
"sileod/deberta-v3-base-tasksource-nli"
)
elif runtime == "onnxruntime":
tokenizer = AutoTokenizer.from_pretrained(
"optimum/distilbert-base-uncased-mnli"
)
model = ORTModelForSequenceClassification.from_pretrained(
"optimum/distilbert-base-uncased-mnli"
)
classifier_pipe = pipeline(
"zero-shot-classification",
model=model,
tokenizer=tokenizer,
hypothesis_template="This book is {}.",
batch_size=1,
device=-1,
multi_label=False,
)
# Define the candidate labels
level = [
"Introductory",
"Advanced",
]
audience = ["Academic", "Not Academic", "Manual"]
classes = [
{
"audience": classifier_pipe(doc, audience)["labels"][0],
"level": classifier_pipe(doc, level)["scores"][0],
}
for doc in combined_data
]
return classes
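
# Sketch of the request body /classify expects: the same shape /search returns,
# with at least the keys read above (title, description, publisher). The sample
# values and URL are illustrative assumptions, not data from the app.
#
#     payload = {
#         "results": [
#             {
#                 "title": "An Introduction to Statistical Learning",
#                 "description": "An overview of statistical learning methods.",
#                 "publisher": "Springer",
#             }
#         ]
#     }
#     requests.post("http://localhost:8000/classify", json=payload)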
@app.post("/find_similar")
async def find_similar(data: dict, runtime: str = "normal", top_k: int = 5):
"""
Calculate the similarity between the books and return the top_k results.
"""
from sentence_transformers import SentenceTransformer
from sentence_transformers import util
titles = [book["title"] for book in data["results"]]
descriptions = [book["description"] for book in data["results"]]
publishers = [book["publisher"] for book in data["results"]]
# Combine title, description, and publisher into a single string
combined_data = [
f"The book's title is {title}. It is published by {publisher}. This book is about {description}"
for title, description, publisher in zip(titles, descriptions, publishers)
]
sentence_transformer = SentenceTransformer("all-MiniLM-L6-v2")
book_embeddings = sentence_transformer.encode(combined_data, convert_to_tensor=True)
# Make sure that the top_k value is not greater than the number of books
top_k = len(combined_data) if top_k > len(combined_data) else top_k
similar_books = []
for i in range(len(combined_data)):
# Get the embedding for the ith book
current_embedding = book_embeddings[i]
# Calculate the similarity between the ith book and the rest of the books
similarity_sorted = util.semantic_search(
current_embedding, book_embeddings, top_k=top_k
)
# Append the results to the list
similar_books.append(
{
"sorted_by_similarity": similarity_sorted[0][1:],
}
)
response = {"results": similar_books}
return response
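
# Note on the response shape: util.semantic_search returns one list of hits per
# query embedding, and each hit is a dict with "corpus_id" (an index into
# combined_data) and "score" (cosine similarity), so every "sorted_by_similarity"
# entry above is such a list with the book's own entry sliced off.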
@app.post("/summarize")
async def summarize(descriptions: list, runtime="normal"):
"""
Summarize the descriptions and return the results.
"""
from transformers import (
AutoTokenizer,
AutoModelForSeq2SeqLM,
pipeline,
)
from optimum.onnxruntime import ORTModelForSeq2SeqLM
from optimum.bettertransformer import BetterTransformer
# Define the summarizer model and tokenizer
if runtime == "normal":
tokenizer = AutoTokenizer.from_pretrained("lidiya/bart-base-samsum")
model = AutoModelForSeq2SeqLM.from_pretrained("lidiya/bart-base-samsum")
model = BetterTransformer.transform(model)
elif runtime == "onnxruntime":
tokenizer = AutoTokenizer.from_pretrained("optimum/t5-small")
model = ORTModelForSeq2SeqLM.from_pretrained("optimum/t5-small")
# Create the summarizer pipeline
summarizer_pipe = pipeline("summarization", model=model, tokenizer=tokenizer)
# Summarize the descriptions
summaries = [
summarizer_pipe(description)
if (len(description) > 0 and description != "Null")
else [{"summary_text": "No summary text is available."}]
for description in descriptions
]
return summaries
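
# Each element of the returned list mirrors the summarization pipeline's output,
# i.e. a one-element list like [{"summary_text": "..."}], so callers read
# summaries[i][0]["summary_text"].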

def classify_books(combined_data, runtime="normal"):
    """
    Create a zero-shot classifier pipeline and return the full pipeline output
    (labels and scores) for each book. Synchronous helper used by
    enrich_results below; named differently from the /classify endpoint so it
    does not shadow the route handler.
    """
    from transformers import (
        AutoTokenizer,
        AutoModelForSequenceClassification,
        pipeline,
    )
    from optimum.onnxruntime import ORTModelForSequenceClassification

    if runtime == "normal":
        # Define the zero-shot classifier
        tokenizer = AutoTokenizer.from_pretrained(
            "sileod/deberta-v3-base-tasksource-nli"
        )
        model = AutoModelForSequenceClassification.from_pretrained(
            "sileod/deberta-v3-base-tasksource-nli"
        )
    elif runtime == "onnxruntime":
        tokenizer = AutoTokenizer.from_pretrained(
            "optimum/distilbert-base-uncased-mnli"
        )
        model = ORTModelForSequenceClassification.from_pretrained(
            "optimum/distilbert-base-uncased-mnli"
        )
    classifier_pipe = pipeline(
        "zero-shot-classification",
        model=model,
        tokenizer=tokenizer,
        hypothesis_template="This book is {}.",
        batch_size=1,
        device=-1,
        multi_label=False,
    )

    # Define the candidate labels
    level = [
        "Introductory",
        "Advanced",
    ]
    audience = ["Academic", "Not Academic", "Manual"]

    classes = [
        {
            "audience": classifier_pipe(doc, audience),
            "level": classifier_pipe(doc, level),
        }
        for doc in combined_data
    ]
    return classes

async def enrich_results(
    titles,
    authors,
    publishers,
    descriptions,
    images,
    combined_data,
    classification: bool = False,
    summarization: bool = False,
    similarity: bool = False,
):
    """
    Run the optional classification, summarization, and similarity steps over
    a set of books and assemble the enriched result list. This packages the
    combined pipeline so it can be awaited with the lists produced by /search.
    """
    import time

    start_time = time.time()
    third_checkpoint = start_time

    # If true then run the classify function
    if classification:
        classes = classify_books(combined_data, runtime="normal")
    else:
        classes = [
            {
                "audience": {"labels": ["No labels available."], "scores": [0]},
                "level": {"labels": ["No labels available."], "scores": [0]},
            }
            for i in range(len(combined_data))
        ]
    # Calculate the elapsed time between the third and fourth checkpoints
    fourth_checkpoint = time.time()
    classification_time = int(fourth_checkpoint - third_checkpoint)

    # If true then run the summarize function
    if summarization:
        summaries = await summarize(descriptions, runtime="normal")
    else:
        summaries = [
            [{"summary_text": description}]
            if description
            else [{"summary_text": "No summary text is available."}]
            for description in descriptions
        ]
    # Calculate the elapsed time between the fourth and fifth checkpoints
    fifth_checkpoint = time.time()
    summarization_time = int(fifth_checkpoint - fourth_checkpoint)

    # If true then run the find_similar function with the shape it expects
    if similarity:
        similarity_response = await find_similar(
            {
                "results": [
                    {
                        "title": title,
                        "description": description,
                        "publisher": publisher,
                    }
                    for title, description, publisher in zip(
                        titles, descriptions, publishers
                    )
                ]
            }
        )
        similar_books = similarity_response["results"]
    else:
        similar_books = [
            {"sorted_by_similarity": ["No similar books available."]}
            for i in range(len(combined_data))
        ]
    # Calculate the elapsed time between the fifth and sixth checkpoints
    sixth_checkpoint = time.time()
    similarity_time = int(sixth_checkpoint - fifth_checkpoint)

    # Calculate the total elapsed time
    end_time = time.time()
    runtime = f"{end_time - start_time:.2f} seconds"

    # Create a list of dictionaries to store the results
    results = []
    for i in range(len(titles)):
        results.append(
            {
                "id": i,
                "title": titles[i],
                "author": authors[i],
                "publisher": publishers[i],
                "image_link": images[i],
                "audience": classes[i]["audience"]["labels"][0],
                "audience_confidence": classes[i]["audience"]["scores"][0],
                "level": classes[i]["level"]["labels"][0],
                "level_confidence": classes[i]["level"]["scores"][0],
                "summary": summaries[i][0]["summary_text"],
                "similar_books": similar_books[i]["sorted_by_similarity"],
                "runtime": {
                    "total": runtime,
                    "classification": classification_time,
                    "summarization": summarization_time,
                    "similarity": similarity_time,
                },
            }
        )
    return results
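
# A minimal sketch of a local entry point, assuming uvicorn is installed; the
# host and port values are illustrative, not part of the original app.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)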