|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
import os.path as osp |
|
|
|
from huggingface_hub import repo_exists, snapshot_download |
|
from huggingface_hub.utils import HFValidationError, validate_repo_id |
|
from transformers import AutoConfig, AutoTokenizer, PretrainedConfig |
|
|
|
from .configuration_vila import VILAConfig |
|
from .constants import MEDIA_TOKENS |
|
from .tokenizer_utils import infer_stop_tokens |
|
|
|
|
|
def load_tokenizer_then_handle_media_tokens_and_chat_template(
    model_name_or_path, config: VILAConfig, model_max_length=None
):
    """Load the LLM tokenizer and attach chat-template, stop-token and media-token data.

    Args:
        model_name_or_path: Model location; its ``llm`` sub-path is handed to
            ``AutoTokenizer.from_pretrained``.
        config: Model config. If it has a non-``None`` ``chat_template``
            attribute, the ``<chat_template>.jinja`` file is loaded.
        model_max_length: When not ``None``, overrides
            ``tokenizer.model_max_length``.

    Returns:
        The tokenizer, with ``stop_tokens``, ``stop_token_ids``,
        ``media_tokens`` and ``media_token_ids`` set (plus ``chat_template``
        when one is configured).
    """
    llm_dir = osp.join(model_name_or_path, "llm")
    tokenizer = AutoTokenizer.from_pretrained(llm_dir, padding_side="right", use_fast=True, legacy=False)
    if model_max_length is not None:
        tokenizer.model_max_length = model_max_length

    # Load the chat template if the config names one.
    if getattr(config, "chat_template", None) is not None:
        print(f"Using chat template: {config.chat_template}")
        template_file = f"{config.chat_template}.jinja"
        # Look in the ``chat_templates`` folder next to this module first, then
        # fall back to ``os.path.dirname(model_name_or_path)``.
        template_path = os.path.join(os.path.dirname(__file__), "chat_templates", template_file)
        if not os.path.exists(template_path):
            template_path = os.path.join(os.path.dirname(model_name_or_path), template_file)
        with open(template_path) as template_fd:
            raw_template = template_fd.read()
        # NOTE(review): as written this removes every space character and every
        # newline from the template -- confirm that is what the templates expect
        # (stripping only indentation may have been the intent).
        tokenizer.chat_template = raw_template.replace(" ", "").replace("\n", "")

    # Stop tokens are inferred by the project helper and then mapped to ids.
    tokenizer.stop_tokens = infer_stop_tokens(tokenizer)
    tokenizer.stop_token_ids = tokenizer.convert_tokens_to_ids(tokenizer.stop_tokens)

    # Register each media token as a special token and record the id it maps to.
    tokenizer.media_tokens = MEDIA_TOKENS
    media_token_ids = {}
    tokenizer.media_token_ids = media_token_ids
    for media_name, media_token in MEDIA_TOKENS.items():
        tokenizer.add_tokens([media_token], special_tokens=True)
        media_token_ids[media_name] = tokenizer.convert_tokens_to_ids(media_token)

    return tokenizer
|
|
|
|
|
def get_model_config(config):
    """Return the component locations (LLM, vision tower, projector) for a model config.

    The root path is ``config._name_or_path`` when that attribute is set and at
    least two characters long, otherwise ``config.resume_path``. A root that
    does not exist locally is checked with ``repo_exists`` and, if it is a
    valid Hub repo, replaced by the result of ``snapshot_download``.

    Args:
        config: Object exposing ``_name_or_path`` and/or ``resume_path`` plus
            optional ``llm_cfg``, ``vision_tower_cfg`` and ``mm_projector_cfg``
            attributes.

    Returns:
        list: One entry per component attribute that is a dict, a
        ``PretrainedConfig`` or a string, in the order listed above. Dict and
        ``PretrainedConfig`` values map to ``<root>/<key without "_cfg">``;
        string values are returned unchanged. Other values are skipped.

    Raises:
        ValueError: If a component needs the root path but none is available.
    """
    default_keys = ["llm_cfg", "vision_tower_cfg", "mm_projector_cfg"]

    # Tolerate a missing or None ``_name_or_path`` instead of failing in len().
    name_or_path = getattr(config, "_name_or_path", None)
    if name_or_path is not None and len(name_or_path) >= 2:
        root_path = name_or_path
    else:
        root_path = config.resume_path

    # A root that is not a local path may be a Hub repo id; download it if so.
    if root_path is not None and not osp.exists(root_path):
        try:
            valid_hf_repo = repo_exists(root_path)
        except HFValidationError:
            valid_hf_repo = False
        if valid_hf_repo:
            root_path = snapshot_download(root_path)

    return_list = []
    for key in default_keys:
        cfg = getattr(config, key, None)
        if isinstance(cfg, dict) or isinstance(cfg, PretrainedConfig):
            try:
                # key[:-4] drops the trailing "_cfg" to get the sub-folder name.
                return_list.append(os.path.join(root_path, key[:-4]))
            except TypeError as err:
                # os.path.join rejects a None (or otherwise non-path) root.
                raise ValueError(f"Cannot find resume path in config for {key}!") from err
        elif isinstance(cfg, str):
            return_list.append(cfg)

    return return_list
