Spaces:
Running
Running
# Helper functions for LLM_XXXXX.py
import tiktoken, json, os, yaml | |
from langchain_core.output_parsers.format_instructions import JSON_FORMAT_INSTRUCTIONS | |
from transformers import AutoTokenizer | |
import GPUtil | |
import time | |
import psutil | |
import threading | |
import torch | |
from datetime import datetime | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
try: | |
from vouchervision.tool_taxonomy_WFO import validate_taxonomy_WFO, WFONameMatcher | |
from vouchervision.tool_geolocate_HERE import validate_coordinates_here | |
from vouchervision.tool_wikipedia import validate_wikipedia | |
except: | |
from tool_taxonomy_WFO import validate_taxonomy_WFO, WFONameMatcher | |
from tool_geolocate_HERE import validate_coordinates_here | |
from tool_wikipedia import validate_wikipedia | |
def run_tools(output, tool_WFO, tool_GEO, tool_wikipedia, json_file_path_wiki):
    """Run the WFO, HERE-geolocation and Wikipedia validators concurrently.

    Each validator is submitted to its own worker thread. A validator that
    raises is reported with ``print`` and, for the WFO and GEO validators,
    replaced by a fallback: the unmodified ``output`` paired with an empty
    record.

    Args:
        output: Value handed to every validator; also used as the fallback
            "validated output" when a validator fails.
        tool_WFO: First argument forwarded to ``validate_taxonomy_WFO`` and
            the argument used to construct ``WFONameMatcher``.
        tool_GEO: First argument forwarded to ``validate_coordinates_here``.
        tool_wikipedia: First argument forwarded to ``validate_wikipedia``.
        json_file_path_wiki: Second argument forwarded to
            ``validate_wikipedia``.

    Returns:
        The 4-tuple ``(output_WFO, WFO_record, output_GEO, GEO_record)``.
        Whatever ``validate_wikipedia`` returns is collected but not returned.
    """
    jobs = (
        (validate_taxonomy_WFO, (tool_WFO, output, False)),
        (validate_coordinates_here, (tool_GEO, output, False)),
        (validate_wikipedia, (tool_wikipedia, json_file_path_wiki, output)),
    )

    finished = {}
    with ThreadPoolExecutor() as pool:
        # Map every future back to the name of the validator it is running.
        pending = {}
        for validator, arguments in jobs:
            future = pool.submit(validator, *arguments)
            pending[future] = validator.__name__

        for done in as_completed(pending):
            label = pending[done]
            try:
                finished[label] = done.result()
            except Exception as exc:
                print(f'{label} generated an exception: {exc}')

    # Leaving the ``with`` block guarantees that every worker has completed.
    matcher = WFONameMatcher(tool_WFO)
    empty_geo_record = {
        'GEO_override_OCR': False,
        'GEO_method': '',
        'GEO_formatted_full_string': '',
        'GEO_decimal_lat': '',
        'GEO_decimal_long': '',
        'GEO_city': '',
        'GEO_county': '',
        'GEO_state': '',
        'GEO_state_code': '',
        'GEO_country': '',
        'GEO_country_code': '',
        'GEO_continent': '',
    }

    wfo_fallback = (output, matcher.NULL_DICT)
    geo_fallback = (output, empty_geo_record)
    output_WFO, WFO_record = finished.get('validate_taxonomy_WFO', wfo_fallback)
    output_GEO, GEO_record = finished.get('validate_coordinates_here', geo_fallback)
    return output_WFO, WFO_record, output_GEO, GEO_record
def save_individual_prompt(prompt_template, txt_file_path_ind_prompt):
    """Write ``prompt_template`` to ``txt_file_path_ind_prompt`` as UTF-8 text.

    Any existing file at that path is overwritten.
    """
    handle = open(txt_file_path_ind_prompt, mode='w', encoding='utf-8')
    with handle:
        handle.write(prompt_template)
def sanitize_prompt(data):
    """Recursively drop characters that cannot be encoded as UTF-8.

    Strings are round-tripped through UTF-8 with the ``'ignore'`` error
    handler, which removes unencodable code points such as lone surrogates.
    Dicts (keys and values) and lists are rebuilt with every element
    sanitized; any other value is returned unchanged.
    """
    if isinstance(data, str):
        return data.encode('utf-8', 'ignore').decode('utf-8')

    if isinstance(data, list):
        return list(map(sanitize_prompt, data))

    if isinstance(data, dict):
        cleaned = {}
        for key, value in data.items():
            clean_key = sanitize_prompt(key)
            cleaned[clean_key] = sanitize_prompt(value)
        return cleaned

    return data
# Loaded Hugging Face tokenizers keyed by checkpoint name. Loading a tokenizer
# reads its files from disk (or the hub), so it is done once per process
# instead of once per count_tokens() call.
_HF_TOKENIZER_CACHE = {}


def _get_hf_tokenizer(checkpoint):
    """Return the tokenizer for ``checkpoint``, loading it on first use only."""
    tokenizer = _HF_TOKENIZER_CACHE.get(checkpoint)
    if tokenizer is None:
        tokenizer = AutoTokenizer.from_pretrained(checkpoint)
        _HF_TOKENIZER_CACHE[checkpoint] = tokenizer
    return tokenizer


