"""Streamlit app for collecting a Darija translation corpus.

The app shows sentences from a Hugging Face dataset and lets users translate
them, correct existing translations, or machine-translate a batch with the
OpenAI API. Every saved record is queued on a ``ParquetScheduler`` which
periodically uploads the queued rows to the Hub as Parquet files.
"""
import datetime
import json
import os
import random
import tempfile
import time
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import openai
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import streamlit as st
from datasets import load_dataset
from huggingface_hub import CommitScheduler, HfApi, login
from openai import OpenAI

# File Path
DATA_PATH = "Dr-En-space-test.csv"
DATA_REPO = "M-A-D/dar-en-space-test"

# Placeholders stored in session state when a pool of sentences is exhausted.
NO_SENTENCE_TO_TRANSLATE = "No more sentences to translate"
NO_SENTENCE_TO_CORRECT = "No more sentences to be corrected"

# Model used by the "Auto-Translate" tab.
AUTO_TRANSLATE_MODEL = "gpt-4-1106-preview"

api = HfApi()

# SECURITY: the write token used to be hard-coded here. A token that has been
# committed to source control must be considered leaked and revoked. It is now
# read from the HF_TOKEN environment variable (e.g. a Space secret); when the
# variable is absent we fall back to whatever credentials are already cached.
access_token_write = os.environ.get("HF_TOKEN")
if access_token_write:
    login(token=access_token_write)

repo_id = "M-A-D/dar-en-space-test"

st.set_page_config(layout="wide")


def _infer_schema(key: str, value: Any) -> Dict[str, str]:
    """Infer a `datasets` feature description for one column.

    Columns whose name contains "image" or "audio" are treated as binary
    media; everything else is mapped from the Python type of ``value`` and
    falls back to a string column.
    """
    if "image" in key:
        return {"_type": "Image"}
    if "audio" in key:
        return {"_type": "Audio"}
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return {"_type": "Value", "dtype": "bool"}
    if isinstance(value, int):
        return {"_type": "Value", "dtype": "int64"}
    if isinstance(value, float):
        return {"_type": "Value", "dtype": "float64"}
    if isinstance(value, bytes):
        return {"_type": "Value", "dtype": "binary"}
    return {"_type": "Value", "dtype": "string"}


