"""Gradio front end for searching the ROOTS corpus.

The module queries a remote search service (BM25 or exact match), renders the
returned documents as HTML fragments and exposes a small Gradio UI around it.

NOTE(review): the HTML fragments below use a minimal reconstruction of the
markup (<p>, <b>, <a>, <br>, <details>); the previous literals had lost their
tags and several of them no longer parsed. Confirm the styling against the
deployed Space before relying on the exact markup.
"""
import html
import json
import os
import traceback

import gradio as gr
import requests
from huggingface_hub import HfApi

hf_api = HfApi()

# Maps the short dataset name (last path component of the Hub id) to the
# dataset info object returned by the Hub for the "bigscience-data" author.
roots_datasets = {
    dset.id.split("/")[-1]: dset
    for dset in hf_api.list_datasets(
        author="bigscience-data",
        use_auth_token=os.environ.get("bigscience_data_token"),
    )
}

# Endpoint used for quoted (exact match) queries; BM25 queries use the
# endpoint found in the "address" environment variable instead.
EXACT_SEARCH_ADDRESS = "http://34.105.160.81:8080"

PII_TAGS = {"KEY", "EMAIL", "USER", "IP_ADDRESS", "ID", "IPv4", "IPv6"}
PII_PREFIX = "PI:"


def _dataset_of(docid):
    """Return the dataset component of an ``org/dataset/index`` document id."""
    _, dataset, _ = docid.split("/")
    return dataset


def get_docid_html(docid):
    """Render a document id as ``dataset/index``, with a lock for private data.

    Args:
        docid: identifier of the form ``org/dataset/index``.

    Returns:
        An HTML-escaped label; prefixed with "🔒" when the dataset is private.

    Raises:
        ValueError: if ``docid`` does not have exactly three "/" separated parts.
        KeyError: if the dataset is not present in ``roots_datasets``.
    """
    _data_org, dataset, doc_index = docid.split("/")
    metadata = roots_datasets[dataset]
    label = f"{dataset}/{doc_index}"
    if metadata.private:
        label = "🔒" + label
    return html.escape(label)


def process_pii(text):
    """Replace every ``PI:<TAG>`` placeholder in ``text`` with a REDACTED marker."""
    # Longest tags first so the outcome never depends on set iteration order.
    for tag in sorted(PII_TAGS, key=len, reverse=True):
        text = text.replace(PII_PREFIX + tag, "<b>REDACTED {}</b>".format(tag))
    return text


def format_meta(result):
    """Build the HTML header (source URL, document id, language) of a result.

    Args:
        result: a result dict with a ``docid`` key and optional ``meta`` (a
            dict that may hold ``url``) and ``lang`` keys.

    Returns:
        An HTML fragment. The language is rendered as ``None`` when missing.
    """
    meta = result.get("meta")
    url_html = ""
    if meta is not None and "url" in meta:
        url = html.escape(str(meta["url"]))
        url_html = (
            "<p><a class='underline-on-hover' href='{}'>{}</a></p>".format(url, url)
        )
    return "{}<p>Document ID: {}</p><p>Language: {}</p>".format(
        url_html,
        get_docid_html(result["docid"]),
        html.escape(str(result.get("lang"))),
    )


def process_results(results, highlight_terms):
    """Render a list of BM25 results, emphasising the highlighted terms.

    Args:
        results: list of result dicts (each with ``text`` and ``docid``).
        highlight_terms: container of tokens that should be rendered in bold.

    Returns:
        An HTML fragment, or a "No results retrieved." notice for empty input.
    """
    if not results:
        return "<p>No results retrieved.</p>"

    rendered = []
    for result in results:
        tokens = []
        for token in result["text"].split():
            # Membership is tested on the raw token; escaping is display only.
            safe_token = html.escape(token)
            if token in highlight_terms:
                tokens.append("<b>{}</b>".format(safe_token))
            else:
                tokens.append(safe_token)
        text_html = process_pii(" ".join(tokens))
        rendered.append(format_meta(result) + "<p>{}</p>".format(text_html))
    return "".join(rendered)


