"""Streamlit front-end for generating synthetic training data with Gretel Navigator.

The app walks the user through API-key validation, dataset selection,
configuration of the instruction/response synthesis, and finally runs
``TrainingDataSynthesizer`` record by record while streaming logs and results
to the page.
"""

import json
import logging
import os
import tempfile
import time
import zipfile
from io import StringIO

import pandas as pd
import streamlit as st
from datasets import load_dataset
from gretel_client import Gretel
from navigator_helpers import (
    InstructionResponseConfig,
    TrainingDataSynthesizer,
    StreamlitLogHandler,
)

# Name used to recognise the buffer handler installed by this script.
_LOG_BUFFER_HANDLER_NAME = "gretel_navigator_app_log_buffer"

# Number of most recent log lines shown in the "Logs" tab.
MAX_LOG_LINES = 50


def _get_buffer_handler() -> logging.StreamHandler:
    """Return the root-logger handler that captures output in a StringIO buffer.

    Streamlit re-executes this script on every interaction while the root
    logger lives for the whole process, so the handler is installed once and
    reused afterwards instead of being added again on each rerun.
    """
    root_logger = logging.getLogger()
    for existing in root_logger.handlers:
        if existing.name == _LOG_BUFFER_HANDLER_NAME:
            return existing

    # Create a handler to redirect logging output to a StringIO buffer
    buffer_handler = logging.StreamHandler(StringIO())
    buffer_handler.set_name(_LOG_BUFFER_HANDLER_NAME)
    buffer_handler.setLevel(logging.INFO)
    root_logger.addHandler(buffer_handler)
    return buffer_handler


# Set up the logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = _get_buffer_handler()

# StringIO buffer that captures the logging output
log_buffer = handler.stream

SAMPLE_DATASET_URL = "https://gretel-public-website.s3.us-west-2.amazonaws.com/datasets/llm-training-data/dolly-examples-qa-with-context.csv"

WELCOME_MARKDOWN = """Gretel Navigator is an advanced AI system for generating high-quality, diverse synthetic data to train AI models and LLMs. It combines cutting-edge techniques from recent research with Gretel's proprietary methods to enhance your training data.

### 🌟 Key Features & Techniques

- **Evolutionary Text Generation**: Inspired by WizardLM-2's diverse knowledge generation
- **AI-Aligning-AI (AAA)**: Leveraging concepts from Self-Rewarding Language Models
- **Quality Evaluation & Ranking**: Using Gretel's proprietary scoring methods
- **Instruction-Response Generation**: Influenced by StarCoder2-Instruct's approach
- **Comprehensive Training Data**: Inspired by "Textbooks Are All You Need II"

### 🚀 Use Cases

1. Create diverse training/evaluation data from seeds
2. Enhance limited datasets
3. Mitigate bias and toxicity
4. Improve model performance with domain-specific data

### 🔧 How It Works

1. Initialize with custom configuration
2. Generate and evolve text populations
3. Apply AI Align AI (AAA) for quality enhancement
4. Evaluate and output high-quality synthetic data

### 📂 Input & Output

- **Input**: Seed data (text or input/output pairs) in various formats (CSV, JSON, JSONL, Hugging Face datasets)
- **Output**: High-quality synthetic training examples

Ready to elevate your AI training data? Let's get started with Gretel Navigator! 🚀

---

*Gretel Navigator combines techniques from recent academic research with Gretel's innovative approaches to deliver state-of-the-art synthetic data generation.*
"""


def _read_uploaded_file(uploaded_file) -> pd.DataFrame:
    """Parse an uploaded CSV, JSON, or JSONL file into a DataFrame.

    Args:
        uploaded_file: File-like object with a ``name`` attribute, as
            returned by ``st.file_uploader``.

    Returns:
        The parsed DataFrame.

    Raises:
        ValueError: If the file extension is not ``.csv``, ``.json`` or
            ``.jsonl``. Parser errors from pandas propagate unchanged.
    """
    # Compare case-insensitively so "DATA.CSV" is handled like "data.csv".
    file_name = uploaded_file.name.lower()
    if file_name.endswith(".csv"):
        return pd.read_csv(uploaded_file)
    if file_name.endswith(".jsonl"):
        return pd.read_json(uploaded_file, lines=True)
    if file_name.endswith(".json"):
        return pd.read_json(uploaded_file)
    raise ValueError(f"Unsupported file type: {uploaded_file.name}")