|
|
|
|
|
def get_model_config_fp8(config):
    """Return the component locations for a model config, plus its FP8 LLM directory.

    NOTE(review): a second definition of ``get_model_config_fp8`` appears later
    in this file and overrides this one at import time; consider keeping only one.

    The root path is ``config._name_or_path`` when that attribute is set and at
    least two characters long, otherwise ``config.resume_path``. A root that
    does not exist locally is checked with ``repo_exists`` and, if it is a
    valid Hub repo, replaced by the result of ``snapshot_download``.

    Args:
        config: Object exposing ``_name_or_path`` and/or ``resume_path`` plus
            optional ``llm_cfg``, ``vision_tower_cfg`` and ``mm_projector_cfg``
            attributes.

    Returns:
        list: The entries for the three component attributes (dict and
        ``PretrainedConfig`` values map to ``<root>/<key without "_cfg">``,
        strings are returned unchanged, other values are skipped), followed by
        ``<root>/fp8_llm``.

    Raises:
        ValueError: If a component needs the root path but none is available.
        AssertionError: If ``<root>/fp8_llm`` is not a non-empty directory.
    """
    default_keys = ["llm_cfg", "vision_tower_cfg", "mm_projector_cfg"]

    # Tolerate a missing or None ``_name_or_path`` instead of failing in len().
    name_or_path = getattr(config, "_name_or_path", None)
    if name_or_path is not None and len(name_or_path) >= 2:
        root_path = name_or_path
    else:
        root_path = config.resume_path

    # A root that is not a local path may be a Hub repo id; download it if so.
    if root_path is not None and not osp.exists(root_path):
        try:
            valid_hf_repo = repo_exists(root_path)
        except HFValidationError:
            valid_hf_repo = False
        if valid_hf_repo:
            root_path = snapshot_download(root_path)

    return_list = []
    for key in default_keys:
        cfg = getattr(config, key, None)
        if isinstance(cfg, dict) or isinstance(cfg, PretrainedConfig):
            try:
                # key[:-4] drops the trailing "_cfg" to get the sub-folder name.
                return_list.append(os.path.join(root_path, key[:-4]))
            except TypeError as err:
                # os.path.join rejects a None (or otherwise non-path) root.
                raise ValueError(f"Cannot find resume path in config for {key}!") from err
        elif isinstance(cfg, str):
            return_list.append(cfg)

    # The FP8 weights must already exist on disk as a non-empty directory.
    key = "fp8_llm_cfg"
    directory_path = os.path.join(root_path, key[:-4])
    if not (os.path.isdir(directory_path) and os.listdir(directory_path)):
        # Raised explicitly (same exception type as before) so the check is
        # not stripped when Python runs with -O.
        raise AssertionError("You need to first convert the model weights to FP8 explicitly.")
    return_list.append(directory_path)

    return return_list