def process_exact_match_payload(payload, query):
    """Render the response of the exact-match service.

    Args:
        payload: decoded response with ``results`` and ``num_results`` keys.
        query: the exact string that was searched for.

    Returns:
        A ``(html, datasets)`` tuple where ``datasets`` is the sorted list of
        dataset names that appear in the results.
    """
    datasets = set()
    rendered = ["<p>Total number of results: {}</p>".format(payload["num_results"])]
    for result in payload["results"]:
        datasets.add(_dataset_of(result["docid"]))
        text = result["text"]
        query_start = text.find(query)
        if query_start == -1:
            # The query is not literally present: show the text without a
            # highlight instead of emphasising an arbitrary slice.
            text_html = html.escape(text)
        else:
            query_end = query_start + len(query)
            text_html = "{}<b>{}</b>{}".format(
                html.escape(text[:query_start]),
                html.escape(text[query_start:query_end]),
                html.escape(text[query_end:]),
            )
        rendered.append(format_meta(result) + "<p>{}</p>".format(text_html))
    return "".join(rendered), sorted(datasets)


def process_bm25_match_payload(payload, language):
    """Render the response of the BM25 service.

    Args:
        payload: decoded response; either holds an ``err`` entry or ``results``
            and ``highlight_terms``.
        language: the language selected in the UI (``"detect_language"`` and
            ``"all"`` are handled specially).

    Returns:
        A ``(html, datasets)`` tuple. ``datasets`` is empty for the
        unsupported-language notice and for ``"detect_language"`` queries.

    Raises:
        KeyError: if the payload has neither a handled error nor ``results``.
    """
    if "err" in payload and payload["err"]["type"] == "unsupported_lang":
        detected_lang = payload["err"]["meta"]["detected_lang"]
        message = (
            "<p>Detected language <b>{}</b> is not supported.<br>"
            "Please choose a language from the dropdown or type another query.</p>"
        ).format(html.escape(str(detected_lang)))
        return message, []
    # Any other error type falls through; a payload without "results" then
    # raises KeyError, which scisearch reports to the user.

    results = payload["results"]
    highlight_terms = payload["highlight_terms"]

    if language == "detect_language":
        header = ""
        if results:
            header = "<p>Detected language: <b>{}</b></p>".format(
                html.escape(str(results[0]["lang"]))
            )
        return header + process_results(results, highlight_terms), []

    if language == "all":
        # In this mode "results" maps each language to its own result list.
        datasets = set()
        sections = []
        for lang, results_for_lang in results.items():
            lang_html = html.escape(str(lang))
            if not results_for_lang:
                sections.append(
                    "<p>No results for language: <b>{}</b></p>".format(lang_html)
                )
                continue
            sections.append(
                "<details><summary>Results for language: <b>{}</b></summary>"
                "{}</details>".format(
                    lang_html, process_results(results_for_lang, highlight_terms)
                )
            )
            datasets.update(_dataset_of(r["docid"]) for r in results_for_lang)
        return "".join(sections), sorted(datasets)

    datasets = {_dataset_of(r["docid"]) for r in results}
    return process_results(results, highlight_terms), sorted(datasets)


def scisearch(query, language, num_results=10):
    """Run a search and return the rendered results.

    A query wrapped in double quotes triggers an exact-match search; any other
    query is whitespace-normalised and sent to the BM25 service.

    Args:
        query: raw query string typed by the user.
        language: language code, ``"detect_language"`` or ``"all"``.
        num_results: maximum number of results to request.

    Returns:
        A ``(html, datasets)`` tuple. On failure ``html`` describes the raised
        exception and ``datasets`` is empty.
    """
    datasets = []
    try:
        query = (query or "").strip()
        exact_search = (
            len(query) >= 2 and query.startswith('"') and query.endswith('"')
        )
        if exact_search:
            query = query[1:-1]
        else:
            query = " ".join(query.split())
        if not query:
            return "", datasets

        post_data = {"query": query, "k": num_results}
        if language != "detect_language":
            post_data["lang"] = language

        address = EXACT_SEARCH_ADDRESS if exact_search else os.environ.get("address")
        output = requests.post(
            address,
            headers={"Content-type": "application/json"},
            data=json.dumps(post_data),
            timeout=60,
        )
        payload = json.loads(output.text)

        if exact_search:
            return process_exact_match_payload(payload, query)
        return process_bm25_match_payload(payload, language)
    except Exception as e:
        # Top-level boundary for the UI: report the failure instead of crashing.
        results_html = (
            "<p>Raised {}</p>"
            "<p>Check if a relevant discussion already exists in the Community tab. "
            "If not, please open a discussion.</p>"
        ).format(html.escape(type(e).__name__))
        print(e)
        print(traceback.format_exc())

    return results_html, datasets