def _build_results_zip(logs, synthetic_frames, sdk_code: str) -> bytes:
    """Bundle logs, synthetic data, and SDK code into a ZIP archive.

    Args:
        logs: Log messages collected during generation.
        synthetic_frames: List of DataFrames produced so far (may be empty).
        sdk_code: Generated SDK script to include in the archive.

    Returns:
        The raw bytes of the ZIP archive. ``synthetic_data.jsonl`` is only
        included when at least one synthetic record exists.
    """
    # Convert complete logs to JSONL format
    log_jsonl = "\n".join(json.dumps({"log": log}) for log in logs)

    # Convert synthesized data to JSONL format if it exists
    synthetic_jsonl = None
    if synthetic_frames:
        combined = pd.concat(synthetic_frames, ignore_index=True)
        if not combined.empty:
            # to_json handles NumPy scalars and NaN, which json.dumps does not.
            synthetic_jsonl = combined.to_json(orient="records", lines=True)

    # Create a temporary directory to store the files
    with tempfile.TemporaryDirectory() as temp_dir:
        members = [("complete_logs.jsonl", log_jsonl)]
        if synthetic_jsonl:
            members.append(("synthetic_data.jsonl", synthetic_jsonl))
        members.append(("data_synthesis_code.py", sdk_code))

        zip_file_path = os.path.join(temp_dir, "synthesis_results.zip")
        with zipfile.ZipFile(zip_file_path, "w") as archive:
            for member_name, content in members:
                member_path = os.path.join(temp_dir, member_name)
                # Logs and prompts may contain non-ASCII text (e.g. emoji).
                with open(member_path, "w", encoding="utf-8") as member_file:
                    member_file.write(content)
                archive.write(member_path, member_name)

        with open(zip_file_path, "rb") as zip_file:
            return zip_file.read()


