"""Prompt2Model-backed environment actions.

Each public function implements one agent action in the Prompt2Model
pipeline: dataset generation/retrieval/processing, model retrieval,
training, execution on test sets, and evaluation.  All functions accept
string arguments (as supplied by the agent), validate/convert them, and
return a human-readable observation string.  Invalid arguments are
reported by raising ``EnvException``.  ``**kwargs`` is accepted and
ignored so that extra agent-supplied fields do not crash an action.
"""

import json
import os

import datasets
import torch
import transformers

from .schema import ActionInfo, EnvException, EnhancedJSONEncoder
from reactagent.prompt2model.prompt_parser import MockPromptSpec, TaskType
from reactagent.prompt2model.dataset_retriever import DescriptionDatasetRetriever
from reactagent.prompt2model.dataset_generator import PromptBasedDatasetGenerator, DatasetSplit
from reactagent.prompt2model.dataset_processor import TextualizeProcessor
from reactagent.prompt2model.model_retriever import DescriptionModelRetriever
from reactagent.prompt2model.model_trainer import GenerationModelTrainer
from reactagent.prompt2model.model_executor import GenerationModelExecutor, ModelOutput
from reactagent.prompt2model.model_evaluator import Seq2SeqEvaluator


def generate_dataset(instruction, examples, save_dir, num_train, num_valid,
                     num_test, work_dir='.', **kwargs):
    """Generate a synthetic dataset dict from an instruction and examples.

    Args:
        instruction: How to generate the output from the input.
        examples: Example input-output pairs guiding generation.
        save_dir: Directory (relative to ``work_dir``) to save the
            generated dataset dict to.
        num_train / num_valid / num_test: Number of examples per split;
            accepted as strings and converted to ``int``.
        work_dir: Base directory all paths are resolved against.

    Returns:
        A success message naming the save location.

    Raises:
        EnvException: If any split size is not an integer.
    """
    try:
        num_train = int(num_train)
        num_valid = int(num_valid)
        num_test = int(num_test)
    except ValueError as e:
        raise EnvException("Number of examples should be an integer") from e

    prompt_spec = MockPromptSpec(
        TaskType.TEXT_GENERATION, instruction=instruction, examples=examples
    )
    generator = PromptBasedDatasetGenerator()
    dataset_dict = generator.generate_dataset_dict(
        prompt_spec,
        {
            DatasetSplit.TRAIN: num_train,
            DatasetSplit.VAL: num_valid,
            DatasetSplit.TEST: num_test,
        },
    )
    save_path = os.path.join(work_dir, save_dir)
    dataset_dict.save_to_disk(save_path)
    return f"Dataset successfully generated and saved to {save_path}"


def retrieve_dataset(instruction, save_dir, work_dir='.', **kwargs):
    """Retrieve an existing dataset matching ``instruction`` and save it.

    Args:
        instruction: Description of the desired task.
        save_dir: Directory (relative to ``work_dir``) to save the
            retrieved dataset dict to.
        work_dir: Base directory all paths are resolved against.

    Returns:
        A success message naming the save location.
    """
    prompt_spec = MockPromptSpec(
        TaskType.TEXT_GENERATION, instruction=instruction, examples=""
    )
    retriever = DescriptionDatasetRetriever()
    dataset_dict = retriever.retrieve_dataset_dict(prompt_spec)
    save_path = os.path.join(work_dir, save_dir)
    dataset_dict.save_to_disk(save_path)
    # Fixed message: this action retrieves a dataset, it does not generate one.
    return f"Dataset successfully retrieved and saved to {save_path}"


def retrieve_model(instruction, work_dir='.', **kwargs):
    """Retrieve candidate model names suited to ``instruction``.

    Returns:
        A numbered, newline-separated list of model names.
    """
    prompt_spec = MockPromptSpec(
        TaskType.TEXT_GENERATION, instruction=instruction, examples=""
    )
    retriever = DescriptionModelRetriever(use_bm25=True, use_HyDE=True)
    top_models = retriever.retrieve(prompt_spec)
    return "Top Models:\n" + "".join(
        f"{i+1}. {model}\n" for i, model in enumerate(top_models)
    )