# Initialize the ParquetScheduler
class ParquetScheduler(CommitScheduler):
    """
    Usage: configure the scheduler with a repo id. Once started, you can add data to be uploaded to the Hub. 1 `.append`
    call will result in 1 row in your final dataset.

    ```py
    # Start scheduler
    >>> scheduler = ParquetScheduler(repo_id="my-parquet-dataset")

    # Append some data to be uploaded
    >>> scheduler.append({...})
    >>> scheduler.append({...})
    >>> scheduler.append({...})
    ```

    The scheduler will automatically infer the schema from the data it pushes.
    Optionally, you can manually set the schema yourself:

    ```py
    >>> scheduler = ParquetScheduler(
    ...     repo_id="my-parquet-dataset",
    ...     schema={
    ...         "prompt": {"_type": "Value", "dtype": "string"},
    ...         "negative_prompt": {"_type": "Value", "dtype": "string"},
    ...         "guidance_scale": {"_type": "Value", "dtype": "int64"},
    ...         "image": {"_type": "Image"},
    ...     },
    ... )
    ```

    See https://huggingface.co/docs/datasets/main/en/package_reference/main_classes#datasets.Value for the list of
    possible values.
    """

    def __init__(
        self,
        *,
        repo_id: str,
        schema: Optional[Dict[str, Dict[str, str]]] = None,
        every: Union[int, float] = 5,
        path_in_repo: Optional[str] = "data",
        repo_type: Optional[str] = "dataset",
        revision: Optional[str] = None,
        private: bool = False,
        token: Optional[str] = None,
        allow_patterns: Union[List[str], str, None] = None,
        ignore_patterns: Union[List[str], str, None] = None,
        hf_api: Optional[HfApi] = None,
    ) -> None:
        super().__init__(
            repo_id=repo_id,
            folder_path="dummy",  # not used by the scheduler
            every=every,
            path_in_repo=path_in_repo,
            repo_type=repo_type,
            revision=revision,
            private=private,
            token=token,
            allow_patterns=allow_patterns,
            ignore_patterns=ignore_patterns,
            hf_api=hf_api,
        )

        self._rows: List[Dict[str, Any]] = []
        self._schema = schema

    def append(self, row: Dict[str, Any]) -> None:
        """Add a new item to be uploaded."""
        with self.lock:
            self._rows.append(row)

    def push_to_hub(self):
        """Upload all queued rows as a single Parquet file and empty the queue."""
        # Check for new rows to push
        with self.lock:
            rows = self._rows
            self._rows = []
        if not rows:
            return
        print(f"Got {len(rows)} item(s) to commit.")

        # Load images + create 'features' config for datasets library
        schema: Dict[str, Dict] = self._schema or {}
        path_to_cleanup: List[Path] = []
        for row in rows:
            for key, value in row.items():
                # Infer schema (for `datasets` library)
                if key not in schema:
                    schema[key] = _infer_schema(key, value)

                # Load binary files if necessary
                if schema[key]["_type"] in ("Image", "Audio"):
                    # It's an image or audio: we load the bytes and remember to cleanup the file
                    file_path = Path(value)
                    if file_path.is_file():
                        row[key] = {
                            "path": file_path.name,
                            "bytes": file_path.read_bytes(),
                        }
                        path_to_cleanup.append(file_path)

        # Complete rows if needed
        for row in rows:
            for feature in schema:
                if feature not in row:
                    row[feature] = None

        # Export items to Arrow format
        table = pa.Table.from_pylist(rows)

        # Add metadata (used by datasets library)
        table = table.replace_schema_metadata(
            {"huggingface": json.dumps({"info": {"features": schema}})}
        )

        # Write to a temporary parquet file and upload it. The context manager
        # guarantees the temporary file is removed even if the upload fails.
        with tempfile.NamedTemporaryFile() as archive_file:
            pq.write_table(table, archive_file.name)
            self.api.upload_file(
                repo_id=self.repo_id,
                repo_type=self.repo_type,
                revision=self.revision,
                path_in_repo=f"{uuid.uuid4()}.parquet",
                path_or_fileobj=archive_file.name,
            )
        print("Commit completed.")

        # Cleanup
        for path in path_to_cleanup:
            path.unlink(missing_ok=True)


@st.cache_resource
def get_scheduler() -> ParquetScheduler:
    """Return the process-wide scheduler.

    Streamlit re-executes this script on every interaction; caching keeps a
    single scheduler (and a single queue of pending rows) instead of creating
    a new one on each rerun.
    """
    return ParquetScheduler(repo_id=repo_id)


# Define the ParquetScheduler instance with your repo details
scheduler = get_scheduler()


# Function to append new translation data to the ParquetScheduler
def append_translation_data(original, translation, translated, corrected=False):
    """Queue one translation record for upload.

    Args:
        original: The source sentence.
        translation: The translation text to store for it.
        translated: Whether the sentence counts as translated.
        corrected: Whether the translation has been corrected.
    """
    data = {
        "original": original,
        "translation": translation,
        "translated": translated,
        "corrected": corrected,
        # Kept as a naive UTC ISO string so the stored format does not change.
        "timestamp": datetime.datetime.utcnow().isoformat(),
        "id": str(uuid.uuid4()),  # Unique identifier for each translation
    }
    scheduler.append(data)


# Load data
def load_data():
    """Download the 'test' split of DATA_REPO and return it as a DataFrame."""
    return pd.DataFrame(
        load_dataset(DATA_REPO, download_mode="force_redownload", split="test")
    )


