Spaces:
Running
Running
Major update. Support for 15 LLMs, World Flora Online taxonomy validation, geolocation, 2 OCR methods, significant UI changes, stability improvements, consistent JSON parsing
3a1d033
import os, io, openai, vertexai, json, tempfile | |
from mistralai.client import MistralClient | |
from mistralai.models.chat_completion import ChatMessage | |
from langchain.schema import HumanMessage | |
from langchain_openai import AzureChatOpenAI | |
from vertexai.language_models import TextGenerationModel | |
from vertexai.preview.generative_models import GenerativeModel | |
from google.cloud import vision | |
from datetime import datetime | |
import google.generativeai as genai | |
from google.oauth2 import service_account | |
from googleapiclient.discovery import build | |
class APIvalidation: | |
def __init__(self, cfg_private, dir_home, is_hf) -> None: | |
self.cfg_private = cfg_private | |
self.dir_home = dir_home | |
self.is_hf = is_hf | |
self.formatted_date = self.get_formatted_date() | |
def get_formatted_date(self): | |
# Get the current date | |
current_date = datetime.now() | |
# Format the date as "Month day, year" (e.g., "January 23, 2024") | |
formatted_date = current_date.strftime("%B %d, %Y") | |
return formatted_date | |
def has_API_key(self, val): | |
if val: | |
return True | |
else: | |
return False | |
def check_openai_api_key(self): | |
if self.cfg_private: | |
openai.api_key = self.cfg_private['openai']['OPENAI_API_KEY'] | |
else: | |
openai.api_key = os.getenv('OPENAI_API_KEY') | |
try: | |
openai.models.list() | |
return True | |
except: | |
return False | |
# def check_google_ocr_api_key(self): ##################################################################################### maybe check without initi, post the vertexai | |
# # if os.path.exists(self.cfg_private['google_cloud']['path_json_file']): | |
# # os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.cfg_private['google_cloud']['path_json_file'] | |
# # elif os.path.exists(self.cfg_private['google_cloud']['path_json_file_service_account2']): | |
# # os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.cfg_private['google_cloud']['path_json_file_service_account2'] | |
# # else: | |
# # return False | |
# try: | |
# if not self.cfg_private: | |
# # Convert JSON key from string to a dictionary | |
# service_account_json_str = os.getenv('google_service_account_json') | |
# if not service_account_json_str: | |
# print("Service account JSON not found in environment variables.") | |
# return False | |
# # Convert JSON string to a dictionary | |
# service_account_info = json.loads(service_account_json_str) | |
# # Create credentials from the service account info | |
# credentials = service_account.Credentials.from_service_account_info(service_account_info) | |
# # Initialize the client with the credentials | |
# client = vision.ImageAnnotatorClient(credentials=credentials) | |
# logo_path = os.path.join(self.dir_home, 'img','logo.png') | |
# with io.open(logo_path, 'rb') as image_file: | |
# content = image_file.read() | |
# image = vision.Image(content=content) | |
# response = client.document_text_detection(image=image) | |
# texts = response.text_annotations | |
# normal_cleaned_text = texts[0].description if texts else None | |
# print(f"OCR TEST: {normal_cleaned_text}") | |
# else: | |
# logo_path = os.path.join(self.dir_home, 'img','logo.png') | |
# client = vision.ImageAnnotatorClient() | |
# with io.open(logo_path, 'rb') as image_file: | |
# content = image_file.read() | |
# image = vision.Image(content=content) | |
# response = client.document_text_detection(image=image) | |
# texts = response.text_annotations | |
# normal_cleaned_text = texts[0].description if texts else None | |
# if normal_cleaned_text: | |
# return True | |
# else: | |
# return False | |
# except: | |
# return False | |
def check_azure_openai_api_key(self): | |
if not self.is_hf: | |
try: | |
# Initialize the Azure OpenAI client | |
model = AzureChatOpenAI( | |
deployment_name = 'gpt-35-turbo',#'gpt-35-turbo', | |
openai_api_version = self.cfg_private['openai_azure']['api_version'], | |
openai_api_key = self.cfg_private['openai_azure']['openai_api_key'], | |
azure_endpoint = self.cfg_private['openai_azure']['openai_api_base'], | |
openai_organization = self.cfg_private['openai_azure']['openai_organization'], | |
) | |
msg = HumanMessage(content="hello") | |
# self.llm_object.temperature = self.config.get('temperature') | |
response = model([msg]) | |
# Check the response content (you might need to adjust this depending on how your AzureChatOpenAI class handles responses) | |
if response: | |
return True | |
else: | |
return False | |
except Exception as e: # Use a more specific exception if possible | |
return False | |
else: | |
try: | |
azure_api_version = os.getenv('AZURE_API_VERSION') | |
azure_api_key = os.getenv('AZURE_API_KEY') | |
azure_api_base = os.getenv('AZURE_API_BASE') | |
azure_organization = os.getenv('AZURE_ORGANIZATION') | |
# Initialize the Azure OpenAI client | |
model = AzureChatOpenAI( | |
deployment_name = 'gpt-35-turbo',#'gpt-35-turbo', | |
openai_api_version = azure_api_version, | |
openai_api_key = azure_api_key, | |
azure_endpoint = azure_api_base, | |
openai_organization = azure_organization, | |
) | |
msg = HumanMessage(content="hello") | |
# self.llm_object.temperature = self.config.get('temperature') | |
response = model([msg]) | |
# Check the response content (you might need to adjust this depending on how your AzureChatOpenAI class handles responses) | |
if response: | |
return True | |
else: | |
return False | |
except Exception as e: # Use a more specific exception if possible | |
return False | |
def check_mistral_api_key(self): | |
try: | |
if not self.is_hf: | |
client = MistralClient(api_key=self.cfg_private['mistral']['mistral_key']) | |
else: | |
client = MistralClient(api_key=os.getenv('MISTRAL_API_KEY')) | |
# Initialize the Mistral Client with the API key | |
# Create a simple message | |
messages = [ChatMessage(role="user", content="hello")] | |
# Send the message and get the response | |
chat_response = client.chat( | |
model="mistral-tiny", | |
messages=messages, | |
) | |
# Check if the response is valid (adjust this according to the actual response structure) | |
if chat_response and chat_response.choices: | |
return True | |
else: | |
return False | |
except Exception as e: # Replace with a more specific exception if possible | |
return False | |
# def get_google_credentials(self): | |
# # Convert JSON key from string to a dictionary | |
# service_account_json_str = os.getenv('google_service_account_json') | |
# with tempfile.NamedTemporaryFile(mode="w+", delete=False,suffix=".json") as temp: | |
# temp.write(service_account_json_str) | |
# temp_filename = temp.name | |
# return temp_filename | |
# https://cloud.google.com/docs/authentication/provide-credentials-adc | |
def get_google_credentials(self): | |
creds_json_str = os.getenv('GOOGLE_APPLICATION_CREDENTIALS') | |
credentials = service_account.Credentials.from_service_account_info(json.loads(creds_json_str)) | |
return credentials | |
# def init_google_client(opt, opt2): | |
# # Fetch the credentials JSON string from Hugging Face Secrets | |
# creds_json_str = os.getenv('google_service_account_json') | |
# if creds_json_str: | |
# creds_dict = json.loads(creds_json_str) | |
# credentials = service_account.Credentials.from_service_account_info(creds_dict) | |
# # Initialize Google API client (if needed for your use case) | |
# client = build(opt, opt2, credentials=credentials) # Adjust with actual service details | |
# return client, credentials | |
# else: | |
# print("Google API credentials not found.") | |
def check_google_vertex_genai_api_key(self): | |
results = {"palm2": False, "gemini": False} | |
if not self.is_hf: | |
try: # Local | |
# Assuming genai and vertexai are clients for Google services | |
# os.environ["GOOGLE_API_KEY"] = self.cfg_private['google_palm']['google_palm_api'] | |
# genai.configure(api_key=self.cfg_private['google_palm']['google_palm_api']) | |
vertexai.init(project= self.cfg_private['google_palm']['project_id'], location=self.cfg_private['google_palm']['location']) | |
try: | |
model = TextGenerationModel.from_pretrained("text-bison@001") | |
response = model.predict("Hello") | |
test_response_palm = response.text | |
# llm_palm = ChatGoogleGenerativeAI(model="text-bison@001") | |
# test_response_palm = llm_palm.invoke("Hello") | |
if test_response_palm: | |
results["palm2"] = True | |
except Exception as e: | |
pass | |
try: | |
model = GenerativeModel("gemini-pro") | |
response = model.generate_content("Hello") | |
test_response_gemini = response.text | |
# llm_gemini = ChatGoogleGenerativeAI(model="gemini-pro") | |
# test_response_gemini = llm_gemini.invoke("Hello") | |
if test_response_gemini: | |
results["gemini"] = True | |
except Exception as e: | |
pass | |
return results | |
except Exception as e: # Replace with a more specific exception if possible | |
return results | |
else: | |
### is hugging face | |
try: | |
# Assuming genai and vertexai are clients for Google services | |
# os.environ["GOOGLE_API_KEY"] = os.getenv('PALM_API_KEY') | |
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.get_google_credentials() | |
# client, credentials = self.init_google_client('gemini-pro', 'v1') | |
# print(credentials) | |
print("service account") | |
palm_api_key = os.getenv('PALM_API_KEY') | |
google_project_id = os.getenv('GOOGLE_PROJECT_ID') | |
google_location = os.getenv('GOOGLE_LOCATION') | |
os.environ['GOOGLE_API_KEY'] = os.getenv('PALM_API_KEY') | |
vertexai.init(project=os.getenv('GOOGLE_PROJECT_ID'), location=os.getenv('GOOGLE_LOCATION'),credentials=self.get_google_credentials()) | |
# genai.configure(api_key=palm_api_key) | |
# vertexai.init(project=google_project_id, location=google_location)#, credentials=credentials) | |
print("service account pass") | |
try: | |
model = TextGenerationModel.from_pretrained("text-bison@001") | |
response = model.predict("Hello") | |
test_response_palm = response.text | |
# llm_palm = ChatGoogleGenerativeAI(model="text-bison@001") | |
# test_response_palm = llm_palm.invoke("Hello") | |
if test_response_palm: | |
results["palm2"] = True | |
print(f"palm2 pass [{test_response_palm}]") | |
else: | |
print(f"palm2 yes [{test_response_palm}]") | |
except Exception as e: | |
print(f"palm2 [{e}]") | |
pass | |
try: | |
model = GenerativeModel("gemini-pro") | |
response = model.generate_content("Hello") | |
test_response_gemini = response.text | |
# llm_gemini = ChatGoogleGenerativeAI(model="gemini-pro") | |
# test_response_gemini = llm_gemini.invoke("Hello") | |
if test_response_gemini: | |
results["gemini"] = True | |
print(f"gemini pass [{test_response_palm}]") | |
else: | |
print(f"gemini yes [{test_response_palm}]") | |
except Exception as e: | |
print(f"gemini [{e}]") | |
pass | |
return results | |
except Exception as e: # Replace with a more specific exception if possible | |
print(f"Immediate [{e}]") | |
return results | |
def report_api_key_status(self): | |
missing_keys = [] | |
present_keys = [] | |
if not self.is_hf: | |
k_OPENAI_API_KEY = self.cfg_private['openai']['OPENAI_API_KEY'] | |
k_openai_azure = self.cfg_private['openai_azure']['api_version'] | |
k_google_palm_api = self.cfg_private['google_palm']['google_palm_api'] | |
k_project_id = self.cfg_private['google_palm']['project_id'] | |
k_location = self.cfg_private['google_palm']['location'] | |
k_mistral = self.cfg_private['mistral']['mistral_key'] | |
k_here = self.cfg_private['here']['api_key'] | |
k_opencage = self.cfg_private['open_cage_geocode']['api_key'] | |
else: | |
k_OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') | |
k_openai_azure = os.getenv('AZURE_API_VERSION') | |
k_google_palm_api = os.getenv('PALM_API_KEY') | |
k_project_id = os.getenv('GOOGLE_PROJECT_ID') | |
k_location = os.getenv('GOOGLE_LOCATION') | |
k_mistral = os.getenv('MISTRAL_API_KEY') | |
k_here = os.getenv('here_api_key') | |
k_opencage = os.getenv('open_cage_geocode') | |
# Check each key and add to the respective list | |
# Google OCR key check | |
if self.has_API_key(k_google_palm_api) and self.has_API_key(k_project_id) and self.has_API_key(k_location): | |
is_valid = True #self.check_google_ocr_api_key() ############################################################################################################################### | |
if is_valid: | |
present_keys.append('Google OCR (Valid)') | |
else: | |
present_keys.append('Google OCR (Invalid)') | |
else: | |
missing_keys.append('Google OCR') | |
# OpenAI key check | |
if self.has_API_key(k_OPENAI_API_KEY): | |
is_valid = self.check_openai_api_key() | |
if is_valid: | |
present_keys.append('OpenAI (Valid)') | |
else: | |
present_keys.append('OpenAI (Invalid)') | |
else: | |
missing_keys.append('OpenAI') | |
# Azure OpenAI key check | |
if self.has_API_key(k_openai_azure): | |
is_valid = self.check_azure_openai_api_key() | |
if is_valid: | |
present_keys.append('Azure OpenAI (Valid)') | |
else: | |
present_keys.append('Azure OpenAI (Invalid)') | |
else: | |
missing_keys.append('Azure OpenAI') | |
# Google PALM2/Gemini key check | |
if self.has_API_key(k_google_palm_api) and self.has_API_key(k_project_id) and self.has_API_key(k_location): | |
google_results = self.check_google_vertex_genai_api_key() | |
if google_results['palm2']: | |
present_keys.append('Palm2 (Valid)') | |
else: | |
present_keys.append('Palm2 (Invalid)') | |
if google_results['gemini']: | |
present_keys.append('Gemini (Valid)') | |
else: | |
present_keys.append('Gemini (Invalid)') | |
else: | |
missing_keys.append('Google VertexAI/GenAI') | |
# Mistral key check | |
if self.has_API_key(k_mistral): | |
is_valid = self.check_mistral_api_key() | |
if is_valid: | |
present_keys.append('Mistral (Valid)') | |
else: | |
present_keys.append('Mistral (Invalid)') | |
else: | |
missing_keys.append('Mistral') | |
if self.has_API_key(k_here): | |
present_keys.append('HERE Geocode (Valid)') | |
else: | |
missing_keys.append('HERE Geocode (Invalid)') | |
if self.has_API_key(k_opencage): | |
present_keys.append('OpenCage Geocode (Valid)') | |
else: | |
missing_keys.append('OpenCage Geocode (Invalid)') | |
# Create a report string | |
report = "API Key Status Report:\n" | |
report += "Present Keys: " + ", ".join(present_keys) + "\n" | |
report += "Missing Keys: " + ", ".join(missing_keys) + "\n" | |
print(report) | |
return present_keys, missing_keys, self.formatted_date |