def process_dataset(instruction, load_dirs, save_dirs, work_dir='.', **kwargs):
    """Textualize one or more dataset dicts for seq2seq training.

    Args:
        instruction: How to generate the output from the input.
        load_dirs: Colon-separated directories to load dataset dicts from.
        save_dirs: Colon-separated directories to save processed dataset
            dicts to; must match ``load_dirs`` in number and order.
        work_dir: Base directory all paths are resolved against.

    Returns:
        A success message naming the save locations.

    Raises:
        EnvException: If the load/save directory counts differ.
    """
    prompt_spec = MockPromptSpec(
        TaskType.TEXT_GENERATION, instruction=instruction, examples=""
    )

    load_dirs = load_dirs.split(':')
    save_dirs = save_dirs.split(':')
    if len(load_dirs) != len(save_dirs):
        raise EnvException(
            "Number of load directories should match number of save directories"
        )

    load_paths = [os.path.join(work_dir, load_dir) for load_dir in load_dirs]
    save_paths = [os.path.join(work_dir, save_dir) for save_dir in save_dirs]

    # load the datasets
    dataset_dicts = [datasets.load_from_disk(load_path) for load_path in load_paths]

    # process the datasets
    processor = TextualizeProcessor(has_encoder=True)
    modified_dataset_dicts = processor.process_dataset_dict(prompt_spec, dataset_dicts)

    # save the processed datasets
    for dataset_dict, save_path in zip(modified_dataset_dicts, save_paths):
        dataset_dict.save_to_disk(save_path)

    return f"Data successfully processed and saved to {save_paths}"


def train_model(model_name, load_dirs, result_dir, epochs, batch_size,
                warmup_steps, weight_decay, learning_rate, work_dir='.',
                **kwargs):
    """Fine-tune a seq2seq model on the train/val splits of saved datasets.

    Args:
        model_name: HuggingFace model name to fine-tune.
        load_dirs: Colon-separated directories of processed dataset dicts.
        result_dir: Directory (relative to ``work_dir``) that will receive
            ``trained_model/`` and ``trained_tokenizer/``.
        epochs, batch_size, warmup_steps: Converted to ``int``.
        weight_decay, learning_rate: Converted to ``float``.
        work_dir: Base directory all paths are resolved against.

    Returns:
        A success message naming the model/tokenizer save locations.

    Raises:
        EnvException: If any numeric parameter fails conversion.
    """
    try:
        epochs = int(epochs)
        batch_size = int(batch_size)
        warmup_steps = int(warmup_steps)
        weight_decay = float(weight_decay)
        learning_rate = float(learning_rate)
    except ValueError as e:
        raise EnvException(
            "Numerical parameters should be integers or floats as appropriate"
        ) from e

    load_dirs = load_dirs.split(':')
    result_dir = os.path.join(work_dir, result_dir)

    # load the datasets
    load_paths = [os.path.join(work_dir, load_dir) for load_dir in load_dirs]
    dataset_dicts = [datasets.load_from_disk(load_path) for load_path in load_paths]
    training_datasets = [dataset_dict["train"] for dataset_dict in dataset_dicts]
    validation_datasets = [dataset_dict["val"] for dataset_dict in dataset_dicts]

    trainer = GenerationModelTrainer(
        model_name,
        has_encoder=True,
        executor_batch_size=batch_size,
        tokenizer_max_length=1024,
        sequence_max_length=1280,
    )
    # NOTE(fix): the original had a trailing comma after the closing brace
    # (`hparams = {...},`), which silently made `hparams` a 1-tuple wrapping
    # the dict instead of the dict the trainer expects.
    hparams = {
        "output_dir": os.path.join(result_dir, "training_output"),
        "save_strategy": "epoch",
        "num_train_epochs": epochs,
        "per_device_train_batch_size": batch_size,
        "evaluation_strategy": "epoch",
        "warmup_steps": warmup_steps,
        "weight_decay": weight_decay,
        "learning_rate": learning_rate,
    }
    trained_model, trained_tokenizer = trainer.train_model(
        hyperparameter_choices=hparams,
        training_datasets=training_datasets,
        validation_datasets=validation_datasets,
    )

    trained_model.save_pretrained(os.path.join(result_dir, "trained_model"))
    trained_tokenizer.save_pretrained(os.path.join(result_dir, "trained_tokenizer"))

    return f"Model and Tokenizer successfully trained and saved respectively to {result_dir}/trained_model and {result_dir}/trained_tokenizer"


