from datasets import load_dataset
from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
    Regex,
)
from transformers import PreTrainedTokenizerFast, PreTrainedTokenizerBase
from tqdm import tqdm

# Load the bilingual (zh-en) corpus from local parquet files and shuffle it.
dataset = load_dataset(
    "parquet", data_dir="Mxode/IndustryCorpus-Subset-zh-en", split="train")
dataset = dataset.shuffle(seed=3407)

# First 1M rows for training, last 10K rows held out for evaluation.
# Slicing a Dataset returns a dict of columns, so row counts must be taken
# from the 'text' column, not from the dict itself.
ds = dataset[:1000000]
ds_val = dataset[-10000:]
char_len = sum(len(x) for x in ds_val['text'])


def get_training_corpus():
    # Yield the training texts in batches of 1000 for the BPE trainer.
    for i in range(0, len(ds["text"]), 1000):
        yield ds["text"][i: i + 1000]


def train():
    # Byte-level BPE with NFC normalization and a GPT-style split pattern.
    tokenizer = Tokenizer(models.BPE())
    tokenizer.normalizer = normalizers.NFC()
    tokenizer.pre_tokenizer = pre_tokenizers.Sequence([
        pre_tokenizers.Split(
            pattern=Regex(
                "(?i:'s|'t|'re|'ve|'m|'ll|'d)|[^\\r\\n\\p{L}\\p{N}]?\\p{L}+|\\p{N}| ?[^\\s\\p{L}\\p{N}]+[\\r\\n]*|\\s*[\\r\\n]+|\\s+(?!\\S)|\\s+"),
            behavior="isolated",
            invert=False,
        ),
        pre_tokenizers.ByteLevel(
            add_prefix_space=False, use_regex=False, trim_offsets=False
        ),
    ])

    trainer = trainers.BpeTrainer(
        vocab_size=16000,
        special_tokens=["<|endoftext|>", "<|im_start|>", "<|im_end|>"],
    )
    tokenizer.train_from_iterator(get_training_corpus(), trainer=trainer)

    tokenizer.post_processor = processors.ByteLevel(
        add_prefix_space=False, use_regex=False, trim_offsets=False
    )
    tokenizer.decoder = decoders.ByteLevel(
        add_prefix_space=False, use_regex=False, trim_offsets=False
    )

    # Wrap the raw tokenizer so it can be used through the transformers API.
    wrapped_tokenizer = PreTrainedTokenizerFast(
        tokenizer_object=tokenizer,
        bos_token="<|endoftext|>",
        eos_token="<|im_end|>",
        pad_token="<|endoftext|>",
        model_max_length=4096,
        clean_up_tokenization_spaces=False,
        errors="replace",
        split_special_tokens=False,
    )
    # ChatML-style chat template (system / user / assistant turns).
    wrapped_tokenizer.chat_template = """{% for message in messages %}{% if loop.first and messages[0]['role'] != 'system' %}{{ '<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n' }}{% endif %}{{'<|im_start|>' + message['role'] + '\n' + message['content'] + '<|im_end|>' + '\n'}}{% endfor %}{% if add_generation_prompt %}{{ '<|im_start|>assistant\n' }}{% endif %}"""
    wrapped_tokenizer.save_pretrained(
        'Mxode/Bilingual-Tokenizer/BilingualTokenizer-16K')
    return wrapped_tokenizer


def eval(tokenizer: PreTrainedTokenizerBase):
    # Tokens per character on the held-out split, reported as a percentage:
    # lower means stronger compression.
    def get_compress_len(tokenizer):
        return sum(len(tokenizer(x, return_tensors=None)['input_ids'])
                   for x in tqdm(ds_val['text']))

    compress_len = get_compress_len(tokenizer)
    compression_rate = compress_len / char_len * 100
    print(f'{len(tokenizer):<40} {compression_rate:.2f}%')


if __name__ == "__main__":
    tokenizer = train()
    eval(tokenizer)
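
# ---------------------------------------------------------------------------
# Usage sketch (not called by this script): reload the saved tokenizer and
# apply the chat template. The message content below is a made-up example;
# the load path simply mirrors the save_pretrained() path used in train().
# ---------------------------------------------------------------------------
def demo_chat_template():
    from transformers import AutoTokenizer

    tok = AutoTokenizer.from_pretrained(
        'Mxode/Bilingual-Tokenizer/BilingualTokenizer-16K')
    messages = [{"role": "user", "content": "Hello, who are you?"}]
    # Render the ChatML prompt as text, including the assistant header.
    prompt = tok.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True)
    print(prompt)
    # Check whether the byte-level BPE round-trips the prompt exactly.
    ids = tok(prompt, return_tensors=None)['input_ids']
    print(len(ids), tok.decode(ids) == prompt)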