def _untranslated_sentences() -> List[str]:
    """Return the sentences of the session dataset not flagged as translated."""
    data = st.session_state.data
    return data[data["translated"] == False]["sentence"].tolist()  # noqa: E712


def _pick_untranslated_sentence() -> str:
    """Return a random untranslated sentence, or the placeholder if none is left."""
    candidates = _untranslated_sentences()
    return random.choice(candidates) if candidates else NO_SENTENCE_TO_TRANSLATE


def _update_rows(sentence, **updates) -> None:
    """Set ``column=value`` on every session-dataset row matching ``sentence``.

    Keeps the in-memory copy in step with what was queued for upload so the
    same sentence is not offered again during this session.
    """
    data = st.session_state.data
    mask = data["sentence"] == sentence
    for column, value in updates.items():
        data.loc[mask, column] = value


def skip_correction():
    """Select a random translated-but-uncorrected pair into session state.

    Stores scalars in ``orig_sentence`` and ``orig_translation``; both are set
    to a placeholder when no pair is left.
    """
    data = st.session_state.data
    pending = data[(data.translated == True) & (data.corrected == False)]  # noqa: E712
    if pending.empty:
        st.session_state.orig_sentence = NO_SENTENCE_TO_CORRECT
        st.session_state.orig_translation = NO_SENTENCE_TO_CORRECT
        return
    # Take sentence and translation from the same row so they always match.
    row = pending.iloc[random.randrange(len(pending))]
    st.session_state.orig_sentence = row["sentence"]
    st.session_state.orig_translation = row["translation"]


def _show_text(text) -> None:
    """Render ``text`` on its own line."""
    # NOTE(review): the original literal appears to have lost its HTML wrapper
    # (only the line breaks around `{}` survived) — confirm the intended markup.
    st.write("\n{}\n".format(text), unsafe_allow_html=True)


def auto_translate(openai_api_key: str, num_samples: int) -> None:
    """Machine-translate up to ``num_samples`` untranslated sentences.

    Each translation is written to the session dataset and queued for upload
    as translated but not corrected. Stops at the first API error; anything
    translated before the error is kept.
    """
    # Only untranslated sentences are sampled so existing translations are
    # never overwritten; duplicates are dropped to avoid paying twice.
    pool = list(dict.fromkeys(_untranslated_sentences()))
    if not pool:
        st.info(NO_SENTENCE_TO_TRANSLATE)
        return
    samples_to_translate = random.sample(pool, min(num_samples, len(pool)))

    # The key is passed to this client only; it is not stored in a module
    # global, which would be shared by every session of the app.
    client = OpenAI(api_key=openai_api_key)

    # System prompt for translation assistant
    translation_prompt = (
        "You are a helpful AI-powered translation assistant designed for users "
        "seeking reliable translation assistance. Your primary function is to "
        "provide context-aware translations from Moroccan Arabic (Darija) to "
        "English."
    )

    saved = 0
    for sentence in samples_to_translate:
        # Create messages for the chat model
        messages = [
            {"role": "system", "content": translation_prompt},
            {
                "role": "user",
                "content": f"Translate the following sentence to English: '{sentence}'",
            },
        ]
        try:
            response = client.chat.completions.create(
                model=AUTO_TRANSLATE_MODEL,
                messages=messages,
            )
        except openai.OpenAIError as error:
            st.error(f"Auto-translation failed: {error}")
            break

        # The message is an object in openai>=1.0; its content may be None.
        translated_text = (response.choices[0].message.content or "").strip()
        if not translated_text:
            continue

        # Update per sentence so each translation lands on its own row.
        _update_rows(sentence, translation=translated_text, translated=True)
        append_translation_data(
            original=sentence,
            translation=translated_text,
            translated=True,
            corrected=False,
        )
        saved += 1

    if saved:
        st.success("Auto-Translations saved!")


st.title("Darija Translation Corpus Collection")