|
|
|
|
|
def get_model_config_fp8(config):
    """Return the component locations for a model config, plus its FP8 LLM directory.

    NOTE(review): this repeats an earlier definition of the same name in this
    file; being the later one, it is the definition that takes effect. Consider
    keeping only one.

    The root path is ``config._name_or_path`` when that attribute is set and at
    least two characters long, otherwise ``config.resume_path``. A root that
    does not exist locally is checked with ``repo_exists`` and, if it is a
    valid Hub repo, replaced by the result of ``snapshot_download``.

    Args:
        config: Object exposing ``_name_or_path`` and/or ``resume_path`` plus
            optional ``llm_cfg``, ``vision_tower_cfg`` and ``mm_projector_cfg``
            attributes.

    Returns:
        list: The entries for the three component attributes (dict and
        ``PretrainedConfig`` values map to ``<root>/<key without "_cfg">``,
        strings are returned unchanged, other values are skipped), followed by
        ``<root>/fp8_llm``.

    Raises:
        ValueError: If a component needs the root path but none is available.
        AssertionError: If ``<root>/fp8_llm`` is not a non-empty directory.
    """
    default_keys = ["llm_cfg", "vision_tower_cfg", "mm_projector_cfg"]

    # Tolerate a missing or None ``_name_or_path`` instead of failing in len().
    name_or_path = getattr(config, "_name_or_path", None)
    if name_or_path is not None and len(name_or_path) >= 2:
        root_path = name_or_path
    else:
        root_path = config.resume_path

    # A root that is not a local path may be a Hub repo id; download it if so.
    if root_path is not None and not osp.exists(root_path):
        try:
            valid_hf_repo = repo_exists(root_path)
        except HFValidationError:
            valid_hf_repo = False
        if valid_hf_repo:
            root_path = snapshot_download(root_path)

    return_list = []
    for key in default_keys:
        cfg = getattr(config, key, None)
        if isinstance(cfg, dict) or isinstance(cfg, PretrainedConfig):
            try:
                # key[:-4] drops the trailing "_cfg" to get the sub-folder name.
                return_list.append(os.path.join(root_path, key[:-4]))
            except TypeError as err:
                # os.path.join rejects a None (or otherwise non-path) root.
                raise ValueError(f"Cannot find resume path in config for {key}!") from err
        elif isinstance(cfg, str):
            return_list.append(cfg)

    # The FP8 weights must already exist on disk as a non-empty directory.
    key = "fp8_llm_cfg"
    directory_path = os.path.join(root_path, key[:-4])
    if not (os.path.isdir(directory_path) and os.listdir(directory_path)):
        # Raised explicitly (same exception type as before) so the check is
        # not stripped when Python runs with -O.
        raise AssertionError("You need to first convert the model weights to FP8 explicitly.")
    return_list.append(directory_path)

    return return_list
|
|
|
|
|
def is_mm_model(model_path):
    """
    Check if the model at the given path is a visual language model.

    The config is loaded with ``AutoConfig.from_pretrained`` and the model is
    considered multimodal when any entry of ``config.architectures`` contains
    "llava" (case-insensitive).

    Args:
        model_path (str): The path to the model.

    Returns:
        bool: True if the model is an MM model, False otherwise (including
        when the config lists no architectures).
    """
    config = AutoConfig.from_pretrained(model_path)
    # ``architectures`` may be absent or None; treat that as "no architectures"
    # rather than failing while iterating.
    architectures = getattr(config, "architectures", None) or []
    return any("llava" in architecture.lower() for architecture in architectures)
|
|
|
|
|
def auto_upgrade(config):
    """Interactively upgrade an old-style LLaVA checkpoint config in place.

    Nothing happens unless ``config`` contains "llava" while the loaded
    config's ``model_type`` does not. In that case the user is prompted; on
    "y"/"yes" the config is rewritten and saved back to ``config``, otherwise
    the process exits with status 1.

    Args:
        config (str): Checkpoint location passed to
            ``AutoConfig.from_pretrained`` and ``save_pretrained``.

    Raises:
        AssertionError: If the old config's ``model_type`` is not "llama", or
            it does not list exactly one architecture.
        SystemExit: With code 1 when the user declines the upgrade.
    """
    cfg = AutoConfig.from_pretrained(config)
    if "llava" in config and "llava" not in cfg.model_type:
        assert cfg.model_type == "llama"
        print("You are using newer LLaVA code base, while the checkpoint of v0 is from older code base.")
        print("You must upgrade the checkpoint to the new code base (this can be done automatically).")
        confirm = input("Please confirm that you want to upgrade the checkpoint. [Y/N]")
        if confirm.lower() in ["y", "yes"]:
            print("Upgrading checkpoint...")
            assert len(cfg.architectures) == 1
            # model_type is set on the config *class*, so it changes for every
            # instance of that class in this process, not only ``cfg``.
            setattr(cfg.__class__, "model_type", "llava")
            cfg.architectures[0] = "LlavaLlamaForCausalLM"
            cfg.save_pretrained(config)
            print("Checkpoint upgraded.")
        else:
            print("Checkpoint upgrade aborted.")
            # The ``exit`` builtin is injected by the ``site`` module and may be
            # missing (e.g. ``python -S``); raising SystemExit always works.
            raise SystemExit(1)
|
|