surveyia / main.py
datacipen's picture
Update main.py
8683f94 verified
raw
history blame
No virus
17.1 kB
import os
import json
import bcrypt
from typing import List
from pathlib import Path
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_huggingface import HuggingFaceEndpoint
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.schema import StrOutputParser
from operator import itemgetter
from pinecone import Pinecone
from langchain_pinecone import PineconeVectorStore
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain.memory import ConversationBufferMemory
from langchain.schema.runnable import Runnable, RunnablePassthrough, RunnableConfig, RunnableLambda
from langchain.callbacks.base import BaseCallbackHandler
from langchain.chains import (
StuffDocumentsChain, ConversationalRetrievalChain
)
import chainlit as cl
from chainlit.input_widget import TextInput, Select, Switch, Slider
from chainlit.types import ThreadDict
from deep_translator import GoogleTranslator
from datetime import timedelta
from literalai import AsyncLiteralClient
async_literal_client = AsyncLiteralClient(api_key=os.getenv("LITERAL_API_KEY"))
@cl.password_auth_callback
def auth_callback(username: str, password: str):
auth = json.loads(os.environ['CHAINLIT_AUTH_LOGIN'])
ident = next(d['ident'] for d in auth if d['ident'] == username)
pwd = next(d['pwd'] for d in auth if d['ident'] == username)
resultLogAdmin = bcrypt.checkpw(username.encode('utf-8'), bcrypt.hashpw(ident.encode('utf-8'), bcrypt.gensalt()))
resultPwdAdmin = bcrypt.checkpw(password.encode('utf-8'), bcrypt.hashpw(pwd.encode('utf-8'), bcrypt.gensalt()))
resultRole = next(d['role'] for d in auth if d['ident'] == username)
if resultLogAdmin and resultPwdAdmin and resultRole == "admindatapcc":
return cl.User(
identifier=ident + " : 🧑‍💼 Admin Datapcc", metadata={"role": "admin", "provider": "credentials"}
)
elif resultLogAdmin and resultPwdAdmin and resultRole == "userdatapcc":
return cl.User(
identifier=ident + " : 🧑‍🎓 User Datapcc", metadata={"role": "user", "provider": "credentials"}
)
@cl.step(type="tool")
async def LLModel():
os.environ['HUGGINGFACEHUB_API_TOKEN'] = os.environ['HUGGINGFACEHUB_API_TOKEN']
repo_id = "mistralai/Mixtral-8x7B-Instruct-v0.1"
llm = HuggingFaceEndpoint(
repo_id=repo_id, max_new_tokens=5300, temperature=1.0, task="text2text-generation", streaming=True
)
return llm
@cl.step(type="tool")
async def VectorDatabase(categorie):
if categorie == "bibliographie-OPP-DGDIN":
index_name = "all-venus"
embeddings = HuggingFaceEmbeddings()
vectorstore = PineconeVectorStore(
index_name=index_name, embedding=embeddings, pinecone_api_key=os.getenv('PINECONE_API_KEY')
)
elif categorie == "year" or categorie == "videosTC":
index_name = "all-jdlp"
embeddings = HuggingFaceEmbeddings()
vectorstore = PineconeVectorStore(
index_name=index_name, embedding=embeddings, pinecone_api_key=os.getenv('PINECONE_API_KEYJDLP')
)
elif categorie == "skills":
index_name = "all-skills"
embeddings = HuggingFaceEmbeddings()
vectorstore = PineconeVectorStore(
index_name=index_name, embedding=embeddings, pinecone_api_key=os.getenv('PINECONE_API_KEYSKILLS')
)
return vectorstore
@cl.step(type="retrieval")
async def Retriever(categorie):
vectorstore = await VectorDatabase(categorie)
if categorie == "bibliographie-OPP-DGDIN":
retriever = vectorstore.as_retriever(search_type="similarity_score_threshold", search_kwargs={"score_threshold": .7, "k": 150,"filter": {'categorie': {'$eq': categorie}}})
elif categorie == "year":
retriever = vectorstore.as_retriever(search_type="similarity_score_threshold", search_kwargs={"score_threshold": .7, "k": 6,"filter": {'year': {'$gte': 2019}}})
elif categorie == "skills":
retriever = vectorstore.as_retriever(search_type="similarity_score_threshold", search_kwargs={"score_threshold": .7, "k": 200,"filter": {'file': {'$eq': 'competences-master-CFA.csv'}}})
elif categorie == "videosTC":
retriever = vectorstore.as_retriever(search_type="similarity_score_threshold", search_kwargs={"score_threshold": .7, "k": 200,"filter": {"title": {"$eq": "videos-confinement-timeline"}}})
return retriever
@cl.step(type="embedding")
async def Search(input, categorie):
vectorstore = await VectorDatabase(categorie)
results = []
test = []
sources_text = ""
sources_offres = ""
verbatim_text = ""
count = 0
countOffres = 0
if categorie == "bibliographie-OPP-DGDIN":
search = vectorstore.similarity_search(input,k=50, filter={"categorie": {"$eq": categorie}})
for i in range(0,len(search)):
if search[i].metadata['Lien'] not in test:
if count <= 15:
count = count + 1
test.append(search[i].metadata['Lien'])
sources_text = sources_text + str(count) + ". " + search[i].metadata['Titre'] + ', ' + search[i].metadata['Auteurs'] + ', ' + search[i].metadata['Lien'] + "\n"
verbatim_text = verbatim_text + "<p style='font-size:0.8rem'>" + str(count) + ". " + search[i].metadata['Phrase'] + "</p><p>&nbsp;</p>"
elif categorie == "year":
search = vectorstore.similarity_search(input,k=50, filter={"year": {"$gte": 2019}})
for i in range(0,len(search)):
if count <= 15:
count = count + 1
sources_text = sources_text + str(count) + ". " + search[i].metadata['title'] + ' (JDLP : ' + str(search[i].metadata['year']) + '), ' + search[i].metadata['author'] + ', https://cipen.univ-gustave-eiffel.fr/fileadmin/CIPEN/OPP/' + search[i].metadata['file'] + "\n"
verbatim_text = verbatim_text + "<p style='font-size:0.8rem'>" + str(count) + ". JDLP : " + search[i].metadata['jdlp'] + "</p><p style='font-size:0.8rem'>" + search[i].page_content + "</p>"
elif categorie == "skills":
search = vectorstore.similarity_search(input,k=50, filter={"file": {"$eq": 'competences-master-CFA.csv'}})
searchOffres = vectorstore.similarity_search(input,k=50, filter={"file": {"$eq": 'marche-emploi-CFA.csv'}})
for i in range(0,len(search)):
if count <= 15:
count = count + 1
sources_text = sources_text + str(count) + ". " + search[i].metadata['diplôme'] + ' (année : ' + search[i].metadata['année'] + '), ' + search[i].metadata['domaine'] + ', https://www.francecompetences.fr/recherche/rncp/' + str(search[i].metadata['rncp'])[4:] + "/\n"
verbatim_text = verbatim_text + "<p style='font-size:0.8rem'>" + str(count) + ". " + search[i].metadata['diplôme'] + "</p><p style='font-size:0.8rem'>" + search[i].page_content + "</p>"
for i in range(0,len(searchOffres)):
if countOffres <= 15:
countOffres = countOffres + 1
sources_offres = sources_offres + str(countOffres) + ". " + searchOffres[i].metadata['Poste'] + " (type de contrat : " + searchOffres[i].metadata['Contrat'] + ")\n"
elif categorie == "videosTC":
search = vectorstore.similarity_search(input,k=50, filter={"title": {"$eq": "videos-confinement-timeline"}})
for i in range(0,len(search)):
if count <= 17:
count = count + 1
timeSeq = search[i].metadata["time"]
timeSeqRound = round(timeSeq)
time = timedelta(seconds=timeSeqRound)
sources_text = sources_text + '<div class="gridvid"><a target="_blank" title="' + search[i].metadata['titre'] + ' : ...' + search[i].page_content + '" href="' + search[i].metadata['video'] + '#start=' + str(timeSeq) + '"><img src="' + search[i].metadata['image'] + '" width="100%" alt="' + search[i].metadata['titre'] + ' : ...' + search[i].page_content + '"/><p>🕓 ' + str(time) + ' : ...' + search[i].page_content + ' : ' + search[i].metadata['titre'] + '</p></a></div>'
verbatim_text = verbatim_text + "<p style='font-size:0.8rem'>" + str(count) + ". " + search[i].metadata['titre'] + "</p><p style='font-size:0.8rem'>🕓 "+ str(time) + " : " + search[i].page_content + "</p>"
results = [sources_text, verbatim_text, sources_offres]
return results
@cl.step(type="llm")
async def setup_conversationalChain():
model = await LLModel()
retriever = await Retriever(res.get("name"))
qa = ConversationalRetrievalChain.from_llm(
model,
memory=memory,
chain_type="stuff",
return_source_documents=True,
verbose=False,
retriever=retriever
)
cl.user_session.set("runnable", qa)
cl.user_session.set("memory", memory)
@cl.on_chat_start
async def on_chat_start():
await cl.Message(f"> REVIEWSTREAM").send()
res = await cl.AskActionMessage(
content="<div style='width:100%;text-align:center'>Sélectionnez une source documentaire</div>",
actions=[
cl.Action(name="bibliographie-OPP-DGDIN", value="Pédagogie durable", label="🔥 Pédagogie durable : exemple : «quels sont les modèles d'apprentissage dans les universités?»"),
cl.Action(name="bibliographie-OPP-DGDIN", value="Lieux d'apprentissage", label="🏫 Lieux d'apprentissage : exemple : «donne des exemples de lieu d'apprentissage dans les universités?»"),
cl.Action(name="year", value="Journée de La Pédagogie", label="👨‍🏫 Journée de La Pédagogie : exemple : «Quelles sont les bonnes pratiques des plateformes de e-learning?»"),
cl.Action(name="skills", value="Compétences du CFA Descartes", label="🧑🏻‍🎓 Les compétences des masters du CFA Descartes : exemple : «Quels sont les Master qui dispensent des compétences en marketing dispensées au CFA?»"),
cl.Action(name="OF", value="Formations Gustave Eiffel", label="🎓 Les formations de l'université Gustave Eiffel : exemple : «Quels sont les formations et les métiers possibles si on est créatif?»"),
cl.Action(name="videosTC", value="Vidéos paroles de confiné.es", label="📽️ Les vidéos paroles de confiné.es : exemple : «Quelle est la méthodologie employée avec les plateformes d'enseignement à distance?»"),
cl.Action(name="offreST", value="Offres d'emploi France Travail", label="💼 Les offres d'emploi de France Travail : exemple : «Quels sont les types de contrat proposés par les recruteurs?»"),
],
timeout="3600"
).send()
listPrompts_name = f"Liste des revues de recherche"
contentPrompts = """<p><img src='/public/hal-logo-header.png' width='32' align='absmiddle' /> <strong> Hal Archives Ouvertes</strong> : Une archive ouverte est un réservoir numérique contenant des documents issus de la recherche scientifique, généralement déposés par leurs auteurs, et permettant au grand public d'y accéder gratuitement et sans contraintes.
</p>
<p><img src='/public/logo-persee.png' width='32' align='absmiddle' /> <strong>Persée</strong> : offre un accès libre et gratuit à des collections complètes de publications scientifiques (revues, livres, actes de colloques, publications en série, sources primaires, etc.) associé à une gamme d'outils de recherche et d'exploitation.</p>
"""
prompt_elements = []
prompt_elements.append(
cl.Text(content=contentPrompts, name=listPrompts_name, display="side")
)
await cl.Message(content="📚 " + listPrompts_name, elements=prompt_elements).send()
settings = await cl.ChatSettings(
[
Select(
id="Model",
label="Publications de recherche",
values=["---", "HAL", "Persée"],
initial_index=0,
),
]
).send()
if res:
await cl.Message(f"Vous pouvez requêter sur la thématique : {res.get('value')}").send()
cl.user_session.set("selectRequest", res.get("name"))
########## Chain with streaming ##########
message_history = ChatMessageHistory()
memory = ConversationBufferMemory(memory_key="chat_history",output_key="answer",chat_memory=message_history,return_messages=True)
await setup_conversationalChain()
@cl.on_chat_resume
async def on_chat_resume(thread: ThreadDict):
memory = ConversationBufferMemory(return_messages=True)
root_messages = [m for m in thread["steps"] if m["parentId"] == None]
for message in root_messages:
if message["type"] == "user_message":
memory.chat_memory.add_user_message(message["output"])
else:
memory.chat_memory.add_ai_message(message["output"])
cl.user_session.set("memory", memory)
await setup_conversationalChain()
@cl.on_message
async def on_message(message: cl.Message):
memory = cl.user_session.get("memory")
runnable = cl.user_session.get("runnable") # type: Runnable
msg = cl.Message(content="")
class PostMessageHandler(BaseCallbackHandler):
"""
Callback handler for handling the retriever and LLM processes.
Used to post the sources of the retrieved documents as a Chainlit element.
"""
def __init__(self, msg: cl.Message):
BaseCallbackHandler.__init__(self)
self.msg = msg
self.sources = set() # To store unique pairs
def on_retriever_end(self, documents, *, run_id, parent_run_id, **kwargs):
for d in documents:
source_page_pair = (d.metadata['source'], d.metadata['page'])
self.sources.add(source_page_pair) # Add unique pairs to the set
def on_llm_end(self, response, *, run_id, parent_run_id, **kwargs):
sources_text = "\n".join([f"{source}#page={page}" for source, page in self.sources])
self.msg.elements.append(
cl.Text(name="Sources", content=sources_text, display="inline")
)
with async_literal_client.thread() as thread:
cb = cl.AsyncLangchainCallbackHandler()
results = await runnable.acall("Contexte : Vous êtes un chercheur de l'enseignement supérieur et vous êtes doué pour faire des analyses d'articles de recherche sur les thématiques liées à la pédagogie, en fonction des critères définis ci-avant. En fonction des informations suivantes et du contexte suivant seulement et strictement, répondez en langue française strictement à la question ci-dessous à partir du contexte ci-dessous. En plus, tu créeras et tu afficheras 3 questions supplémentaires en relation avec le contexte initial, à chaque étape de la conversation. Tu écriras et tu afficheras les 3 questions supplémentaires en relation avec le contexte initial, avec un titrage de niveau 1 qui a pour titre \"Questions en relation avec le contexte : \". Lorsque cela est possible, cite les sources du contexte. Si vous ne pouvez pas répondre à la question sur la base des informations, dites que vous ne trouvez pas de réponse ou que vous ne parvenez pas à trouver de réponse. Essayez donc de comprendre en profondeur le contexte et répondez uniquement en vous basant sur les informations fournies. Ne générez pas de réponses non pertinentes. Question : " + message.content, callbacks=[cb])
answer = results["answer"]
await cl.Message(content=GoogleTranslator(source='auto', target='fr').translate(answer)).send()
#search = vectorstore.similarity_search(message.content,k=50, filter={"categorie": {"$eq": "bibliographie-OPP-DGDIN"}})
search = await Search(message.content, cl.user_session.get("selectRequest"))
#os.environ["GOOGLE_CSE_ID"] = os.getenv('GOOGLE_CSE_ID')
#os.environ["GOOGLE_API_KEY"] = os.getenv('GOOGLE_API_KEY')
#searchAPI = GoogleSearchAPIWrapper()
#def top5_results(query):
# return searchAPI.results(query, 5)
#tool = Tool(
# name="Google Search Snippets",
# description="Search Google for recent results.",
# func=top5_results,
#)
#query = str(message.content)
#ref_text = tool.run(query)
#if 'Result' not in ref_text[0].keys():
# print(ref_text)
#else:
# print('None')
sources = [
cl.Text(name="Sources", content=search[0], display="inline")
]
await cl.Message(
content="Sources : ",
elements=sources,
).send()
if search[2]:
sourcesOffres = [
cl.Text(name="Exemples d'offres d'emploi", content=search[2], display="inline")
]
await cl.Message(
content="Offres d'emploi : ",
elements=sourcesOffres,
).send()
verbatim = [
cl.Text(name="Verbatim", content=search[1], display="side")
]
await cl.Message(
content="📚 Liste des Verbatim ",
elements=verbatim,
).send()