def execute_model(result_dir, load_dirs, save_path, batch_size, input_column,
                  work_dir='.', **kwargs):
    """Run a trained model over the concatenated test splits and save outputs.

    Args:
        result_dir: Directory holding ``trained_model/`` and
            ``trained_tokenizer/`` (as produced by :func:`train_model`).
        load_dirs: Colon-separated directories of dataset dicts whose
            ``test`` splits are concatenated and predicted on.
        save_path: JSON file (relative to ``work_dir``) for the outputs.
        batch_size: Prediction batch size; converted to ``int``.
        input_column: Column containing the input text.
        work_dir: Base directory all paths are resolved against.

    Returns:
        A success message naming the output file.

    Raises:
        EnvException: If ``batch_size`` is not an integer.
    """
    load_dirs = load_dirs.split(':')
    result_dir = os.path.join(work_dir, result_dir)
    save_path = os.path.join(work_dir, save_path)
    try:
        batch_size = int(batch_size)
    except ValueError as e:
        raise EnvException("Batch size should be an integer") from e

    # load the datasets and merge all test splits into one evaluation set
    load_paths = [os.path.join(work_dir, load_dir) for load_dir in load_dirs]
    dataset_dicts = [datasets.load_from_disk(load_path) for load_path in load_paths]
    test_datasets = [dataset_dict["test"] for dataset_dict in dataset_dicts]
    test_dataset = datasets.concatenate_datasets(test_datasets)

    trained_model_path = os.path.join(result_dir, "trained_model")
    trained_tokenizer_path = os.path.join(result_dir, "trained_tokenizer")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    trained_model = transformers.AutoModelForSeq2SeqLM.from_pretrained(
        trained_model_path
    ).to(device)
    trained_tokenizer = transformers.AutoTokenizer.from_pretrained(
        trained_tokenizer_path
    )

    executor = GenerationModelExecutor(
        trained_model,
        trained_tokenizer,
        batch_size,
        tokenizer_max_length=1024,
        sequence_max_length=1280,
    )
    outputs = executor.make_prediction(
        test_set=test_dataset,
        input_column=input_column,
    )
    # ModelOutput objects are dataclass-like; EnhancedJSONEncoder serializes them.
    with open(save_path, 'w') as f:
        json.dump(outputs, f, cls=EnhancedJSONEncoder)

    return f"Model successfully executed on the test sets of the specified datasets and saved to {save_path}"


def evaluate_model(load_dirs, save_path, output_column, work_dir='.', **kwargs):
    """Score saved model outputs against the test-split ground truth.

    Args:
        load_dirs: Colon-separated directories of dataset dicts whose
            ``test`` splits provide the references.
        save_path: JSON file of model outputs produced by
            :func:`execute_model`.
        output_column: Column containing the ground-truth output text.
        work_dir: Base directory all paths are resolved against.

    Returns:
        A message containing the computed evaluation metrics.
    """
    load_dirs = load_dirs.split(':')

    # load the datasets and merge all test splits into one reference set
    load_paths = [os.path.join(work_dir, load_dir) for load_dir in load_dirs]
    dataset_dicts = [datasets.load_from_disk(load_path) for load_path in load_paths]
    test_datasets = [dataset_dict["test"] for dataset_dict in dataset_dicts]
    test_dataset = datasets.concatenate_datasets(test_datasets)

    # re-hydrate the serialized predictions
    save_path = os.path.join(work_dir, save_path)
    with open(save_path, 'r') as f:
        outputs = json.load(f)
    outputs = [ModelOutput(**output) for output in outputs]

    evaluator = Seq2SeqEvaluator()
    metric_values = evaluator.evaluate_model(
        test_dataset,
        gt_column=output_column,
        predictions=outputs,
        encoder_model_name="xlm-roberta-base",
    )
    return f"Evaluation metrics: {metric_values}"


# Currently exposed actions. Only model retrieval is enabled; the full
# pipeline action set is kept (commented out) below for reference.
P2M_ACTIONS = [
    ActionInfo(
        name="Retrieve Model",
        description="Retrieve a suitable model based on a detailed description of the requirements. You can obtain the model given the name using the transformers.AutoModel.from_pretrained function.",
        usage={
            "instruction": "an instruction on how to generate the output from the input",
        },
        return_value="The observation will be a list of suitable models. You can choose one of them based on the requirements.",
        is_primitive=False,
        function=retrieve_model
    ),
]