def main():
    """Render the Gretel Navigator app and run synthesis on request."""
    st.set_page_config(page_title="Gretel", layout="wide")
    st.title("🎨 Gretel Navigator: Create Synthetic Data from a Prompt")
    st.write(
        "Generate diverse synthetic training data from text or existing datasets to improve or evaluate AI models."
    )

    with st.expander("Introduction", expanded=False):
        st.markdown(WELCOME_MARKDOWN)

    st.subheader("Step 1: API Key Validation")
    with st.expander("API Key Configuration", expanded=True):
        api_key = st.text_input(
            "Enter your Gretel API key (Get a free API key at: https://console.gretel.ai/users/me/key)",
            value="",
            type="password",
            help="Your Gretel API key is required to authenticate and use Gretel Navigator. If you don't have one yet, sign up for a free account at https://console.gretel.ai to get started.",
        )
        if "gretel" not in st.session_state:
            st.session_state.gretel = None
        # Single key for generated frames; it is both written by the
        # generation loop and read by the download branch below.
        if "synthetic_data" not in st.session_state:
            st.session_state.synthetic_data = []

        if st.button("Validate API Key"):
            if api_key:
                try:
                    st.session_state.gretel = Gretel(api_key=api_key, validate=True)
                    st.success("API key validated. Connection successful!")
                except Exception as e:
                    st.error(f"Error connecting to Gretel: {str(e)}")
            else:
                st.warning("Please enter your Gretel API key to proceed.")

    # Nothing below is usable without a validated connection.
    if st.session_state.gretel is None:
        st.stop()

    st.subheader("Step 2: Data Source Selection")
    with st.expander("Data Source", expanded=True):
        data_source = st.radio(
            "Select data source",
            options=[
                "Upload a file",
                "Select a dataset from Hugging Face",
                "Use a sample dataset",
            ],
            help="Choose whether to upload a file, select a dataset from Hugging Face, or use a sample dataset",
        )

        df = None
        dataset_source_type = ""
        huggingface_dataset = ""
        huggingface_split = ""

        if data_source == "Upload a file":
            dataset_source_type = "uploaded"
            uploaded_file = st.file_uploader(
                "Upload a CSV, JSON, or JSONL file",
                type=["csv", "json", "jsonl"],
                help="Upload the dataset file in CSV, JSON, or JSONL format",
            )
            if uploaded_file is not None:
                try:
                    df = _read_uploaded_file(uploaded_file)
                except Exception as e:
                    st.error(f"Error reading uploaded file: {str(e)}")
                else:
                    st.success(f"File uploaded successfully: {uploaded_file.name}")

        elif data_source == "Select a dataset from Hugging Face":
            dataset_source_type = "huggingface"
            huggingface_dataset = st.text_input(
                "Hugging Face Dataset Repository",
                help="Enter the name of the Hugging Face dataset repository (e.g., 'squad')",
            )
            st.session_state.huggingface_dataset = huggingface_dataset

            huggingface_split = st.selectbox(
                "Dataset Split",
                options=["train", "validation", "test"],
                help="Select the dataset split to use",
            )
            st.session_state.huggingface_split = huggingface_split

            if st.button("Load Hugging Face Dataset"):
                if huggingface_dataset:
                    try:
                        with st.spinner("Loading dataset from Hugging Face..."):
                            dataset = load_dataset(
                                huggingface_dataset, split=huggingface_split
                            )
                            df = dataset.to_pandas()
                        st.success(
                            f"Dataset loaded from Hugging Face repository: {huggingface_dataset}"
                        )
                    except Exception as e:
                        st.error(f"Error loading dataset from Hugging Face: {str(e)}")
                else:
                    st.warning("Please provide a Hugging Face dataset repository name.")

        elif data_source == "Use a sample dataset":
            dataset_source_type = "sample"
            st.write("Try a sample dataset to get started quickly.")
            if st.button("Try Sample Dataset"):
                try:
                    df = pd.read_csv(SAMPLE_DATASET_URL)
                    st.success("Sample dataset loaded successfully.")
                except Exception as e:
                    st.error(f"Error downloading sample dataset: {str(e)}")

        if df is not None:
            st.session_state.df = df
            st.session_state.selected_fields = list(df.columns)
            st.write(
                f"Loaded dataset with {len(df)} rows and {len(df.columns)} columns."
            )
        else:
            # Buttons are only True on the run in which they were clicked, so
            # fall back to the frame remembered from an earlier run.
            df = st.session_state.get("df")

    st.subheader("Step 3: Data Preview and Configuration")
    if df is not None:
        if df.empty:
            # The record-count input below requires at least one row.
            st.warning(
                "The loaded dataset has no rows. Please load a dataset with at least one record."
            )
            st.stop()

        with st.expander("Data Preview", expanded=True):
            st.dataframe(df.head())

        with st.expander("Input Fields Selection", expanded=True):
            st.write(
                "Select the context fields to provide the LLM access to for generating input/output pairs. This can include existing instructions and responses. All selected fields will be treated as ground truth data."
            )
            selected_fields = []
            for column in df.columns:
                if st.checkbox(
                    column,
                    value=column in st.session_state.get("selected_fields", []),
                    key=f"checkbox_{column}",
                ):
                    selected_fields.append(column)
            st.session_state.selected_fields = selected_fields

        with st.expander("Advanced Options", expanded=False):
            output_instruction_field = st.text_input(
                "Synthetic instruction field",
                value=st.session_state.get(
                    "output_instruction_field", "synthetic_instruction"
                ),
                help="Specify the name of the output field for generated instructions",
            )
            st.session_state.output_instruction_field = output_instruction_field

            output_response_field = st.text_input(
                "Synthetic response field",
                value=st.session_state.get(
                    "output_response_field", "synthetic_response"
                ),
                help="Specify the name of the output field for generated responses",
            )
            st.session_state.output_response_field = output_response_field

            num_records = st.number_input(
                "Max number of records from input data to process",
                min_value=1,
                max_value=len(df),
                value=len(df),
                help="Specify the number of records to process",
            )
            st.session_state.num_records = num_records

            num_generations = st.number_input(
                "Number of generations",
                min_value=1,
                value=st.session_state.get("num_generations", 3),
                help="Specify the number of generations for the evolutionary algorithm",
            )
            st.session_state.num_generations = num_generations

            population_size = st.number_input(
                "Population size",
                min_value=1,
                value=st.session_state.get("population_size", 5),
                help="Specify the population size for the evolutionary algorithm",
            )
            st.session_state.population_size = population_size

            mutation_rate = st.slider(
                "Mutation rate",
                min_value=0.0,
                max_value=1.0,
                value=st.session_state.get("mutation_rate", 0.5),
                step=0.1,
                help="Adjust the mutation rate for the evolutionary algorithm",
            )
            st.session_state.mutation_rate = mutation_rate

            temperature = st.slider(
                "Temperature",
                min_value=0.0,
                max_value=1.0,
                value=st.session_state.get("temperature", 0.7),
                step=0.1,
                help="Adjust the temperature for response generation",
            )
            st.session_state.temperature = temperature

            max_tokens = st.slider(
                "Max tokens",
                min_value=1,
                max_value=1024,
                step=64,
                value=st.session_state.get("max_tokens", 192),
                help="Specify the maximum number of tokens for generated text",
            )
            st.session_state.max_tokens = max_tokens

        with st.expander("Model Configuration", expanded=True):
            st.markdown("### Primary Navigator Models")
            navigator_tabular = st.selectbox(
                "Navigator Tabular",
                options=["gretelai/auto"],
                index=0,
                help="Select the primary Navigator tabular model",
            )
            navigator_llm = st.selectbox(
                "Navigator LLM",
                options=["gretelai/gpt-auto", "gretelai/gpt-llama3-1-8b"],
                index=0,
                help="Select the primary Navigator LLM",
            )

            st.markdown("---")
            st.markdown("### AI Align AI (AAA)")
            st.write(
                "AI Align AI (AAA) is a technique that iteratively improves the quality and coherence of generated outputs by using multiple LLMs for co-teaching and self-teaching. Enabling AAA will enhance the overall quality of the synthetic data, but it may slow down the generation process."
            )
            use_aaa = st.checkbox(
                "Use AI Align AI (AAA)",
                value=st.session_state.get("use_aaa", True),
                help="Enable or disable the use of AI Align AI.",
            )
            st.session_state.use_aaa = use_aaa

            co_teach_llms = []
            if use_aaa:
                st.markdown("#### Navigator Co-teaching LLMs")
                st.write(
                    "Select additional Navigator LLMs for co-teaching in AAA. It is recommended to use different LLMs than the primary Navigator LLM for this step."
                )
                co_teach_options = [
                    "gretelai/gpt-llama3-1-8b",
                    "gretelai/gpt-mistral-nemo-2407",
                ]
                for model in co_teach_options:
                    if st.checkbox(model, value=True, key=f"checkbox_{model}"):
                        co_teach_llms.append(model)
            st.session_state.co_teach_llms = co_teach_llms

            st.markdown("---")
            st.markdown("### Format Prompts")
            system_prompt = st.text_area(
                "System Prompt",
                value=st.session_state.get(
                    "system_prompt",
                    "You are an expert in generating balanced, context-rich questions and comprehensive answers based on given contexts. Your goal is to create question-answer pairs that are informative, detailed when necessary, and understandable without prior knowledge, while not revealing the answer in the question.",
                ),
                help="Specify the system prompt for the LLM",
            )
            st.session_state.system_prompt = system_prompt

            instruction_format_prompt = st.text_area(
                "Instruction Format Prompt",
                value=st.session_state.get(
                    "instruction_format_prompt",
                    "Generate a specific and clear question directly related to a key point in the given context. The question should include enough background information to be understood without prior knowledge, while being answerable using only the information provided. Do not reveal the answer in the question. Ensure the question is focused and can be answered concisely if the information allows, but also accommodate for more detailed responses when appropriate.",
                ),
                help="Specify the format prompt for instructions",
            )
            st.session_state.instruction_format_prompt = instruction_format_prompt

            instruction_mutation_prompt = st.text_area(
                "Instruction Mutation Prompt",
                value=st.session_state.get(
                    "instruction_mutation_prompt",
                    "Refine this question to include necessary context for understanding, without revealing the answer. Ensure it remains clear and can be comprehensively answered using only the information in the given context. Adjust the question to allow for a concise answer if possible, but also consider if a more detailed response is warranted based on the complexity of the topic.",
                ),
                help="Specify the mutation prompt for instructions",
            )
            st.session_state.instruction_mutation_prompt = instruction_mutation_prompt

            instruction_quality_prompt = st.text_area(
                "Instruction Quality Prompt",
                value=st.session_state.get(
                    "instruction_quality_prompt",
                    "Evaluate the quality of this question based on its specificity, inclusion of necessary context, relevance to the original context, clarity for someone unfamiliar with the topic, and ability to be answered appropriately (either concisely or in detail) without revealing the answer:",
                ),
                help="Specify the quality evaluation prompt for instructions",
            )
            st.session_state.instruction_quality_prompt = instruction_quality_prompt

            instruction_complexity_target = st.slider(
                "Instruction Complexity Target",
                min_value=1,
                max_value=5,
                value=st.session_state.get("instruction_complexity_target", 3),
                step=1,
                help="Specify the target complexity for instructions",
            )
            st.session_state.instruction_complexity_target = (
                instruction_complexity_target
            )

            response_format_prompt = st.text_area(
                "Response Format Prompt",
                value=st.session_state.get(
                    "response_format_prompt",
                    "Generate an informative answer to the given question. Use only the information provided in the original context. The response should be as concise as possible while fully addressing the question, including relevant context and explanations where necessary. For complex topics, provide a more detailed response. Ensure the answer provides enough background information to be understood by someone unfamiliar with the topic.",
                ),
                help="Specify the format prompt for responses",
            )
            st.session_state.response_format_prompt = response_format_prompt

            response_mutation_prompt = st.text_area(
                "Response Mutation Prompt",
                value=st.session_state.get(
                    "response_mutation_prompt",
                    "Refine this answer to balance conciseness with comprehensiveness. For straightforward questions, aim for brevity while ensuring accuracy. For complex topics, provide more detail and context. Add relevant information from the context as needed. Verify factual accuracy and correct any inaccuracies or missing key information. Ensure the answer can be understood without prior knowledge of the topic.",
                ),
                help="Specify the mutation prompt for responses",
            )
            st.session_state.response_mutation_prompt = response_mutation_prompt

            response_quality_prompt = st.text_area(
                "Response Quality Prompt",
                value=st.session_state.get(
                    "response_quality_prompt",
                    "Evaluate the quality of this answer based on its accuracy, appropriate level of detail (concise for simple questions, comprehensive for complex ones), relevance to the question, clarity for someone unfamiliar with the topic, inclusion of necessary background information, and whether it provides a satisfactory response using only the information from the given context:",
                ),
                help="Specify the quality evaluation prompt for responses",
            )
            st.session_state.response_quality_prompt = response_quality_prompt

            response_complexity_target = st.slider(
                "Response Complexity Target",
                min_value=1,
                max_value=5,
                value=st.session_state.get("response_complexity_target", 3),
                step=1,
                help="Specify the target complexity for responses",
            )
            st.session_state.response_complexity_target = response_complexity_target

        with st.expander("Download SDK Code", expanded=False):
            st.markdown("### Ready to generate data at scale?")
            st.write(
                "Get started with your current configuration using the SDK code below:"
            )

            # Free-text values are emitted with !r so quotes, backslashes, or
            # newlines typed by the user cannot break the generated script.
            config_text = f"""#!pip install -Uqq git+https://github.com/gretelai/navigator-helpers.git

import logging

import pandas as pd
from navigator_helpers import InstructionResponseConfig, TrainingDataSynthesizer
from datasets import load_dataset

# Configure the logger
logging.basicConfig(level=logging.INFO, format="%(message)s")

API_KEY = "YOUR_API_KEY"
DATASET_SOURCE = "{dataset_source_type}"
HUGGINGFACE_DATASET = {huggingface_dataset!r}
HUGGINGFACE_SPLIT = "{huggingface_split}"
SAMPLE_DATASET_URL = "{SAMPLE_DATASET_URL}"

# Load dataset
if DATASET_SOURCE == 'uploaded':
    df = pd.read_csv("YOUR_UPLOADED_FILE_PATH")  # Replace with the actual file path
elif DATASET_SOURCE == 'huggingface':
    dataset = load_dataset(HUGGINGFACE_DATASET, split=HUGGINGFACE_SPLIT)
    df = dataset.to_pandas()
elif DATASET_SOURCE == 'sample':
    df = pd.read_csv(SAMPLE_DATASET_URL)
else:
    raise ValueError("Invalid DATASET_SOURCE specified")

# Create the instruction response configuration
config = InstructionResponseConfig(
    input_fields={st.session_state.selected_fields},
    output_instruction_field={output_instruction_field!r},
    output_response_field={output_response_field!r},
    num_generations={num_generations},
    population_size={population_size},
    mutation_rate={mutation_rate},
    temperature={temperature},
    max_tokens={max_tokens},
    api_key=API_KEY,
    navigator_tabular="{navigator_tabular}",
    navigator_llm="{navigator_llm}",
    co_teach_llms={co_teach_llms},
    system_prompt={system_prompt!r},
    instruction_format_prompt={instruction_format_prompt!r},
    instruction_mutation_prompt={instruction_mutation_prompt!r},
    instruction_quality_prompt={instruction_quality_prompt!r},
    instruction_complexity_target={instruction_complexity_target},
    response_format_prompt={response_format_prompt!r},
    response_mutation_prompt={response_mutation_prompt!r},
    response_quality_prompt={response_quality_prompt!r},
    response_complexity_target={response_complexity_target},
    use_aaa={use_aaa}
)

# Create the training data synthesizer and perform synthesis
synthesizer = TrainingDataSynthesizer(
    df,
    config,
    output_file="results.jsonl",
    verbose=True,
)
new_df = synthesizer.generate()
"""
            st.code(config_text, language="python")
            st.download_button(
                label="Download SDK Code",
                data=config_text,
                file_name="data_synthesis_code.py",
                mime="text/plain",
            )

        start_stop_container = st.empty()
        col1, col2 = st.columns(2)
        with col1:
            start_button = st.button("🚀 Start")
        with col2:
            stop_button = st.button("🛑 Stop")

        if "logs" not in st.session_state:
            st.session_state.logs = []
        if "synthetic_data" not in st.session_state:
            st.session_state.synthetic_data = []

        if start_button:
            # Clear the synthetic data and logs before starting a new generation
            st.session_state.synthetic_data = []
            st.session_state.logs = []

            with st.expander("Synthetic Data", expanded=True):
                st.subheader("Synthetic Data Generation")
                progress_bar = st.progress(0)
                # Separate placeholder: writing text into the progress bar's
                # own slot would replace the bar itself.
                progress_text_placeholder = st.empty()

                tab1, tab2 = st.tabs(["Synthetic Data", "Logs"])
                with tab1:
                    synthetic_data_placeholder = st.empty()
                    st.info(
                        "Click on the 'Logs' tab to see and debug real-time logging for each record as it is generated by the agents."
                    )
                with tab2:
                    log_container = st.empty()

                def custom_log_handler(msg):
                    """Record a log message and refresh the visible log tail."""
                    st.session_state.logs.append(msg)
                    displayed_logs = st.session_state.logs[-MAX_LOG_LINES:]
                    log_container.text("\n".join(displayed_logs))

                # Remove the previous log handler if it exists. Iterate over a
                # copy because removeHandler mutates the handler list.
                nav_logger = logging.getLogger("navigator_helpers")
                for existing in list(nav_logger.handlers):
                    if isinstance(existing, StreamlitLogHandler):
                        nav_logger.removeHandler(existing)

                streamlit_handler = StreamlitLogHandler(custom_log_handler)
                nav_logger.addHandler(streamlit_handler)

                try:
                    config = InstructionResponseConfig(
                        input_fields=selected_fields,
                        output_instruction_field=output_instruction_field,
                        output_response_field=output_response_field,
                        num_generations=num_generations,
                        population_size=population_size,
                        mutation_rate=mutation_rate,
                        temperature=temperature,
                        max_tokens=max_tokens,
                        api_key=api_key,
                        navigator_tabular=navigator_tabular,
                        navigator_llm=navigator_llm,
                        co_teach_llms=co_teach_llms,
                        system_prompt=system_prompt,
                        instruction_format_prompt=instruction_format_prompt,
                        instruction_mutation_prompt=instruction_mutation_prompt,
                        instruction_quality_prompt=instruction_quality_prompt,
                        instruction_complexity_target=instruction_complexity_target,
                        response_format_prompt=response_format_prompt,
                        response_mutation_prompt=response_mutation_prompt,
                        response_quality_prompt=response_quality_prompt,
                        response_complexity_target=response_complexity_target,
                        use_aaa=use_aaa,
                    )

                    start_time = time.time()
                    with st.spinner("Generating synthetic data..."):
                        for index in range(num_records):
                            # One synthesizer per input row so results can be
                            # shown as soon as each record finishes.
                            row = df.iloc[index]
                            synthesizer = TrainingDataSynthesizer(
                                pd.DataFrame([row]),
                                config,
                                output_file="results.csv",
                                verbose=True,
                            )
                            new_df = synthesizer.generate()
                            st.session_state.synthetic_data.append(new_df)

                            synthetic_data_placeholder.subheader("Synthetic Data")
                            synthetic_data_placeholder.dataframe(
                                pd.concat(
                                    st.session_state.synthetic_data,
                                    ignore_index=True,
                                )
                            )

                            records_processed = index + 1
                            records_remaining = num_records - records_processed
                            progress = records_processed / num_records
                            progress_bar.progress(progress)

                            elapsed_time = time.time() - start_time
                            est_time_per_record = elapsed_time / records_processed
                            est_time_remaining = (
                                est_time_per_record * records_remaining
                            )
                            progress_text = f"Progress: {progress:.2%} | Records Processed: {records_processed} | Records Remaining: {records_remaining} | Est. Time per Record: {est_time_per_record:.2f}s | Est. Time Remaining: {est_time_remaining:.2f}s"
                            progress_text_placeholder.text(progress_text)

                            time.sleep(0.1)
                finally:
                    # Detach even when generation fails or the script run is
                    # interrupted, so handlers do not pile up across runs.
                    nav_logger.removeHandler(streamlit_handler)

            st.success("Data synthesis completed!")
            st.stop()

        if stop_button:
            st.warning("Synthesis stopped by the user.")

            # Bundle the logs, whatever was generated so far, and the SDK code.
            zip_bytes = _build_results_zip(
                st.session_state.logs,
                st.session_state.synthetic_data,
                config_text,
            )
            st.download_button(
                label="💾 Download Synthetic Data, Logs, and SDK Code",
                data=zip_bytes,
                file_name="gretel_synthetic_data.zip",
                mime="application/zip",
            )
            st.stop()
    else:
        st.info(
            "Please upload a file or select a dataset from Hugging Face to proceed."
        )


if __name__ == "__main__":
    main()