if "data" not in st.session_state:
    st.session_state.data = load_data()

if "sentence" not in st.session_state:
    st.session_state.sentence = _pick_untranslated_sentence()

if "orig_translation" not in st.session_state:
    skip_correction()

if "user_translation" not in st.session_state:
    st.session_state.user_translation = ""

# Part of the correction text area's key; bumping it after a save gives the
# next pair a fresh, empty input.
if "correction_round" not in st.session_state:
    st.session_state.correction_round = 0

with st.sidebar:
    st.subheader("About")
    st.markdown("""This is app is designed to collect Darija translation corpus.""")

tab1, tab2, tab3 = st.tabs(["Translation", "Correction", "Auto-Translate"])

with tab1:
    with st.container():
        st.subheader("Original Text:")
        _show_text(st.session_state.sentence)

    st.subheader("Translation:")
    st.session_state.user_translation = st.text_area(
        "Enter your translation here:", value=st.session_state.user_translation
    )

    if st.button("💾 Save"):
        has_sentence = st.session_state.sentence != NO_SENTENCE_TO_TRANSLATE
        if st.session_state.user_translation and has_sentence:
            # Append data to be saved
            append_translation_data(
                original=st.session_state.sentence,
                translation=st.session_state.user_translation,
                translated=True,
            )
            # Mark the row locally so it is not offered for translation again.
            _update_rows(
                st.session_state.sentence,
                translation=st.session_state.user_translation,
                translated=True,
            )
            st.session_state.user_translation = ""
            st.success("Saved!")

            # Update the sentence for the next iteration.
            st.session_state.sentence = _pick_untranslated_sentence()

            # Leave the confirmation visible briefly, then rerun the app.
            time.sleep(0.5)
            st.rerun()

with tab2:
    with st.container():
        st.subheader("Original Darija Text:")
        _show_text(st.session_state.orig_sentence)

    with st.container():
        st.subheader("Original English Translation:")
        _show_text(st.session_state.orig_translation)

    st.subheader("Corrected Darija Translation:")
    corrected_translation = st.text_area(
        "Enter the corrected Darija translation here:",
        key=f"corrected_translation_{st.session_state.correction_round}",
    )

    if st.button("💾 Save Translation"):
        has_pair = st.session_state.orig_sentence != NO_SENTENCE_TO_CORRECT
        if corrected_translation and has_pair:
            # Append data to be saved
            append_translation_data(
                original=st.session_state.orig_sentence,
                translation=corrected_translation,
                translated=True,
                corrected=True,
            )
            # Mark the row locally so it leaves the correction pool.
            _update_rows(
                st.session_state.orig_sentence,
                translation=corrected_translation,
                corrected=True,
            )
            st.success("Saved!")

            # Move to the next pair and rerun so the page shows it with an
            # empty input; otherwise a second click would save the old text
            # against a pair the user has not seen yet.
            skip_correction()
            st.session_state.correction_round += 1
            time.sleep(0.5)
            st.rerun()

    st.button("⏩ Skip to the Next Pair", key="skip_button", on_click=skip_correction)

with tab3:
    st.subheader("Auto-Translate")

    # User input for OpenAI API key (masked in the UI)
    openai_api_key = st.text_input("Paste your OpenAI API key:", type="password")

    # Slider for the user to choose the number of samples to translate
    num_samples = st.slider(
        "Select the number of samples to translate",
        min_value=1,
        max_value=100,
        value=10,
    )

    # Estimated cost display
    cost = num_samples * 0.0012
    st.write(
        f"The estimated cost for translating {num_samples} samples is: ${cost:.4f}"
    )

    if st.button("Do the MAGIC with Auto-Translate ✨"):
        if openai_api_key:
            auto_translate(openai_api_key, num_samples)
        else:
            st.warning("Please paste your OpenAI API key.")

# No explicit start call is needed: CommitScheduler launches its background
# upload thread when it is constructed and exposes no `start()` method.