def count_tokens(string, vendor, model_name):
    """Count the tokens in ``string`` plus the JSON format instructions.

    ``JSON_FORMAT_INSTRUCTIONS`` is appended to ``string`` before counting.

    Args:
        string: Prompt text to measure.
        vendor: ``'mistral'`` selects the "mistralai/Mistral-7B-v0.1"
            Hugging Face tokenizer; any other value uses ``tiktoken``.
        model_name: Model name passed to ``tiktoken.encoding_for_model``;
            unused when ``vendor`` is ``'mistral'``.

    Returns:
        The number of tokens, or ``0`` if tokenization raised (the error is
        printed rather than propagated).
    """
    full_string = string + JSON_FORMAT_INSTRUCTIONS

    try:
        if vendor == 'mistral':
            tokenizer = _get_hf_tokenizer("mistralai/Mistral-7B-v0.1")
            return len(tokenizer.tokenize(full_string))

        encoding = tiktoken.encoding_for_model(model_name)
        return len(encoding.encode(full_string))
    except Exception as e:
        # Best effort: a failed count must not abort the caller.
        print(f"An error occurred: {e}")
        return 0
class SystemLoadMonitor():
    """Sample peak CPU, RAM and GPU usage on a background thread and time a run.

    Typical sequence: ``start_monitoring_usage()`` -> ``stop_inference_timer()``
    -> ``stop_monitoring_report_usage()``. The peaks are stored in
    ``self.gpu_usage``; its ``'monitoring'`` entry is the flag that keeps the
    sampling loop alive.
    """

    def __init__(self, logger) -> None:
        """Create an idle monitor.

        Args:
            logger: Object with an ``info`` method used for the final summary;
                a falsy value makes the summary go to ``print`` instead.
        """
        self.monitoring_thread = None
        self.logger = logger
        self.gpu_usage = {'max_cpu_usage': 0, 'max_load': 0, 'max_vram_usage': 0, "max_ram_usage": 0, 'n_gpus': 0, 'monitoring': True}
        self.start_time = None
        self.tool_start_time = None
        # Defined up front so the report can be built even when
        # stop_inference_timer() was never called.
        self.inference_time = 0
        self.has_GPU = torch.cuda.is_available()
        self.monitor_interval = 2
        # Lets stop_monitoring_report_usage() wake the sampler immediately
        # instead of waiting out the remainder of a sleep interval.
        self._stop_event = threading.Event()

    def _emit(self, message):
        """Send ``message`` to the logger when one is set, otherwise print it."""
        if self.logger:
            self.logger.info(message)
        else:
            print(message)

    def start_monitoring_usage(self):
        """Start the inference timer and launch the sampling thread."""
        self.start_time = time.time()
        # Re-arm the loop so the same instance can be started again after a stop.
        self.gpu_usage['monitoring'] = True
        self._stop_event.clear()
        # Daemon thread: a run that never reaches the stop call must not keep
        # the interpreter alive at exit.
        self.monitoring_thread = threading.Thread(
            target=self.monitor_usage,
            args=(self.monitor_interval,),
            daemon=True,
        )
        self.monitoring_thread.start()

    def stop_inference_timer(self):
        """Record the elapsed inference time and start the tool timer.

        If monitoring was never started (``start_time`` is ``None``) the
        inference time is recorded as 0 instead of raising.
        """
        now = time.time()
        self.inference_time = now - self.start_time if self.start_time is not None else 0
        # The tool timer starts at the instant the inference timer stops.
        self.tool_start_time = now

    def monitor_usage(self, interval):
        """Sampling loop; runs until the ``'monitoring'`` flag is cleared.

        Args:
            interval: Seconds to wait between samples.
        """
        while self.gpu_usage['monitoring']:
            # GPU monitoring
            if self.has_GPU:
                gpus = GPUtil.getGPUs()
                gpu_count = len(gpus)
                self.gpu_usage['n_gpus'] = gpu_count
                if gpu_count > 0:  # Avoid division by zero
                    # Load is averaged across GPUs; memory is summed
                    # (memoryUsed / 1024, reported as GB).
                    mean_load = sum(gpu.load for gpu in gpus) / gpu_count
                    vram_used_gb = sum(gpu.memoryUsed / 1024.0 for gpu in gpus)
                    self.gpu_usage['max_load'] = max(self.gpu_usage['max_load'], mean_load)
                    self.gpu_usage['max_vram_usage'] = max(self.gpu_usage['max_vram_usage'], vram_used_gb)

            # RAM monitoring (bytes -> GB)
            ram_usage = psutil.virtual_memory().used / (1024.0 ** 3)
            self.gpu_usage['max_ram_usage'] = max(self.gpu_usage.get('max_ram_usage', 0), ram_usage)

            # CPU monitoring
            cpu_usage = psutil.cpu_percent(interval=None)
            self.gpu_usage['max_cpu_usage'] = max(self.gpu_usage.get('max_cpu_usage', 0), cpu_usage)

            # Sleeps for ``interval`` seconds unless a stop request arrives first.
            self._stop_event.wait(interval)

    def get_current_datetime(self):
        """Return the local time formatted as ``YYYY_MM_DDTHH_MM_SS``."""
        # Underscores instead of dashes/colons in the timestamp.
        return datetime.now().strftime('%Y_%m_%dT%H_%M_%S')

    def stop_monitoring_report_usage(self):
        """Stop sampling, log a summary and return the usage report.

        Returns:
            dict with the keys ``inference_time_s``, ``tool_time_s``,
            ``max_cpu``, ``max_ram_gb``, ``current_time``, ``n_gpus``,
            ``total_gpu_vram_gb``, ``capability_score``, ``max_gpu_load`` and
            ``max_gpu_vram_gb``. Timings and peaks are rounded to two decimals
            and stored as strings.
        """
        self.gpu_usage['monitoring'] = False
        self._stop_event.set()
        # The thread only exists once start_monitoring_usage() has been called.
        if self.monitoring_thread is not None:
            self.monitoring_thread.join()

        tool_time = time.time() - self.tool_start_time if self.tool_start_time else 0
        _, _, total_vram_gb, capability_score = check_system_gpus()

        inference_s = round(self.inference_time, 2)
        tool_s = round(tool_time, 2)
        peak_cpu = round(self.gpu_usage['max_cpu_usage'], 2)
        peak_ram_gb = round(self.gpu_usage['max_ram_usage'], 2)

        report = {
            'inference_time_s': str(inference_s),
            'tool_time_s': str(tool_s),
            'max_cpu': str(peak_cpu),
            'max_ram_gb': str(peak_ram_gb),
            'current_time': self.get_current_datetime(),
            'n_gpus': self.gpu_usage['n_gpus'],
            'total_gpu_vram_gb': total_vram_gb,
            'capability_score': capability_score,
        }

        self._emit(f"Inference Time: {inference_s} seconds")
        self._emit(f"Tool Time: {tool_s} seconds")
        self._emit(f"Max CPU Usage: {peak_cpu}%")
        self._emit(f"Max RAM Usage: {peak_ram_gb}GB")

        if self.has_GPU:
            # max_load is stored as a fraction; it is reported as a percentage.
            peak_gpu_load = round(self.gpu_usage['max_load'] * 100, 2)
            peak_vram_gb = round(self.gpu_usage['max_vram_usage'], 2)
            report.update({'max_gpu_load': str(peak_gpu_load)})
            report.update({'max_gpu_vram_gb': str(peak_vram_gb)})
            self._emit(f"Max GPU Load: {peak_gpu_load}%")
            self._emit(f"Max GPU Memory Usage: {peak_vram_gb}GB")
        else:
            report.update({'max_gpu_load': '0'})
            report.update({'max_gpu_vram_gb': '0'})

        return report