# P2M_ACTIONS = [
#     ActionInfo(
#         name="Generate Dataset",
#         description="Generate a dataset based on an instruction and examples. You can load the dataset later from `save_dir` using the load_from_disk function of the HuggingFace datasets library.",
#         usage={
#             "instruction": "an instruction on how to generate the output from the input",
#             "examples": "examples of input-output pairs",
#             "save_dir": "directory to save the generated dataset dict to. We recommend saving to data/generated/",
#             "num_train": "number of examples to generate in the training set",
#             "num_valid": "number of examples to generate in the validation set",
#             "num_test": "number of examples to generate in the test set",
#         },
#         return_value="The observation will be a success message if the dataset was generated successfully. Otherwise, an error message will be returned.",
#         is_primitive=False,
#         function=generate_dataset
#     ),
#     ActionInfo(
#         name="Retrieve Dataset",
#         description="Retrieve a suitable dataset based on a detailed description of the requirements. You can load the dataset later from `save_dir` using the load_from_disk function of the HuggingFace datasets library.",
#         usage={
#             "instruction": "an instruction on how to generate the output from the input",
#             "save_dir": "directory to save the generated dataset dict to. We recommend saving to data/retrieved/",
#         },
#         return_value="The observation will be a success message if the dataset was retrieved successfully. Otherwise, an error message will be returned.",
#         is_primitive=False,
#         function=retrieve_dataset
#     ),
#     ActionInfo(
#         name="Retrieve Model",
#         description="Retrieve a suitable model based on a detailed description of the requirements. You can obtain the model given the name using the transformers.AutoModelForSeq2SeqLM.from_pretrained function.",
#         usage={
#             "instruction": "an instruction on how to generate the output from the input",
#         },
#         return_value="The observation will be a list of suitable models. You can choose one of them based on the requirements.",
#         is_primitive=False,
#         function=retrieve_model
#     ),
#     ActionInfo(
#         name="Process Dataset",
#         description="Process dataset based on a detailed description of the requirements. You can load the processed data later from `save_dirs` using the load_from_disk function of the HuggingFace datasets library. The input text will be in the `model_input` column and the output text will be in the `model_output` column.",
#         usage={
#             "instruction": "an instruction on how to generate the output from the input",
#             "load_dirs": "directories to load the dataset dicts from, separated by colons",
#             "save_dirs": "directories to save the processed dataset dicts to, separated by colons. The order should match the order of the loaded datasets. We recommend saving to data/processed/",
#         },
#         return_value="The observation will be a success message if the data was processed successfully. Otherwise, an error message will be returned.",
#         is_primitive=False,
#         function=process_dataset
#     ),
#     ActionInfo(
#         name="Train Model",
#         description="Train a Seq2Seq model from HuggingFace transformers library using the processed datasets and given hyperparameters.",
#         usage={
#             "model_name": "name of the model to train",
#             "load_dirs": "directories to load the dataset dicts from, separated by colons",
#             "result_dir": "directory to save the trained model and tokenizer to. We recommend using results/{trial_id}/. The trained model will be available as `{result_dir}/trained_model/` and the tokenizer will be available as `{result_dir}/trained_tokenizer/`.",
#             "epochs": "number of epochs to train the model for",
#             "batch_size": "batch size for training the model",
#             "warmup_steps": "number of warmup steps for the optimizer",
#             "weight_decay": "weight decay for the optimizer",
#             "learning_rate": "learning rate for the optimizer",
#         },
#         return_value="The observation will be a success message if the model was trained successfully. Otherwise, an error message will be returned.",
#         is_primitive=False,
#         function=train_model
#     ),
#     ActionInfo(
#         name="Execute Model on Test Set",
#         description="Execute a trained model on the test sets of specified dataset dicts.",
#         usage={
#             "result_dir": "directory where the trained model and tokenizer are saved",
#             "load_dirs": "directories to load the dataset dicts from, separated by colons",
#             "save_path": "file to save the results of the model execution in json format",
#             "batch_size": "batch size for executing the model",
#             "input_column": "column name of the input text",
#         },
#         return_value="The observation will be a success message if the model was executed successfully. Otherwise, an error message will be returned.",
#         is_primitive=False,
#         function=execute_model,
#     ),
#     ActionInfo(
#         name="Evaluate Model",
#         description="Evaluate a trained model on the test sets of specified dataset dicts.",
#         usage={
#             "load_dirs": "directories to load the dataset dicts from, separated by colons",
#             "save_path": "file to load the results of the model execution in json format",
#             "output_column": "column name of the output text",
#         },
#         return_value="The values for various evaluation metrics will be returned.",
#         is_primitive=False,
#         function=evaluate_model,
#     )
# ]