|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from enum import Enum, unique |
|
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Set, TypedDict, Union |
|
|
|
from datasets import DatasetDict, concatenate_datasets, interleave_datasets |
|
|
|
from ..extras.logging import get_logger |
|
|
|
|
|
if TYPE_CHECKING: |
|
from datasets import Dataset, IterableDataset |
|
|
|
from ..hparams import DataArguments |
|
|
|
|
|
# Module-level logger, created through the project's logging helper and named after this module.
logger = get_logger(__name__)
|
|
|
|
|
# Type alias: a sequence whose items are each a plain string, a set of strings,
# or a string-to-string dict.
# NOTE(review): presumably describes template "slots"; the consumers are outside this chunk - confirm.
SLOTS = Sequence[Union[str, Set[str], Dict[str, str]]]
|
|
|
|
|
@unique
class Role(str, Enum):
    r"""
    String-valued enumeration of role names.

    Members also subclass `str`, so each one compares equal to its plain string value
    (e.g. `Role.USER == "user"`). `@unique` rejects duplicate values at class creation.
    """

    # NOTE(review): the names suggest chat-message roles; usage is outside this chunk - confirm.
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"
    FUNCTION = "function"
    OBSERVATION = "observation"
|
|
|
|
|
class DatasetModule(TypedDict):
    r"""
    Typed dictionary bundling two optional dataset handles.

    Each entry holds either a `Dataset`, an `IterableDataset`, or `None`.
    """

    # NOTE(review): presumably the training split - confirm against the producer of this dict.
    train_dataset: Optional[Union["Dataset", "IterableDataset"]]
    # NOTE(review): presumably the evaluation split - confirm against the producer of this dict.
    eval_dataset: Optional[Union["Dataset", "IterableDataset"]]
|
|
|
|
|
def merge_dataset(
    all_datasets: List[Union["Dataset", "IterableDataset"]], data_args: "DataArguments", seed: int
) -> Union["Dataset", "IterableDataset"]:
    r"""
    Merges multiple datasets to a unified dataset.

    A single-element list is returned as-is, without consulting `data_args`.
    Otherwise `data_args.mix_strategy` selects the method:
      - "concat": the list is handed to `concatenate_datasets`.
      - any value starting with "interleave": `interleave_datasets` is called with
        `data_args.interleave_probs` and `seed`; the stopping strategy is "first_exhausted"
        when the strategy name ends with "under" and "all_exhausted" otherwise.

    Raises:
        ValueError: if the mix strategy matches neither of the above.
    """
    # Nothing to merge when only one dataset was supplied.
    if len(all_datasets) == 1:
        return all_datasets[0]

    strategy = data_args.mix_strategy

    if strategy == "concat":
        if data_args.streaming:
            logger.warning("The samples between different datasets will not be mixed in streaming mode.")

        return concatenate_datasets(all_datasets)

    if not strategy.startswith("interleave"):
        raise ValueError("Unknown mixing strategy: {}.".format(strategy))

    if not data_args.streaming:
        logger.warning("We recommend using `mix_strategy=concat` in non-streaming mode.")

    # The strategy name's suffix decides when interleaving stops.
    if strategy.endswith("under"):
        stop_mode = "first_exhausted"
    else:
        stop_mode = "all_exhausted"

    return interleave_datasets(
        datasets=all_datasets,
        probabilities=data_args.interleave_probs,
        seed=seed,
        stopping_strategy=stop_mode,
    )
|
|
|
|
|
def split_dataset(
    dataset: Union["Dataset", "IterableDataset"], data_args: "DataArguments", seed: int
) -> "DatasetDict":
    r"""
    Splits the dataset and returns a dataset dict containing train set and validation set.

    Supports both map dataset and iterable dataset, selected by `data_args.streaming`:
      - streaming: the dataset is shuffled with `data_args.buffer_size` and `seed`; the first
        `int(data_args.val_size)` items form the validation set and the rest the train set.
      - otherwise: `train_test_split` is used; a `val_size` greater than 1 is truncated to an
        integer, any other value is forwarded unchanged as `test_size`.

    The returned dict always has the keys "train" and "validation".
    """
    if not data_args.streaming:
        requested = data_args.val_size
        test_size = int(requested) if requested > 1 else requested
        parts = dataset.train_test_split(test_size=test_size, seed=seed)
        return DatasetDict({"train": parts["train"], "validation": parts["test"]})

    shuffled = dataset.shuffle(buffer_size=data_args.buffer_size, seed=seed)
    # Both halves are taken from the same shuffled stream: head for validation, tail for training.
    num_val = int(data_args.val_size)
    val_set = shuffled.take(num_val)
    train_set = shuffled.skip(num_val)
    return DatasetDict({"train": train_set, "validation": val_set})
|
|