import asyncio import gradio as gr import numpy as np import time import json import os import tempfile import requests import logging from aiohttp import ClientSession from langchain.text_splitter import RecursiveCharacterTextSplitter from datasets import Dataset, load_dataset from tqdm import tqdm from tqdm.asyncio import tqdm_asyncio HF_TOKEN = os.getenv("HF_TOKEN") SEMAPHORE_BOUND = os.getenv("SEMAPHORE_BOUND", "5") logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class Chunker: def __init__(self, strategy, split_seq=".", chunk_len=512): self.split_seq = split_seq self.chunk_len = chunk_len if strategy == "recursive": # https://huggingface.co/spaces/m-ric/chunk_visualizer self.split = RecursiveCharacterTextSplitter( chunk_size=chunk_len, separators=[split_seq] ).split_text if strategy == "sequence": self.split = self.seq_splitter if strategy == "constant": self.split = self.const_splitter def seq_splitter(self, text): return text.split(self.split_seq) def const_splitter(self, text): return [ text[i * self.chunk_len:(i + 1) * self.chunk_len] for i in range(int(np.ceil(len(text) / self.chunk_len))) ] def generator(input_ds, input_text_col, chunker): for i in tqdm(range(len(input_ds))): chunks = chunker.split(input_ds[i][input_text_col]) for chunk in chunks: if chunk: yield {input_text_col: chunk} async def embed_sent(sentence, embed_in_text_col, semaphore, tei_url, tmp_file): async with semaphore: payload = { "inputs": sentence, "truncate": True } async with ClientSession( headers={ "Content-Type": "application/json", "Authorization": f"Bearer {HF_TOKEN}" } ) as session: async with session.post(tei_url, json=payload) as resp: if resp.status != 200: raise RuntimeError(await resp.text()) result = await resp.json() tmp_file.write( json.dumps({"vector": result[0], embed_in_text_col: sentence}) + "\n" ) async def embed_ds(input_ds, tei_url, embed_in_text_col, temp_file): semaphore = asyncio.BoundedSemaphore(int(SEMAPHORE_BOUND)) jobs = [ asyncio.create_task(embed_sent(row[embed_in_text_col], embed_in_text_col, semaphore, tei_url, temp_file)) for row in input_ds if row[embed_in_text_col].strip() ] logger.info(f"num chunks to embed: {len(jobs)}") tic = time.time() await tqdm_asyncio.gather(*jobs) logger.info(f"embed time: {time.time() - tic}") def wake_up_endpoint(url): logger.info("Starting up TEI endpoint") n_loop = 0 while requests.get( url=url, headers={"Authorization": f"Bearer {HF_TOKEN}"} ).status_code != 200: time.sleep(2) n_loop += 1 if n_loop > 40: raise gr.Error("TEI endpoint is unavailable") logger.info("TEI endpoint is up") def chunk_embed(input_ds, input_splits, input_text_col, chunk_out_ds, strategy, split_seq, chunk_len, embed_out_ds, tei_url, private): gr.Info("Started chunking") try: input_splits = [spl.strip() for spl in input_splits.split(",") if spl] input_ds = load_dataset(input_ds, split="+".join(input_splits), token=HF_TOKEN) chunker = Chunker(strategy, split_seq, chunk_len) except Exception as e: raise gr.Error(str(e)) gen_kwargs = { "input_ds": input_ds, "input_text_col": input_text_col, "chunker": chunker } chunked_ds = Dataset.from_generator(generator, gen_kwargs=gen_kwargs) chunked_ds.push_to_hub( chunk_out_ds, private=private, token=HF_TOKEN ) gr.Info("Done chunking") logger.info("Done chunking") try: wake_up_endpoint(tei_url) with tempfile.NamedTemporaryFile(mode="a", suffix=".jsonl") as temp_file: asyncio.run(embed_ds(chunked_ds, tei_url, input_text_col, temp_file)) embedded_ds = Dataset.from_json(temp_file.name) embedded_ds.push_to_hub( embed_out_ds, private=private, token=HF_TOKEN ) except Exception as e: raise gr.Error(str(e)) gr.Info("Done embedding") logger.info("Done embedding") def change_dropdown(choice): if choice == "recursive": return [ gr.Textbox(visible=True), gr.Textbox(visible=True) ] elif choice == "sequence": return [ gr.Textbox(visible=True), gr.Textbox(visible=False) ] else: return [ gr.Textbox(visible=False), gr.Textbox(visible=True) ] with gr.Blocks() as demo: gr.Markdown( """ ## Chunk and embed """ ) input_ds = gr.Textbox(lines=1, label="Input dataset name") with gr.Row(): input_splits = gr.Textbox(lines=1, label="Input dataset splits", placeholder="train, test") input_text_col = gr.Textbox(lines=1, label="Input text column name", placeholder="text") chunk_out_ds = gr.Textbox(lines=1, label="Chunked dataset name") with gr.Row(): dropdown = gr.Dropdown( ["recursive", "sequence", "constant"], label="Chunking strategy", info="'recursive' uses a Langchain recursive tokenizer, 'sequence' splits texts by a chosen sequence, " "'constant' makes chunks of the constant size", scale=2 ) split_seq = gr.Textbox( lines=1, interactive=True, visible=False, label="Sequence", info="A text sequence to split on", placeholder="\n\n" ) chunk_len = gr.Textbox( lines=1, interactive=True, visible=False, label="Length", info="The length of chunks to split into in characters", placeholder="512" ) dropdown.change(fn=change_dropdown, inputs=dropdown, outputs=[split_seq, chunk_len]) embed_out_ds = gr.Textbox(lines=1, label="Embedded dataset name") private = gr.Checkbox(label="Make output datasets private") tei_url = gr.Textbox(lines=1, label="TEI endpoint url") with gr.Row(): clear = gr.ClearButton( components=[input_ds, input_splits, input_text_col, chunk_out_ds, dropdown, split_seq, chunk_len, embed_out_ds, tei_url, private] ) embed_btn = gr.Button("Submit") embed_btn.click( fn=chunk_embed, inputs=[input_ds, input_splits, input_text_col, chunk_out_ds, dropdown, split_seq, chunk_len, embed_out_ds, tei_url, private] ) demo.queue() demo.launch(debug=True)