def check_system_gpus():
    """Summarise the GPUs reported by GPUtil and classify their total VRAM.

    Prints whether torch reports CUDA as available, then sums the
    ``memoryTotal`` of every GPU returned by ``GPUtil.getGPUs()``.

    Returns:
        A 4-tuple ``(num_gpus, gpu_dict, total_vram_gb, capability_score)``:
        the GPU count, a ``{"GPU_<i>": "<vram> GB"}`` mapping, the summed
        VRAM (``memoryTotal / 1024``), and the label of the first class whose
        ceiling is >= the total. The label is ``"class_max"`` only when no
        comparison succeeds (e.g. a NaN total).
    """
    print(f"Torch CUDA: {torch.cuda.is_available()}")

    detected = GPUtil.getGPUs()
    gpu_dict = {}
    total_vram_mb = 0
    for index, gpu in enumerate(detected):
        vram_mb = gpu.memoryTotal  # VRAM in MB
        gpu_dict[f"GPU_{index}"] = f"{vram_mb / 1024} GB"
        total_vram_mb += vram_mb
    total_vram_gb = total_vram_mb / 1024

    # (label, ceiling in GB) pairs in ascending order; the first ceiling that
    # is not exceeded wins. Infinity catches everything above 100 GB.
    vram_classes = (
        ("no_gpu", 0),
        ("class_8GB", 10),
        ("class_12GB", 14),
        ("class_16GB", 18),
        ("class_24GB", 26),
        ("class_48GB", 50),
        ("class_96GB", 100),
        ("class_96GBplus", float('inf')),
    )
    capability_score = next(
        (label for label, ceiling_gb in vram_classes if total_vram_gb <= ceiling_gb),
        "class_max",
    )

    return len(detected), gpu_dict, total_vram_gb, capability_score
if __name__ == '__main__':
    # Manual check: print what check_system_gpus() detects on this machine.
    gpu_count, per_gpu_vram, vram_total_gb, vram_class = check_system_gpus()
    print(f"Number of GPUs: {gpu_count}")
    print(f"GPU Details: {per_gpu_vram}")
    print(f"Total VRAM: {vram_total_gb} GB")
    print(f"Capability Score: {vram_class}")