import io import os import time import json import base64 import requests import subprocess import platform from PIL import Image from requests.adapters import HTTPAdapter import re from urllib3.util.retry import Retry from huggingface_hub import snapshot_download API_PATH = 'api_settings.json' QWEN_MOD = 'qwen-vl-plus' DEFAULT_GPT_MODEL = 'gpt-4o' DEFAULT_CLAUDE_MODEL = 'claude-3-sonnet' # 扩展prompt {} 标记功能,从文件读取额外内容 def addition_prompt_process(prompt, image_path): # 从image_path分离文件名和扩展名,并更改扩展名为.txt if '{' not in prompt and '}' not in prompt: return prompt file_root, _ = os.path.splitext(image_path) new_file_name = os.path.basename(file_root) + ".txt" # 从prompt中提取目录路径 directory_path = prompt[prompt.find('{') + 1: prompt.find('}')] # 拼接新的文件路径 full_path = os.path.join(directory_path, new_file_name) # 读取full_path指定的文件内容 try: with open(full_path, 'r') as file: file_content = file.read() except Exception as e: return f"Error reading file: {e}" new_prompt = prompt.replace('{' + directory_path + '}', file_content) return new_prompt # 通义千问VL def is_ali(api_url): if api_url.endswith("/v1/services/aigc/multimodal-generation/generation"): return True else: return False def is_claude(api_url, model): if api_url.endswith("v1/messages") or "claude" in model.lower(): return True else: return False def qwen_api_switch(mod): global QWEN_MOD QWEN_MOD = mod return QWEN_MOD def qwen_api(image_path, prompt, api_key): print(f"QWEN_MOD: {QWEN_MOD}") os.environ['DASHSCOPE_API_KEY'] = api_key from dashscope import MultiModalConversation img = f"file://{image_path}" messages = [{ 'role': 'system', 'content': [ {'text': 'You are a helpful assistant.'} ] }, { 'role':'user', 'content': [ {'image': img}, {'text': prompt}, ] }] response = MultiModalConversation.call(model=QWEN_MOD, messages=messages, stream=False, max_length=300) if '"status_code": 400' in response: return f"API error: {response}" if response.get("output") and response["output"].get("choices") and response["output"]["choices"][0].get("message") and response["output"]["choices"][0]["message"].get("content"): if response["output"]["choices"][0]["message"]["content"][0].get("text", False): caption = response["output"]["choices"][0]["message"]["content"][0]["text"] else: box_value = response["output"]["choices"][0]["message"]["content"][0]["box"] text_value = response["output"]["choices"][0]["message"]["content"][1]["text"] b_value = re.search(r'(.*?)', box_value).group(1) caption = b_value + text_value else: caption = response return caption def claude_api(image_path, prompt, api_key, api_url, model, quality=None): print(f"CLAUDE_MODEL: {model}") with open(image_path, "rb") as image_file: # Downscale the image image = Image.open(image_file) width, height = image.size if quality: if quality == "high": target = 1024 elif quality == "low": target = 512 elif quality == "auto": if width >= 1024 or height >= 1024: target = 1024 else: target = 512 else: target = 1024 aspect_ratio = width / height # Determine the new dimensions while maintaining the aspect ratio if width > target or height > target: if width > height: new_width = target new_height = int(new_width / aspect_ratio) else: new_height = target new_width = int(new_height * aspect_ratio) else: new_width, new_height = width, height # Resize the image resized_image = image.resize((new_width, new_height), Image.LANCZOS) # Use buffer to store image buffer = io.BytesIO() resized_image.save(buffer, format="JPEG") image_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8') # Claude API data = { "model": model, "max_tokens": 300, "messages": [ {"role": "user", "content": [ {"type": "image", "source": { "type": "base64", "media_type": "image/jpeg", "data": image_base64 } }, {"type": "text", "text": prompt} ] } ] } # print(f"data: {data}\n") headers = { "Content-Type": "application/json", "x-api-key:": api_key, "anthropic-version": "2023-06-01" } # 配置重试策略 retries = Retry(total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["HEAD", "GET", "OPTIONS", "POST"]) # 更新参数名 with requests.Session() as s: s.mount('https://', HTTPAdapter(max_retries=retries)) try: response = s.post(api_url, headers=headers, json=data) response.raise_for_status() # 连接错误回显 except requests.exceptions.HTTPError as errh: return f"HTTP Error: {errh}" except requests.exceptions.ConnectionError as errc: return f"Error Connecting: {errc}" except requests.exceptions.Timeout as errt: return f"Timeout Error: {errt}" except requests.exceptions.RequestException as err: return f"OOps: Something Else: {err}" try: response_data = response.json() if 'error' in response_data: return f"API error: {response_data['error']['message']}" caption = response_data['content'][0]['text'] return caption except Exception as e: return f"Failed to parse the API response: {e}\n{response.text}" # API使用 def run_openai_api(image_path, prompt, api_key, api_url, quality=None, timeout=10, model=DEFAULT_GPT_MODEL): prompt = addition_prompt_process(prompt, image_path) # print("prompt{}:",prompt) # Qwen-VL if is_ali(api_url): return qwen_api(image_path, prompt, api_key) if is_claude(api_url, model): return claude_api(image_path, prompt, api_key, api_url, model, quality) with open(image_path, "rb") as image_file: image_base64 = base64.b64encode(image_file.read()).decode('utf-8') # GPT-4V data = { "model": model, "messages": [ { "role": "user", "content": [ {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_base64}", "detail": f"{quality}"} }, {"type": "text", "text": prompt} ] } ], "max_tokens": 300 } headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_key}" } # 配置重试策略 retries = Retry(total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["HEAD", "GET", "OPTIONS", "POST"]) # 更新参数名 with requests.Session() as s: s.mount('https://', HTTPAdapter(max_retries=retries)) try: response = s.post(api_url, headers=headers, json=data, timeout=timeout) response.raise_for_status() # 连接错误回显 except requests.exceptions.HTTPError as errh: return f"HTTP Error: {errh}" except requests.exceptions.ConnectionError as errc: return f"Error Connecting: {errc}" except requests.exceptions.Timeout as errt: return f"Timeout Error: {errt}" except requests.exceptions.RequestException as err: return f"OOps: Something Else: {err}" try: response_data = response.json() if 'error' in response_data: return f"API error: {response_data['error']['message']}" caption = response_data["choices"][0]["message"]["content"] return caption except Exception as e: return f"Failed to parse the API response: {e}\n{response.text}" # API存档 def save_api_details(api_key, api_url): if is_ali(api_url): settings = { 'model' : QWEN_MOD, 'api_key': api_key, 'api_url': api_url } else: settings = { 'model' : 'GPT', 'api_key': api_key, 'api_url': api_url } # 不记录空的apikey if api_key != "": with open(API_PATH, 'w', encoding='utf-8') as f: json.dump(settings, f) def save_state(llm, key, url): if llm[:3] == "GPT" or llm[:4] == "qwen": settings = { 'model': llm, 'api_key': key, 'api_url': url } elif llm[:3] == "Cog" or llm[:4] == "moon" or llm[:7] == "MiniCPM": settings = { 'model' : llm, 'api_key': "", 'api_url': "http://127.0.0.1:8000/v1/chat/completions" } output = f"Set {llm} as default. / {llm}已设为默认" with open(API_PATH, 'w', encoding='utf-8') as f: json.dump(settings, f) return output # 读取API设置 def get_api_details(): settings_file = API_PATH if os.path.exists(settings_file): with open(settings_file, 'r') as f: settings = json.load(f) if settings.get('model', '') != '': mod = settings.get('model', '') url = settings.get('api_url', '') if mod[:4] == "qwen": global QWEN_MOD QWEN_MOD = mod else: if is_ali(url): mod = QWEN_MOD return mod, settings.get('api_key', ''), url else: if settings.get('api_key', '') != '': i_key = settings.get('api_key', '') i_url = settings.get('api_url', '') save_api_details(i_key,i_url) with open(settings_file, 'r') as i: settings = json.load(i) return settings.get('model', ''), settings.get('api_key', ''), settings.get('api_url', '') return 'GPT', '', '' # 本地模型相关 def downloader(model_type, acceleration): endpoint = 'https://hf-mirror.com' if acceleration == 'CN' else None if model_type == 'vqa' or model_type == 'chat': snapshot_download( repo_id="lmsys/vicuna-7b-v1.5", allow_patterns=["tokenizer*","special_tokens_map.json"], endpoint=endpoint ) if model_type == 'vqa': snapshot_download( repo_id="THUDM/cogagent-vqa-hf", local_dir="./models/cogagent-vqa-hf", max_workers=8, endpoint=endpoint ) elif model_type == 'chat': snapshot_download( repo_id="THUDM/cogagent-chat-hf", local_dir="./models/cogagent-chat-hf", max_workers=8, endpoint=endpoint ) elif model_type == 'moondream': snapshot_download( repo_id="vikhyatk/moondream1", local_dir="./models/moondream", max_workers=8, endpoint=endpoint ) elif model_type == 'minicpm': snapshot_download( repo_id="openbmb/MiniCPM-Llama3-V-2_5", local_dir="./models/MiniCPM-Llama3-V-2_5", max_workers=8, endpoint=endpoint ) return f"{model_type} Model download completed. / {model_type}模型下载完成" def installer(): if platform.system() == "Windows": install_command = f'.\install_script\installcog.bat' else: install_command = f'./install_script/installcog.sh' subprocess.Popen(f'chmod +x {install_command}', shell=True) subprocess.Popen('', shell=True) #Use an empty subprocess to refresh permission. If deleted, installcog.sh wouldn't launch properly, with Permission denied error subprocess.Popen(install_command, shell=True) while not os.path.exists('install_temp.txt'): time.sleep(2) with open('install_temp.txt', 'r') as file: result_string = file.read() os.remove('install_temp.txt') return result_string