Finetuning Falcon 7b in a hybrid distributed fashion
(Ain't AI a zombie that will eat our brains !!)
Fine-Tuning of Large Language Models (LLMs)
Fine-tuning large language models (LLMs) is a critical step in adapting pre-trained models to specialized tasks or domains. Pre-trained LLMs, such as Falcon 3, are trained on vast datasets and possess general knowledge about language and context. However, fine-tuning enables these models to excel in specific tasks like summarization, sentiment analysis, or domain-specific question answering.
The fine-tuning process involves updating the model weights using a smaller, task-specific dataset while leveraging the pre-trained knowledge. Fine-tuning typically requires fewer computational resources compared to training the model from scratch and focuses on improving task performance while minimizing the risk of overfitting. Techniques such as supervised fine-tuning, reinforcement learning from human feedback (RLHF), or instruction-tuning are commonly used.
Fine-tuning large models poses challenges such as high memory and computational requirements, making distributed training techniques essential to optimize efficiency.
Distributed Model Fine-Tuning
As LLMs grow in size, fine-tuning them on a single machine becomes infeasible due to memory constraints. Distributed model fine-tuning addresses these challenges by spreading computations across multiple devices, such as GPUs or TPUs. This approach not only reduces the memory burden per device but also accelerates training by parallelizing operations.
Key Approaches in Distributed Fine-Tuning:
- Data Parallelism: Each device processes a different subset of the data while maintaining a replica of the entire model. Gradients computed on each device are synchronized and averaged after each forward and backward pass.
- Model Parallelism: The model itself is split across multiple devices. For instance, different layers or parts of a single layer can reside on different GPUs, enabling the training of extremely large models.
- Pipeline Parallelism: A form of model parallelism that partitions the model into sequential stages placed on different devices and splits each mini-batch into micro-batches that flow through the stages one after another, so that several stages can be busy at the same time.
- Zero Redundancy Optimizer (ZeRO): DeepSpeed’s ZeRO is a popular optimization framework for distributed fine-tuning that partitions optimizer states, gradients, and model weights across devices, significantly reducing memory requirements.
Each of these approaches has its advantages and trade-offs, and the choice depends on model size, hardware availability, and desired efficiency.
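To make the data parallelism idea concrete, here is a minimal pure-Python sketch. It assumes a toy one-weight linear model with a mean squared error loss and a batch that divides evenly across devices; the function names are made up for illustration and are not part of any library.

```python
# Toy data parallelism: every "device" holds a full copy of the weight w
# and sees a different, equally sized shard of the batch.

def mse_gradient(w, xs, ys):
    """Gradient of mean((w * x - y) ** 2) with respect to w."""
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


def data_parallel_gradient(w, xs, ys, num_devices):
    """Compute one local gradient per shard, then average them."""
    shard = len(xs) // num_devices  # assumption: the batch divides evenly
    local_grads = [
        mse_gradient(w, xs[i * shard:(i + 1) * shard], ys[i * shard:(i + 1) * shard])
        for i in range(num_devices)
    ]
    # Averaging plays the role of the gradient synchronization (all-reduce) step.
    return sum(local_grads) / num_devices


xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]
w = 0.5

full_batch = mse_gradient(w, xs, ys)
averaged = data_parallel_gradient(w, xs, ys, num_devices=2)
print(full_batch, averaged)  # both are -22.5
```

With equally sized shards, the averaged gradient matches the gradient of the full batch, which is why every replica can apply the same update and stay in sync.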
Distributed Tensor Fine-Tuning
Distributed tensor fine-tuning is an advanced approach to optimize the memory and compute efficiency when fine-tuning LLMs. Instead of handling models as monolithic entities, this method partitions and operates on the tensors (model parameters) themselves across devices.
Core Concepts in Distributed Tensor Fine-Tuning:
- Sharded Tensor Management: Tensors, such as weights and gradients, are divided into smaller shards distributed across devices. This minimizes memory usage on individual devices.
- Tensor Parallelism: Operations on large tensors, such as matrix multiplications, are split and executed across multiple GPUs. For example, in multi-head attention mechanisms, computations for different attention heads can be distributed.
- Memory Offloading: Techniques like DeepSpeed ZeRO stage 3 offload optimizer states and activations to CPU memory or NVMe storage, enabling the fine-tuning of models far larger than the total GPU memory.
- Gradient Accumulation with Partitioning: Gradients are computed in partitions and accumulated in a memory-efficient manner before updating model parameters.
Distributed tensor fine-tuning is particularly useful when working with extremely large models, such as 7B, 13B, or 175B parameter models. By enabling fine-grained partitioning and execution, it makes large-scale model fine-tuning practical without sacrificing performance.
This method is also highly compatible with mixed-precision training (e.g., FP16 or BF16) and sparsity techniques, further improving efficiency on modern hardware like NVIDIA A100 GPUs.
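The tensor parallelism idea can be sketched without any GPU at all. The snippet below is a toy illustration in plain Python, assuming a column-wise split of the weight matrix and a column count that divides evenly; the helper names are hypothetical, and a real framework would run each shard on its own device and gather the results.

```python
def matmul(a, b):
    """Plain matrix product of two lists of lists."""
    return [[sum(x * y for x, y in zip(row, col)) for col in zip(*b)] for row in a]


def split_columns(w, parts):
    """Split the weight matrix w column-wise into `parts` equal shards."""
    width = len(w[0]) // parts  # assumption: the columns divide evenly
    return [[row[p * width:(p + 1) * width] for row in w] for p in range(parts)]


def tensor_parallel_matmul(x, w, parts):
    """Each simulated GPU multiplies x by its own column shard of w."""
    partials = [matmul(x, shard) for shard in split_columns(w, parts)]
    # Concatenate the partial outputs row by row (the "all-gather" step).
    return [sum((partial[i] for partial in partials), []) for i in range(len(x))]


x = [[1, 2], [3, 4]]
w = [[1, 0, 2, 1], [0, 1, 1, 3]]

print(matmul(x, w))                     # [[1, 2, 4, 7], [3, 4, 10, 15]]
print(tensor_parallel_matmul(x, w, 2))  # same result, computed in two shards
```

Each shard only needs half of the weight matrix, which is the memory saving that tensor parallelism is after.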
Enough blah blah, let's get to action and get our hands dirty !!!
DeepSpeed hybrid parallelism
Combining Tensor and Pipeline Parallelism
DeepSpeed allows hybrid parallelism, which combines tensor and pipeline parallelism. This is particularly useful for extremely large models:
- Tensor parallelism splits computation within a single layer across GPUs.
- Pipeline parallelism splits the model across layers into stages.
By combining these methods, DeepSpeed can train models that are orders of magnitude larger than what a single GPU can handle.
Imagine your model has 24 layers (Transformer blocks), and each layer is massive (too big for a single GPU). Hybrid parallelism is employed to:
- Split the operations within each layer (Tensor Parallelism).
- Divide the model across GPUs (Pipeline Parallelism).
Let's assume that we have 4 GPUs.
Pipeline Parallelism: Divide the model into 2 stages (each stage spans 12 layers).
- Stage 1: Layers 1–12 (on GPUs 0 and 1).
- Stage 2: Layers 13–24 (on GPUs 2 and 3).
Tensor Parallelism: Within each stage, the layers are split across 2 GPUs.
- Stage 1: Layers 1–12 are tensor-parallel across GPUs 0 and 1.
- Stage 2: Layers 13–24 are tensor-parallel across GPUs 2 and 3.
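The layout above can be written down as a small helper. This is only a sketch of the bookkeeping, assuming the layers and GPUs divide evenly across the stages; `hybrid_layout` is a hypothetical name and not a DeepSpeed function.

```python
def hybrid_layout(num_layers, num_gpus, pipeline_stages):
    """Map each GPU to its pipeline stage, tensor-parallel rank and layer range."""
    tp_size = num_gpus // pipeline_stages          # GPUs sharing one stage
    layers_per_stage = num_layers // pipeline_stages
    layout = {}
    for gpu in range(num_gpus):
        stage = gpu // tp_size                     # 0-based stage index
        first_layer = stage * layers_per_stage + 1
        layout[gpu] = {
            "stage": stage + 1,                    # 1-based, as in the text
            "tp_rank": gpu % tp_size,
            "layers": (first_layer, first_layer + layers_per_stage - 1),
        }
    return layout


for gpu, placement in hybrid_layout(num_layers=24, num_gpus=4, pipeline_stages=2).items():
    print(gpu, placement)
```

For 24 layers, 4 GPUs and 2 stages this reproduces the mapping described above: GPUs 0 and 1 share layers 1 to 12, and GPUs 2 and 3 share layers 13 to 24.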
Installing dependencies and preparing the dataset
In this tutorial we will train/fine-tune our model in a distributed tensor fashion using the DeepSpeed framework. We will use four A100 GPUs with 80 GB of memory each.
Dependencies
The following dependencies are required to train the model and prepare the dataset:
pip install transformers datasets pandas accelerate tqdm torch deepspeed wandb
Some of the dependencies will be used to process the dataset, while others for the finetuning/training purposes.
Dataset and arguments preparation
For this example I will use a dataset that I shared, comprising 400k samples of Ghidra-decompiled code. In this tutorial we will fine-tune the Falcon3 model to recreate the original program from a decompiled instance. This dataset contains pairs of original source code programs and their corresponding decompiled functions obtained using Ghidra, a powerful reverse engineering tool developed by the NSA. It is designed to facilitate research in areas such as program analysis, reverse engineering, decompilation improvement, and machine learning applications for source code and binary translation.
Let's start by importing some dependencies:
import copy
import random
from dataclasses import dataclass, field
from typing import Optional, Dict, Sequence
import torch
import torch.distributed
import transformers
from transformers import Trainer
from datasets import load_dataset
import wandb
import deepspeed
Let's make our life easier by defining some dataclasses that hold the required arguments for the dataset, the trainer, and the model:
@dataclass
class ModelArguments:
    model_name_or_path: Optional[str] = field(
        default="tiiuae/Falcon3-7B-Base"
    )
    use_flash_attention: bool = field(
        default=False, metadata={"help": "Whether to use flash attention."}
    )


@dataclass
class DataArguments:
    data_path: str = field(
        default=None, metadata={"help": "Path to the training data."}
    )


@dataclass
class TrainingArguments(transformers.TrainingArguments):
    cache_dir: Optional[str] = field(default=None)
    optim: str = field(default="adamw_torch")
    model_max_length: int = field(
        default=512,
        metadata={
            "help": "Maximum sequence length. Sequences will be right padded (and possibly truncated)."
        },
    )
The dataset can be loaded using the following snippet of code (but let's ignore this step for the moment because we will build more flexible approach to load the dataset):
data_path = "Neo111x/decompile_LLM"
raw_train_datasets = load_dataset(data_path, split="train")
The dataset looks like the following:
instruction | output |
---|---|
void ioabs_tcp_pre_select(int *param_1,int *param_2,long param_3) { *param_1 = *param_2; *param_2 = *param_2 + 1; *(int *)((long)*param_1 * 8 + param_3 + 4) = param_1[4]; *(uint *)(param_3 + (long)*param_1 * 8) = *(uint *)(param_3 + (long)*param_1 * 8) | 1; if (((**(int **)(param_1 + 2) + *(int )((long *)(param_1 + 2) + 4)) - *(int )((long *)(param_1 + 2) + 8)) % *(int )((long *)(param_1 + 2) + 4) != 0) { *(uint *)(param_3 + (long)*param_1 * 8) = *(uint *)(param_3 + (long)*param_1 * 8) |
ulong ioabs_tcp_pre_select(int *param_1,int *param_2,long param_3) { uint *puVar1; int iVar2; int *piVar3; int iVar4; ulong uVar5; iVar2 = *param_2; *param_1 = iVar2; *param_2 = iVar2 + 1; *(int *)(param_3 + 4 + (long)*param_1 * 8) = param_1[4]; puVar1 = (uint *)(param_3 + (long)*param_1 * 8); *puVar1 = *puVar1 | 1; piVar3 = *(int **)(param_1 + 2); iVar2 = piVar3[1]; iVar4 = (iVar2 + *piVar3) - piVar3[2]; uVar5 = (long)iVar4 / (long)iVar2 & 0xffffffff; if (iVar4 % iVar2 != 0) { uVar5 = (ulong)*param_1; puVar1 = (uint *)(param_3 + uVar5 * 8); *puVar1 = *puVar1 |
......................... | ................................ |
Here the instruction is the input to the model and the output is what the model is expected to produce. Next we need to build prompts from the instructions provided by the dataset. We can define the following function for this purpose:
def build_instruction_prompt(instruction: str):
    # Wrap the decompiled code in a fixed prompt template.
    return """# This is the decompiled code:
{}
# What is the source code?
""".format(
        instruction
    )
Next we will define our tokenization function. This function takes the prompt strings and tokenizes them into the model's input format.
def _tokenize_fn(
    strings: Sequence[str], tokenizer: transformers.PreTrainedTokenizer
) -> Dict:
    """Tokenize a list of strings."""
    tokenized_list = [
        tokenizer(
            text,
            return_tensors="pt",
            padding="longest",
            max_length=tokenizer.model_max_length,
            truncation=True,
        )
        for text in strings
    ]
    input_ids = labels = [tokenized.input_ids[0] for tokenized in tokenized_list]
    # Count the non-padding tokens of each string.
    input_ids_lens = labels_lens = [
        tokenized.input_ids.ne(tokenizer.pad_token_id).sum().item()
        for tokenized in tokenized_list
    ]
    return dict(
        input_ids=input_ids,
        labels=labels,
        input_ids_lens=input_ids_lens,
        labels_lens=labels_lens,
    )
Let's go more fancy and define a preprocessing function that uses the tokenization function to preprocess our data
IGNORE_INDEX = -100


def preprocess(
    sources: Sequence[str],
    targets: Sequence[str],
    tokenizer: transformers.PreTrainedTokenizer,
) -> Dict:
    """Preprocess the data by tokenizing."""
    examples = [s + t for s, t in zip(sources, targets)]
    examples_tokenized, sources_tokenized = [
        _tokenize_fn(strings, tokenizer) for strings in (examples, sources)
    ]
    input_ids = examples_tokenized["input_ids"]
    labels = copy.deepcopy(input_ids)
    # Mask the prompt tokens so that the loss is computed on the target only.
    for label, source_len in zip(labels, sources_tokenized["input_ids_lens"]):
        label[:source_len] = IGNORE_INDEX
    return dict(input_ids=input_ids, labels=labels)
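To see what the masking in preprocess does, here is a toy sketch that works on plain lists of token ids instead of a real tokenizer. The ids are made up, and `mask_prompt_labels` is a hypothetical helper that simply concatenates prompt and answer ids, whereas the real function tokenizes the concatenated string.

```python
IGNORE_INDEX = -100  # default ignore_index of PyTorch's cross-entropy loss


def mask_prompt_labels(prompt_ids, answer_ids):
    """Return (input_ids, labels) where the prompt positions are ignored by the loss."""
    input_ids = list(prompt_ids) + list(answer_ids)
    labels = [IGNORE_INDEX] * len(prompt_ids) + list(answer_ids)
    return input_ids, labels


# Made-up token ids: three prompt tokens followed by two answer tokens.
input_ids, labels = mask_prompt_labels([11, 12, 13], [21, 22])
print(input_ids)  # [11, 12, 13, 21, 22]
print(labels)     # [-100, -100, -100, 21, 22]
```

The model still sees the whole sequence as input, but only the answer tokens contribute to the loss, so it learns to produce the source code rather than to repeat the decompiled prompt.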
In this next step we will go even fancier by defining a data collator class. This class can be passed to our trainer object, and it takes care of padding the tokenized examples into batches.
@dataclass
class DataCollatorForSupervisedDataset(object):
    """Collate examples for supervised fine-tuning."""

    tokenizer: transformers.PreTrainedTokenizer

    def __call__(self, instances: Sequence[Dict]) -> Dict[str, torch.Tensor]:
        input_ids, labels = tuple(
            [instance[key] for instance in instances] for key in ("input_ids", "labels")
        )
        # Pad the inputs with the pad token and the labels with IGNORE_INDEX.
        input_ids = [torch.tensor(x) for x in input_ids]
        input_ids = torch.nn.utils.rnn.pad_sequence(
            input_ids, batch_first=True, padding_value=self.tokenizer.pad_token_id
        )
        labels = [torch.tensor(x) for x in labels]
        labels = torch.nn.utils.rnn.pad_sequence(
            labels, batch_first=True, padding_value=IGNORE_INDEX
        )
        return dict(
            input_ids=input_ids,
            labels=labels,
            attention_mask=input_ids.ne(self.tokenizer.pad_token_id),
        )
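The padding logic of the collator can be illustrated with plain Python lists. This is a simplified sketch and not the class above: `collate` is a hypothetical function, the token ids are made up, and the attention mask here is derived from the sequence lengths rather than by comparing against the pad token id.

```python
def collate(instances, pad_token_id, ignore_index=-100):
    """Right-pad every example to the longest one in the batch."""
    max_len = max(len(instance["input_ids"]) for instance in instances)
    batch = {"input_ids": [], "labels": [], "attention_mask": []}
    for instance in instances:
        length = len(instance["input_ids"])
        pad = max_len - length
        batch["input_ids"].append(instance["input_ids"] + [pad_token_id] * pad)
        # Padded label positions are ignored by the loss.
        batch["labels"].append(instance["labels"] + [ignore_index] * pad)
        batch["attention_mask"].append([1] * length + [0] * pad)
    return batch


batch = collate(
    [
        {"input_ids": [5, 6, 7], "labels": [-100, 6, 7]},
        {"input_ids": [8], "labels": [8]},
    ],
    pad_token_id=0,
)
print(batch["input_ids"])       # [[5, 6, 7], [8, 0, 0]]
print(batch["labels"])          # [[-100, 6, 7], [8, -100, -100]]
print(batch["attention_mask"])  # [[1, 1, 1], [1, 0, 0]]
```

Building the mask from lengths is a design choice worth knowing about: when the pad token is set to the EOS token, a mask built by comparing against the pad token id also hides the genuine EOS token at the end of each example.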
Now, to make things even more manageable, we will define a function that wraps both the prompt building and the tokenization. The function calls build_instruction_prompt to build the appropriate prompts, appends the EOS token to each target, and then tokenizes the result.
def train_tokenize_function(examples, tokenizer):
    sources = [
        build_instruction_prompt(instruction) for instruction in examples["instruction"]
    ]
    eos_token = tokenizer.eos_token
    # Terminate every target with EOS so the model learns where to stop.
    targets = [f"{output}\n{eos_token}" for output in examples["output"]]
    data_dict = preprocess(sources, targets, tokenizer)
    return data_dict
So far we are done with the data preparation and the preprocessing definition, we defined all the required functions to build our training dataset and format it to the appropriate model's input.
Preparing the model for training
At this point we are done with the required steps to tokenize, preprocess, and prepare the dataset. Now we can focus on implementing the code to train/fine-tune our model. Earlier we defined some dataclasses that hold arguments, because when training a model with tensor parallelism and pipelining we need to experiment a bit with our batch sizes to fit a large model and a large dataset into our hardware. When using DeepSpeed it is usually better to pass those parameters as arguments to the training script. We can use the following code to parse the different arguments and assign them to the appropriate argument class (data, trainer, model):
def train():
    parser = transformers.HfArgumentParser(
        (ModelArguments, DataArguments, TrainingArguments)
    )
    model_args, data_args, training_args = parser.parse_args_into_dataclasses()
You can find more about the HfArgumentParser in the Hugging Face documentation.
Let's continue our train function by loading the tokenizer of the model specified in the model arguments:
    tokenizer = transformers.AutoTokenizer.from_pretrained(
        model_args.model_name_or_path,
        model_max_length=training_args.model_max_length,
        padding_side="right",
        use_fast=True,
        trust_remote_code=True,
    )
    # Fall back to the EOS token when the tokenizer has no pad token.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
        tokenizer.pad_token_id = tokenizer.eos_token_id
    print("PAD Token:", tokenizer.pad_token, tokenizer.pad_token_id)
    print("BOS Token", tokenizer.bos_token, tokenizer.bos_token_id)
    print("EOS Token", tokenizer.eos_token, tokenizer.eos_token_id)
Now let's load our model and activate the flash attention if it is compatible with our model:
    model_kwargs = {}
    if model_args.use_flash_attention:
        model_kwargs["attn_implementation"] = "flash_attention_2"
    model = transformers.AutoModelForCausalLM.from_pretrained(
        model_args.model_name_or_path,
        torch_dtype=torch.bfloat16,
        trust_remote_code=True,
        **model_kwargs,
    )
The arguments are parsed into the dataclasses that we defined earlier, from the user-supplied arguments, when the script is launched through DeepSpeed. You can find more about flash attention in its documentation.
Now let's use the preprocessing functions that we defined earlier in this article and load our dataset:
    # Load the dataset from the Hugging Face Hub. For a local JSON file, use
    # load_dataset("json", data_files=data_args.data_path, split="train",
    #              cache_dir=training_args.cache_dir) instead.
    raw_train_datasets = load_dataset(data_args.data_path, split="train")

    # Let local rank 0 tokenize first; the other ranks wait here and then
    # reuse the cached result.
    if training_args.local_rank > 0:
        torch.distributed.barrier()

    train_dataset = raw_train_datasets.map(
        train_tokenize_function,
        batched=True,
        batch_size=64,
        num_proc=8,
        remove_columns=raw_train_datasets.column_names,
        load_from_cache_file=True,  # not args.overwrite_cache
        desc="Running Encoding",
        fn_kwargs={"tokenizer": tokenizer},
    )

    if training_args.local_rank == 0:
        torch.distributed.barrier()

    if training_args.local_rank == 0:
        print("Training dataset samples:", len(train_dataset))
        for index in random.sample(range(len(train_dataset)), 3):
            print(
                f"Sample {index} of the training set: {train_dataset[index]['input_ids']}, {train_dataset[index]['labels']}."
            )
            print(
                f"Sample {index} of the training set: {tokenizer.decode(list(train_dataset[index]['input_ids']))}."
            )

    data_collator = DataCollatorForSupervisedDataset(tokenizer=tokenizer)
    data_module = dict(
        train_dataset=train_dataset, eval_dataset=None, data_collator=data_collator
    )
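Here we are using the local_rank argument, which is automatically passed to the script when using DeepSpeed, together with torch.distributed.barrier(): the process with local rank 0 tokenizes the dataset first while the other processes wait, and then they load the result from the cache. We are also using some prints to help understand what is happening.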
At this step, we are ready to instantiate our trainer instance using all the objects and functions that we defined for the dataset and the model:
    trainer = Trainer(
        model=model, tokenizer=tokenizer, args=training_args, **data_module
    )
    trainer.train()
    trainer.save_model()
    trainer.save_state()


if __name__ == "__main__":
    train()
Hang on we are almost there ! We can really start our training soon !
DeepSpeed configuration
Now the last step is to define our DeepSpeed configuration. Let's start by working out our batch size:
Global Batch Size = Per Device Batch Size × Number of GPUs × Gradient Accumulation Steps
So assuming we have 4 GPUs, a per-device batch size of 4, and 4 gradient accumulation steps, the global batch size will be 4 × 4 × 4 = 64.
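The arithmetic is easy to get wrong when you start playing with the values, so a small sanity check can help. The sketch below uses hypothetical helper names and assumes every GPU acts as one data-parallel replica. The config keys match the ones used in the DeepSpeed configuration file.

```python
def global_batch_size(per_device_batch_size, num_gpus, gradient_accumulation_steps):
    """Global batch size when every GPU is one data-parallel replica."""
    return per_device_batch_size * num_gpus * gradient_accumulation_steps


def check_batch_config(config, per_device_batch_size, num_gpus):
    """Raise if train_batch_size disagrees with the other batch settings."""
    expected = global_batch_size(
        per_device_batch_size, num_gpus, config["gradient_accumulation_steps"]
    )
    if config["train_batch_size"] != expected:
        raise ValueError(
            f"train_batch_size is {config['train_batch_size']}, expected {expected}"
        )
    return expected


config = {"train_batch_size": 64, "gradient_accumulation_steps": 4}
print(check_batch_config(config, per_device_batch_size=4, num_gpus=4))  # 64
```

Running a check like this before launching a job is cheaper than waiting for the training run to fail on a batch size mismatch.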
Let's also use the ZeRO optimization framework in DeepSpeed, which is designed to reduce memory consumption and make training of large models feasible on GPUs with limited memory. It achieves this by partitioning and offloading certain components of the model and optimizer across devices.
We will use stage 2 of the ZeRO optimization, which has the following characteristics:
- Partitioned optimizer states: The optimizer states (e.g., momentum, variance for Adam) are split across GPUs.
- Partitioned gradients: Each GPU only keeps a portion of the gradients, instead of storing the full gradients.
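To get a feel for what this partitioning buys us, here is a rough back-of-the-envelope sketch. It assumes mixed-precision Adam with 2 bytes per parameter for the 16-bit weights, 2 bytes for the gradients, and 12 bytes for the optimizer states (fp32 master weights, momentum, and variance), as in the ZeRO paper. It ignores activations and temporary buffers, and `zero_memory_gb` is a hypothetical helper, not a DeepSpeed API.

```python
def zero_memory_gb(num_params, num_gpus, stage, optimizer_bytes=12):
    """Approximate per-GPU memory (in GB) for weights, gradients and Adam states."""
    params = 2 * num_params                 # 16-bit weights
    grads = 2 * num_params                  # 16-bit gradients
    optim = optimizer_bytes * num_params    # fp32 master weights, momentum, variance
    if stage >= 1:
        optim /= num_gpus                   # stage 1 partitions optimizer states
    if stage >= 2:
        grads /= num_gpus                   # stage 2 also partitions gradients
    if stage >= 3:
        params /= num_gpus                  # stage 3 also partitions the weights
    return (params + grads + optim) / 1e9


# Assumption: roughly 7 billion parameters, trained on 4 GPUs.
for stage in range(4):
    print(f"stage {stage}: {zero_memory_gb(7e9, 4, stage):.1f} GB per GPU")
```

Under these assumptions, stage 2 on 4 GPUs brings the per-GPU footprint of a 7B model from about 112 GB down to about 38.5 GB before any offloading.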
Normally, the model parameters (weights) reside in GPU memory during training, but offloading reduces the GPU memory requirement. In the configuration, we are specifying that the CPU memory can be used for offloading. Note that the DeepSpeed documentation describes parameter offloading (offload_param) as available only with ZeRO stage 3, so with stage 2 it is the optimizer offloading described next that takes effect.
Let's also enable the offloading of optimizer states (e.g., momentum and variance in Adam) to the CPU. These states are often large and consume significant memory. Offloading them to the CPU helps avoid running out of memory with such a large model. Enabling pinned (page-locked) memory on the CPU allows faster and more efficient data transfer between the CPU and GPU.
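That's about it. The rest is self-explanatory: we are enabling model parallelism through pipelining into 4 stages and tensor parallelism with a world size equal to the number of available GPUs. It is worth checking the DeepSpeed startup logs to confirm which of these parallelism settings are actually applied in your version, since the Hugging Face Trainer integration is documented mainly around ZeRO.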
{
  "train_batch_size": 64,
  "gradient_accumulation_steps": 4,
  "zero_optimization": {
    "stage": 2,
    "offload_param": {
      "device": "cpu",
      "pin_memory": true
    },
    "offload_optimizer": {
      "device": "cpu",
      "pin_memory": true
    }
  },
  "bf16": {
    "enabled": true
  },
  "deepspeed_transformer_kernel": true,
  "model_parallel": {
    "enabled": true,
    "pipeline": {
      "enabled": true,
      "num_stages": 4
    },
    "tensor_parallel": {
      "enabled": true,
      "world_size": 4
    }
  }
}
We have to save this configuration in a JSON file; let's call it ds.json. More about this can be found in the DeepSpeed documentation. Now we are ready to train the model, but let's make it even fancier and define a bash script to pass the arguments to our script:
WORKSPACE="/data"
DATA_PATH="Neo111x/decompile_LLM"
OUTPUT_PATH="${WORKSPACE}/output_models/falCodecompile-large-7b"
MODEL_PATH="tiiuae/Falcon3-7B-Base"
deepspeed --num_gpus=4 finetune.py \
--model_name_or_path $MODEL_PATH \
--data_path $DATA_PATH \
--output_dir $OUTPUT_PATH \
--num_train_epochs 2 \
--model_max_length 1024 \
--per_device_train_batch_size 4 \
--gradient_accumulation_steps 4 \
--evaluation_strategy "no" \
--save_strategy "steps" \
--use_flash_attention \
--save_steps 500 \
--save_total_limit 100 \
--learning_rate 2e-5 \
--max_grad_norm 1.0 \
--weight_decay 0.1 \
--warmup_ratio 0.025 \
--logging_steps 1 \
--lr_scheduler_type "cosine" \
--gradient_checkpointing True \
--report_to "wandb" \
--bf16 True \
--deepspeed "ds.json"
# --master_port 29600
All the arguments here are self-explanatory, and you can play with them to fit your purpose.
The full script can be found in the git repo.