def flag(query, language, num_results, issue_description):
    """Send a flagged query (with the user's description) to the BM25 service.

    Failures are logged and otherwise ignored; flagging is best effort.

    Returns:
        An empty string, used by the UI to clear the description textbox.
    """
    try:
        post_data = {
            "query": query,
            "k": num_results,
            "flag": True,
            "description": issue_description,
        }
        if language != "detect_language":
            post_data["lang"] = language

        output = requests.post(
            os.environ.get("address"),
            headers={"Content-type": "application/json"},
            data=json.dumps(post_data),
            timeout=120,
        )
        # The decoded body is not used; parsing only checks it is valid JSON.
        json.loads(output.text)
    except Exception:
        print("Error flagging")
        print(traceback.format_exc())
    return ""


description = """# 🌸 🔎 ROOTS search tool 🔍 🌸

The ROOTS corpus was developed during the [BigScience workshop](https://bigscience.huggingface.co/) for the purpose of training the Multilingual Large Language Model [BLOOM](https://huggingface.co/bigscience/bloom). This tool allows you to search through the ROOTS corpus. We serve a BM25 index for each language or group of languages included in ROOTS. You can read more about the details of the tool design [here](https://huggingface.co/spaces/bigscience-data/scisearch/blob/main/roots_search_tool_specs.pdf). For more information and instructions on how to access the full corpus check [this form](https://forms.gle/qyYswbEL5kA23Wu99)."""


if __name__ == "__main__":
    demo = gr.Blocks(
        css=".underline-on-hover:hover { text-decoration: underline; } .flagging { font-size:12px; color:Silver; }"
    )

    with demo:
        with gr.Row():
            gr.Markdown(value=description)
        with gr.Row():
            query = gr.Textbox(
                lines=1,
                max_lines=1,
                placeholder="Put your query in double quotes for exact search.",
                label="Query",
            )
        with gr.Row():
            lang = gr.Dropdown(
                choices=[
                    "ar",
                    "ca",
                    "code",
                    "en",
                    "es",
                    "eu",
                    "fr",
                    "id",
                    "indic",
                    "nigercongo",
                    "pt",
                    "vi",
                    "zh",
                    "detect_language",
                    "all",
                ],
                value="en",
                label="Language",
            )
        with gr.Row():
            k = gr.Slider(1, 100, value=10, step=1, label="Max Results")
        with gr.Row():
            with gr.Column(scale=4):
                submit_btn = gr.Button("Submit")
        with gr.Row(visible=False) as datasets_filter:
            available_datasets = gr.Dropdown(
                type="value",
                choices=[],
                value=[],
                label="Datasets",
                multiselect=True,
            )
        with gr.Row():
            results = gr.HTML(label="Results")

        with gr.Column(visible=False) as flagging_form:
            flag_txt = gr.Textbox(
                lines=1,
                placeholder="Type here...",
                label=(
                    "If you choose to flag your search, we will save the query, "
                    "language and the number of results you requested. \n"
                    "Please consider adding relevant additional context below:"
                ),
            )
            flag_btn = gr.Button("Flag Results")
            flag_btn.click(flag, inputs=[query, lang, k, flag_txt], outputs=[flag_txt])

        def submit(query, lang, k, dropdown_input):
            """Run the search and reveal the flagging form and dataset filter."""
            print("submitting", query, lang, k)
            query = (query or "").strip()
            if not query:
                # Four outputs are registered, so every one needs a value:
                # clear the results and leave the other components untouched.
                return {
                    results: "",
                    flagging_form: gr.update(),
                    datasets_filter: gr.update(),
                    available_datasets: gr.update(),
                }
            results_html, datasets = scisearch(query, lang, k)
            print(datasets)
            return {
                results: results_html,
                flagging_form: gr.update(visible=True),
                datasets_filter: gr.update(visible=True),
                available_datasets: gr.Dropdown.update(
                    choices=datasets, value=datasets
                ),
            }

        def filter_datasets():
            """Placeholder handler; dataset filtering is not implemented."""
            pass

        query.submit(
            fn=submit,
            inputs=[query, lang, k, available_datasets],
            outputs=[results, flagging_form, datasets_filter, available_datasets],
        )
        submit_btn.click(
            submit,
            inputs=[query, lang, k, available_datasets],
            outputs=[results, flagging_form, datasets_filter, available_datasets],
        )
        available_datasets.change(filter_datasets, inputs=[], outputs=[])

    demo.launch(enable_queue=True, debug=True)