# VoucherVision / vouchervision / LLM_local_custom_fine_tune.py
import os, re, json, gc
import yaml
import torch
import transformers
from peft import AutoPeftModelForCausalLM
from transformers import AutoTokenizer, BitsAndBytesConfig
from langchain.output_parsers.retry import RetryOutputParser
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompt_values import StringPromptValue
from huggingface_hub import hf_hub_download
from langchain_community.llms.huggingface_pipeline import HuggingFacePipeline

from vouchervision.utils_LLM import SystemLoadMonitor, run_tools, count_tokens, save_individual_prompt, sanitize_prompt
from vouchervision.utils_LLM_JSON_validation import validate_and_align_JSON_keys_with_template
# MODEL_NAME = "unsloth/mistral-7b-instruct-v0.2-bnb-4bit"
# sltp_version = 'HLT_MICH_Angiospermae_SLTPvA_v1-0_medium__OCR-C25-L25-E50-R05'
# LORA = "phyloforfun/mistral-7b-instruct-v2-bnb-4bit__HLT_MICH_Angiospermae_SLTPvC_v1-0_medium_OCR-C25-L25-E50-R05"
TEXT = "HERBARIUM OF MARCUS W. LYON , JR . Tracaulon sagittatum Indiana : Porter Co. Mincral Springs edge wet subdural woods 1927 TX 11 Flowers pink UNIVERSIT HERBARIUM MICHIGAN MICH University of Michigan Herbarium 1439649 copyright reserved PERSICARIA FEB 26 1965 cm "
PARENT_MODEL = "unsloth/mistral-7b-instruct-v0.2-bnb-4bit"
class LocalFineTuneHandler:
RETRY_DELAY = 2 # Wait 2 seconds before retrying
MAX_RETRIES = 5 # Maximum number of retries
STARTING_TEMP = 0.001
TOKENIZER_NAME = None
VENDOR = 'mistral'
MAX_GPU_MONITORING_INTERVAL = 2 # seconds
def __init__(self, cfg, logger, model_name, JSON_dict_structure, config_vals_for_permutation=None):
        # NOTE: the JSON_dict_structure argument is currently ignored; the
        # Darwin Core template below is hardcoded to match the fine-tune.
        self.JSON_dict_structure_str = """{"catalogNumber": "", "scientificName": "", "genus": "", "specificEpithet": "", "scientificNameAuthorship": "", "collector": "", "recordNumber": "", "identifiedBy": "", "verbatimCollectionDate": "", "collectionDate": "", "occurrenceRemarks": "", "habitat": "", "locality": "", "country": "", "stateProvince": "", "county": "", "municipality": "", "verbatimCoordinates": "", "decimalLatitude": "", "decimalLongitude": "", "minimumElevationInMeters": "", "maximumElevationInMeters": ""}"""
self.cfg = cfg
self.print_output = True
self.tool_WFO = self.cfg['leafmachine']['project']['tool_WFO']
self.tool_GEO = self.cfg['leafmachine']['project']['tool_GEO']
self.tool_wikipedia = self.cfg['leafmachine']['project']['tool_wikipedia']
self.logger = logger
        self.has_GPU = torch.cuda.is_available()
        self.device = "cuda" if self.has_GPU else "cpu"
self.monitor = SystemLoadMonitor(logger)
        self.model_name = model_name.split("/")[-1]  # repo name without the org prefix
self.model_id = model_name
# self.model_path = hf_hub_download(repo_id=self.model_id, repo_type="model",filename="config.json")
self.starting_temp = float(self.STARTING_TEMP)
        self.temp_increment = 0.2
self.adjust_temp = self.starting_temp
self.load_in_4bit = False
self.parser = JsonOutputParser()
self._load_model()
self._create_prompt()
self._set_config()
self._build_model_chain_parser()
    def _set_config(self):
        self.config = {
            'max_new_tokens': 1024,
            'temperature': self.starting_temp,
            'seed': 2023,
            'top_p': 1,
            # 'top_k': 40,
            'do_sample': False,
            'n_ctx': 4096,
            # 4-bit quantization options (handled via self.load_in_4bit instead):
            # 'use_4bit': True,                     # activate 4-bit precision base model loading
            # 'bnb_4bit_compute_dtype': "float16",  # compute dtype for 4-bit base models
            # 'bnb_4bit_quant_type': "nf4",         # quantization type (fp4 or nf4)
            # 'use_nested_quant': False,            # nested (double) quantization
        }
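        # Only 'max_new_tokens', 'top_k', 'top_p', and 'do_sample' are read by
        # _build_model_chain_parser below; 'temperature' is tracked separately
        # through self.adjust_temp, and 'seed' / 'n_ctx' are not consumed by
        # the active pipeline path.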
def _adjust_config(self):
new_temp = self.adjust_temp + self.temp_increment
if self.json_report:
self.json_report.set_text(text_main=f'Incrementing temperature from {self.adjust_temp} to {new_temp}')
self.logger.info(f'Incrementing temperature from {self.adjust_temp} to {new_temp}')
self.adjust_temp += self.temp_increment
def _reset_config(self):
if self.json_report:
self.json_report.set_text(text_main=f'Resetting temperature from {self.adjust_temp} to {self.starting_temp}')
self.logger.info(f'Resetting temperature from {self.adjust_temp} to {self.starting_temp}')
self.adjust_temp = self.starting_temp
    def _load_model(self):
        # Load the fine-tuned LoRA adapter on top of its base model
        self.model = AutoPeftModelForCausalLM.from_pretrained(
            pretrained_model_name_or_path=self.model_id,
            load_in_4bit=self.load_in_4bit,
            low_cpu_mem_usage=True,
        ).to(self.device)
        self.tokenizer = AutoTokenizer.from_pretrained(PARENT_MODEL)
        self.eos_token_id = self.tokenizer.eos_token_id
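        # Illustrative smoke test (a sketch, assuming the load above succeeded;
        # the prompt and max_new_tokens value here are arbitrary):
        #   inputs = self.tokenizer(["### Instruction: ..."], return_tensors="pt").to(self.device)
        #   ids = self.model.generate(**inputs, max_new_tokens=32, eos_token_id=self.eos_token_id)
        #   print(self.tokenizer.decode(ids[0], skip_special_tokens=True))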
    def _build_model_chain_parser(self):
        # Reuse the already-loaded PEFT model and tokenizer rather than letting
        # the pipeline load the checkpoint a second time from the hub.
        self.local_model_pipeline = transformers.pipeline(
            "text-generation",
            model=self.model,
            tokenizer=self.tokenizer,
            max_new_tokens=self.config.get('max_new_tokens'),
            top_k=self.config.get('top_k', None),
            top_p=self.config.get('top_p'),
            do_sample=self.config.get('do_sample'),
        )
        self.local_model = HuggingFacePipeline(pipeline=self.local_model_pipeline)
        # from_llm() builds the retry chain that re-prompts on parse failures
        self.retry_parser = RetryOutputParser.from_llm(parser=self.parser, llm=self.local_model, max_retries=self.MAX_RETRIES)
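    # Note: for "text-generation" pipelines, langchain's HuggingFacePipeline
    # strips the echoed prompt from the generated text, so the parser only
    # sees the model's new output.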
    def _create_prompt(self):
        self.alpaca_prompt = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.
### Instruction:
{}
### Input:
{}
### Response:
{}"""

        # Same Alpaca layout, with named placeholders so Langchain can fill it
        self.template = self.alpaca_prompt.format("{instructions}", "{OCR_text}", "{empty}")

        self.instructions_text = """Refactor the unstructured text into a valid JSON dictionary. The key names follow the Darwin Core Archive Standard. If a key lacks content, then insert an empty string. Fill in the following JSON structure as required: """
        self.instructions_json = self.JSON_dict_structure_str.replace("\n    ", " ").strip().replace("\n", " ")
        self.instructions = ''.join([self.instructions_text, self.instructions_json])

        # Create a prompt from the template so we can use it with Langchain
        self.prompt = PromptTemplate(template=self.template, input_variables=["instructions", "OCR_text", "empty"])

        # Set up a parser
        self.parser = JsonOutputParser()
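    # Illustrative rendered prompt (hypothetical OCR input), as produced by
    # alpaca_prompt.format(instructions, OCR_text, ""):
    #   Below is an instruction that describes a task, ...
    #   ### Instruction:
    #   Refactor the unstructured text into a valid JSON dictionary. ... {"catalogNumber": "", ...}
    #   ### Input:
    #   HERBARIUM OF MARCUS W. LYON , JR . ...
    #   ### Response:
    #   (left empty for the model to complete)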
    def extract_json(self, response_text):
        # Keep only the text after the '### Response:' marker
        response_pattern = re.compile(r'### Response:(.*)', re.DOTALL)
        response_match = response_pattern.search(response_text)
        if not response_match:
            raise ValueError("No '### Response:' section found in the provided text")
        response_text = response_match.group(1)

        # Find JSON objects in the response text. The non-greedy pattern assumes
        # a flat (non-nested) object, which matches the Darwin Core template.
        json_objects = re.findall(r'\{.*?\}', response_text, re.DOTALL)
        if json_objects:
            # Take the first JSON object if there are multiple
            json_str = json_objects[0]
            # Convert the JSON string to a Python dictionary
            json_dict = json.loads(json_str)
            return json_str, json_dict
        else:
            raise ValueError("No JSON object found in the '### Response:' section")
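    # Illustrative (hypothetical) round trip for extract_json:
    #   text = '... ### Response: {"catalogNumber": "1439649", "genus": "Persicaria"}'
    #   json_str, json_dict = handler.extract_json(text)
    #   json_dict["catalogNumber"]  # -> "1439649"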
def call_llm_local_custom_fine_tune(self, OCR_text, json_report, paths):
        *_, json_file_path_wiki, txt_file_path_ind_prompt = paths
self.json_report = json_report
if self.json_report:
self.json_report.set_text(text_main=f'Sending request to {self.model_name}')
self.monitor.start_monitoring_usage()
        nt_in = 0
        nt_out = 0

        # Tokenized inputs for the direct `model.generate` alternative kept in
        # comments below; the active path goes through the Langchain pipeline.
        self.inputs = self.tokenizer(
            [
                self.alpaca_prompt.format(
                    self.instructions,  # instruction
                    OCR_text,           # input
                    "",                 # output - leave blank for generation!
                )
            ], return_tensors="pt").to(self.device)
ind = 0
while ind < self.MAX_RETRIES:
ind += 1
            try:
                # Build the full Alpaca-style prompt so the fine-tuned model sees
                # the same format it was trained on, not the raw OCR text alone.
                prompt_text = self.alpaca_prompt.format(self.instructions, OCR_text, "")

                # NOTE: self.adjust_temp is not wired into the pipeline call, and
                # with do_sample=False the temperature would be ignored anyway.
                results = self.local_model.invoke(prompt_text)

                # RetryOutputParser expects a PromptValue, not a plain string
                output = self.retry_parser.parse_with_prompt(results, prompt_value=StringPromptValue(text=prompt_text))

                # Alternative: generate directly with the loaded PEFT model
                # output = self.model.generate(**self.inputs, eos_token_id=self.eos_token_id, max_new_tokens=512)
                # generated_text = self.tokenizer.decode(output[0], skip_special_tokens=True)
                # json_str, json_dict = self.extract_json(generated_text)
                if self.print_output:
                    print("\nJSON Dictionary:")
                    print(output)
if output is None:
self.logger.error(f'Failed to extract JSON from:\n{results}')
self._adjust_config()
del results
else:
nt_in = count_tokens(self.instructions+OCR_text, self.VENDOR, self.TOKENIZER_NAME)
nt_out = count_tokens(results, self.VENDOR, self.TOKENIZER_NAME)
output = validate_and_align_JSON_keys_with_template(output, json.loads(self.JSON_dict_structure_str))
if output is None:
self.logger.error(f'[Attempt {ind}] Failed to extract JSON from:\n{results}')
self._adjust_config()
else:
                        self.monitor.stop_inference_timer()  # Starts tool timer too

                        if self.json_report:
                            self.json_report.set_text(text_main='Working on WFO, Geolocation, Links')
                        output_WFO, WFO_record, output_GEO, GEO_record = run_tools(output, self.tool_WFO, self.tool_GEO, self.tool_wikipedia, json_file_path_wiki)

                        save_individual_prompt(sanitize_prompt(self.instructions + OCR_text), txt_file_path_ind_prompt)

                        self.logger.info(f"Formatted JSON:\n{json.dumps(output, indent=4)}")

                        usage_report = self.monitor.stop_monitoring_report_usage()

                        if self.adjust_temp != self.starting_temp:
                            self._reset_config()

                        if self.json_report:
                            self.json_report.set_text(text_main='LLM call successful')
                        del results
                        return output, nt_in, nt_out, WFO_record, GEO_record, usage_report
            except Exception as e:
                self.logger.error(f'{e}')
                self.logger.info(f'JSON extraction failed on attempt {ind} of {self.MAX_RETRIES}')
                if self.json_report:
                    self.json_report.set_text(text_main=f'JSON extraction failed on attempt {ind} of {self.MAX_RETRIES}')
        self.monitor.stop_inference_timer()  # Starts tool timer too
        usage_report = self.monitor.stop_monitoring_report_usage()
        if self.json_report:
            self.json_report.set_text(text_main='LLM call failed')
        return None, nt_in, nt_out, None, None, usage_report
# Example values for a standalone test run:
model_name = "unsloth/mistral-7b-instruct-v0.2-bnb-4bit"
sltp_version = 'HLT_MICH_Angiospermae_SLTPvA_v1-0_medium__OCR-C25-L25-E50-R05'
lora_name = "phyloforfun/mistral-7b-instruct-v2-bnb-4bit__HLT_MICH_Angiospermae_SLTPvA_v1-0_medium__OCR-C25-L25-E50-R05"
OCR_test = "HERBARIUM OF MARCUS W. LYON , JR . Tracaulon sagittatum Indiana : Porter Co. Mincral Springs edge wet subdural woods 1927 TX 11 Flowers pink UNIVERSIT HERBARIUM MICHIGAN MICH University of Michigan Herbarium 1439649 copyright reserved PERSICARIA FEB 26 1965 cm "
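# Minimal usage sketch (illustrative only; not part of the module). The cfg
# dict, logger, and paths tuple here are assumptions: cfg must carry the
# ['leafmachine']['project'] tool flags, and paths must end with the wiki JSON
# path and the individual-prompt txt path that
# call_llm_local_custom_fine_tune unpacks.
if __name__ == "__main__":
    import logging
    logger = logging.getLogger("vouchervision")
    cfg = {'leafmachine': {'project': {'tool_WFO': False, 'tool_GEO': False, 'tool_wikipedia': False}}}
    handler = LocalFineTuneHandler(cfg, logger, lora_name, None)
    paths = (None, None, None, None, None, 'wiki.json', 'prompt.txt')
    output, nt_in, nt_out, WFO_record, GEO_record, usage_report = handler.call_llm_local_custom_fine_tune(OCR_test, None, paths)
    print(output)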