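# Spaces: Running
# (The line above appears to be page-status residue captured together with the source; it is not part of the program.)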
import gradio as gr | |
import json | |
import posixpath | |
from fastapi import HTTPException, Path, Query, Request | |
from fastapi.responses import StreamingResponse | |
from gradio_huggingfacehub_search import HuggingfaceHubSearch | |
from huggingface_hub import HfApi, HfFileSystem, auth_check | |
from typing import Annotated, Any, NamedTuple | |
from urllib.parse import urlencode | |
from _hf_explorer import FileExplorer | |
from _hf_gguf import standard_metadata, deprecated_metadata, TokenType, LlamaFileType, GGUFValueType, HuggingGGUFstream | |
# Module-wide Hub API client; get_branches() uses it to list a repository's refs.
hfapi = HfApi()
class MetadataState(NamedTuple):
    """Per-session editing state for the currently loaded GGUF file.

    The tuple itself is immutable, but every field is a mutable container
    that the event handlers update in place.
    """
    # Miscellaneous variables; the handlers store 'repo_file' and 'branch' here.
    var: dict[str, Any]
    # Current view of the metadata: key name -> (value type, value).
    key: dict[str, tuple[int, Any]]
    # Pending additions/changes: key name -> (value type, value).
    add: dict[str, Any]
    # Names of keys that are pending removal.
    rem: set
def init_state() -> MetadataState:
    """Return a blank MetadataState whose containers are freshly created."""
    blank_fields = dict(
        var = {},
        key = {},
        add = {},
        rem = set(),
    )
    return MetadataState(**blank_fields)
def human_readable_metadata(
    meta: MetadataState,
    key: str,
    typ: int,
    val: Any,
) -> tuple[str, str, Any]:
    """Turn one metadata entry into a display-friendly ``(key, type, value)`` triple.

    Args:
        meta: Current state; only ``meta.key['tokenizer.ggml.tokens']`` is read,
            to resolve token ids into token names.
        key: Metadata key name (returned unchanged).
        typ: Numeric value type, converted to its ``GGUFValueType`` name.
        val: Raw value.

    Returns:
        The key, the type name (lists become ``[TYPE][length]``) and the value,
        which is stringified/abbreviated for arrays, lists, dicts, the file
        type and token ids, and passed through untouched otherwise.
    """
    typ = GGUFValueType(typ).name

    if typ == 'ARRAY':
        val = '[[...], ...]'
    elif isinstance(val, list):
        typ = f'[{typ}][{len(val)}]'

        if len(val) > 8:
            # Show only the first 8 items; drop the closing bracket and add an ellipsis.
            val = str(val[:8])[:-1] + ', ...]'
        else:
            val = str(val)
    elif isinstance(val, dict):
        val = '[' + ', '.join(f'{k}: {v}' for k, v in val.items()) + ']'
    elif key == 'general.file_type':
        try:
            ftype = LlamaFileType(val).name
        except (ValueError, TypeError):
            # Unknown file type value: was a bare ``except``, which would also
            # have swallowed KeyboardInterrupt/SystemExit.
            ftype = 'UNKNOWN'

        val = f'{ftype} ({val})'
    elif key.endswith('_token_id'):
        tokens = meta.key.get('tokenizer.ggml.tokens', (-1, []))[1]

        # Only annotate ids that actually index into the loaded token list.
        if isinstance(val, int) and 0 <= val < len(tokens):
            val = f'{tokens[val]} ({val})'

    return key, typ, val
# Build the UI inside a single gr.Blocks context bound to `blocks`.
# NOTE(review): the nesting below is reconstructed (the captured text had lost
# its indentation) — confirm against the deployed layout.
with gr.Blocks(
) as blocks:
    with gr.Tab("Editor"):
        # Repository selection row: hub search, branch picker and sign-in button.
        with gr.Row():
            # NOTE(review): "sumbit_on_select" is kept exactly as written; presumably
            # this is the keyword the search component expects — confirm.
            hf_search = HuggingfaceHubSearch(
                label = "Search Huggingface Hub",
                placeholder = "Search for models on Huggingface",
                search_type = "model",
                sumbit_on_select = True,
                scale = 2,
            )
            hf_branch = gr.Dropdown(
                None,
                label = "Branch",
                scale = 1,
            )
            gr.LoginButton(
                "Sign in to access gated/private repos",
                scale = 1,
            )
        # File browser for the selected repository; created hidden.
        hf_file = FileExplorer(
            visible=False,
        )
        # Key / type / delete controls; all created hidden.
        with gr.Row():
            with gr.Column():
                meta_keys = gr.Dropdown(
                    None,
                    label = "Modify Metadata",
                    info = "Search by metadata key name",
                    allow_custom_value = True,
                    visible = False,
                )
            with gr.Column():
                # type = "index": handlers receive the position in the GGUFValueType name list.
                meta_types = gr.Dropdown(
                    [e.name for e in GGUFValueType],
                    label = "Metadata Type",
                    info = "Select data type",
                    type = "index",
                    visible = False,
                )
            with gr.Column():
                btn_delete = gr.Button(
                    "Remove Key",
                    variant = "stop",
                    visible = False,
                )
        # Value editors, one per kind of value; all created hidden.
        meta_boolean = gr.Checkbox(
            label = "Boolean",
            info = "Click to update value",
            visible = False,
        )
        with gr.Row():
            meta_token_select = gr.Dropdown(
                label = "Select token",
                info = "Search by token name",
                type = "index",
                allow_custom_value = True,
                visible = False,
            )
            meta_token_type = gr.Dropdown(
                [e.name for e in TokenType],
                label = "Token type",
                info = "Select token type",
                type = "index",
                visible = False,
            )
        meta_lookup = gr.Dropdown(
            label = "Lookup token",
            info = "Search by token name",
            type = "index",
            allow_custom_value = True,
            visible = False,
        )
        meta_number = gr.Number(
            label = "Number",
            info = "Enter to update value",
            visible = False,
        )
        meta_string = gr.Textbox(
            label = "String",
            info = "Enter to update value (Shift+Enter for new line)",
            visible = False,
        )
        # Read-only placeholder shown for array values, which cannot be edited here.
        meta_array = gr.Textbox(
            None,
            label = "Unsupported",
            interactive = False,
            visible = False,
        )
        # Summary of pending changes, labelled "add" (green) or "rem" (red).
        meta_changes = gr.HighlightedText(
            None,
            label = "Metadata Changes",
            color_map = {"add": "green", "rem": "red"},
            interactive = False,
            visible = False,
        )
        btn_download = gr.Button(
            "Download GGUF",
            variant = "primary",
            visible = False,
        )
        # Read-only three-column table listing the file's metadata.
        file_meta = gr.Matrix(
            None,
            col_count = (3, "fixed"),
            headers = [
                "Metadata Name",
                "Type",
                "Value",
            ],
            datatype = ["str", "str", "str"],
            column_widths = ["35%", "15%", "50%"],
            wrap = False,
            interactive = False,
            visible = False,
        )
# Help tab: an introduction plus a group of example widgets that mirror the editor controls.
# NOTE(review): nesting is reconstructed (the captured text had lost its indentation) — confirm.
with gr.Tab("Help"):
    gr.Markdown(
        """# Huggingface GGUF Editor
An advanced GGUF editor, reading GGUF files directly from Huggingface repositories and applying changes to your own copies.
Below you will find a collection of example use-cases to show you how to perform a few common GGUF editing operations:
""",
    )
    # render = False: the group is only placed on the page by a later example_group.render() call.
    with gr.Column(render = False) as example_group:
        example_description = gr.Markdown(
            visible = False,
        )
        with gr.Row():
            with gr.Column():
                example_keys = gr.Dropdown(
                    allow_custom_value = True,
                    visible = False,
                )
            with gr.Column():
                example_types = gr.Dropdown(
                    allow_custom_value = True,
                    visible = False,
                )
            with gr.Column():
                example_delete = gr.Button(
                    interactive = False,
                    visible = False,
                )
        example_boolean = gr.Checkbox(
            visible = False,
        )
        with gr.Row():
            example_token_select = gr.Dropdown(
                allow_custom_value = True,
                visible = False,
            )
            example_token_type = gr.Dropdown(
                allow_custom_value = True,
                visible = False,
            )
        example_number = gr.Number(
            visible = False,
        )
        example_string = gr.Textbox(
            visible = False,
        )
    # Every example widget, in one list; show_example() produces an update for each of them.
    example_components = [
        example_description,
        example_keys,
        example_types,
        example_delete,
        example_boolean,
        example_token_select,
        example_token_type,
        example_number,
        example_string,
    ]
# Baseline properties for each example widget. Labels (and the delete button's
# text/variant) are copied from the matching editor control. show_example()
# merges an example's overrides on top of these, and widgets an example does
# not mention fall back to these values (hidden).
example_defaults = {
    example_description: dict(
        value = "",
        visible = False,
    ),
    example_keys: dict(
        value = "",
        label = meta_keys.label,
        info = "Select this metadata key",
        visible = False,
    ),
    example_types: dict(
        value = "",
        label = meta_types.label,
        info = "This will have the correct type set automatically",
        visible = False,
    ),
    example_delete: dict(
        value = btn_delete.value,
        variant = btn_delete.variant,
        visible = False,
    ),
    example_boolean: dict(
        value = False,
        label = meta_boolean.label,
        info = "",
        visible = False,
    ),
    example_token_select: dict(
        value = "",
        label = meta_token_select.label,
        visible = False,
    ),
    example_token_type: dict(
        value = "",
        label = meta_token_type.label,
        visible = False,
    ),
    example_number: dict(
        value = 0,
        precision = 0,
        label = meta_number.label,
        info = "",
        visible = False,
    ),
    example_string: dict(
        value = "",
        label = meta_string.label,
        info = "",
        visible = False,
    ),
}
# One entry per help example: a label for the chooser plus per-widget property
# overrides ("outputs") that show_example() layers over example_defaults.
# NOTE(review): blank lines / leading whitespace inside the Markdown strings may
# have been lost when this text was captured — compare with the rendered help page.
example_properties = [
    dict(
        label = 'Fix "missing pre-tokenizer type" warning',
        outputs = {
            example_description: dict(
                value = """## Fixing Pre-Tokenizer warning
Custom Pre-Tokenization was added to `llama.cpp` April 29th 2024, and since then basically every model using BPE tokenization need support added to `llama.cpp` to work correctly.
Models converted using the conversion script before the support for this specific model was added will either be missing the pre-tokenizer metadata or be set incorrectly to `default`.
See the models list in [llama.cpp/convert_hf_to_gguf_update.py](https://github.com/ggerganov/llama.cpp/blob/master/convert_hf_to_gguf_update.py#L67) to find out which pre-tokenizer to choose.
Setting the correct pre-tokenizer is often enough to fix the model's tokenizer, however if it has been quantized using an `imatrix` it should be re-quantized for best performance.
Removing this metadata key from a model will cause `llama.cpp` to output a warning if BPE tokenization is used, it currently has no effect on any other tokenizers.
""",
                visible = True,
            ),
            example_keys: dict(
                value = "tokenizer.ggml.pre",
                visible = True,
            ),
            example_types: dict(
                value = GGUFValueType.STRING.name,
                visible = True,
            ),
            example_delete: dict(
                visible = True,
            ),
            example_string: dict(
                info = "Fill in pre-tokenizer name, can be f.ex. deepseek-llm, command-r, tekken, etc. you will need to do some research to find the correct one",
                value = "llama-bpe",
                visible = True,
            ),
        },
    ),
    dict(
        label = "Add missing (Fill-in-Middle, EOT, etc) or change incorrect (BOS, EOS, etc) tokens",
        outputs = {
            example_description: dict(
                value = """## Add missing/change incorrect tokens
Sometimes converted models will be missing declarations of important tokens like EOT, Fill-in-Middle (fim_pre, fim_suf, fim_mid, fim_pad, fim_rep, fim_sep) for various reasons.
Other times they may have the incorrect tokens set as BOS, EOS, etc. Either way, missing or incorrectly declared tokens means inference will not work as expected.
Token declaration is made with the metadata key(s) named "tokenizer.ggml.`token name`\_token\_id" which contains the ID (index number) of the token in the token list (`tokenizer.ggml.tokens`).
A recurring issue is misconfigured EOS/EOT/EOM tokens, the need to set each of these and what they should be will vary between models, but the effect when these are incorrect is usually the same;
infinte generation responses, ie. inference does not know when to stop. Typically this would be because f.ex. EOS has been set to <|endoftext|> instead of <|im\_end|> (again, model specific, just an example).
Another issue, mainly for code models, is that Fill-in-Middle tokens have not been declared and not auto-detected (note; not all models have or use such tokens), causing sub-par results for filling in blanks in code/text.
There are 3 main metadata keys that need to be present for this; tokenizer.ggml.`fim_pre`\_token\_id, `fim_suf` and `fim_mid`, and 3 auxiliary ones; `fim_pad`, `fim_rep` and `fim_sep`, sometimes also EOT/EOM if it differs from EOS in this mode.
They are usually named fim\_`something` or just `PRE`, `SUF` and `MID`, take extra care with DeepSeek-based models where fim_pre is (...fim...)`begin`, fim_suf is `hole` and fim_mid is `end`.
""",
                visible = True,
            ),
            example_keys: dict(
                value = "tokenizer.ggml.fim_pre_token_id",
                info = "Select or enter any metadata key ending with _token_id",
                visible = True,
            ),
            example_types: dict(
                value = GGUFValueType.UINT32.name,
                visible = True,
            ),
            example_token_select: dict(
                value = "<fim_prefix>",
                label = meta_lookup.label,
                info = "You can search for the correct token by parts of its name here, then select the correct one from the list of options",
                visible = True,
            ),
            example_number: dict(
                value = 92295,
                info = "The token ID will be automatically filled in when you select the token, but you can also fill in the ID directly",
                visible = True,
            ),
        },
    ),
    dict(
        label = "Setting the correct token type for a token",
        outputs = {
            example_description: dict(
                value = """## Changing a token's type
A common issue is not declaring special control tokens as such, leading to bad tokenization of them when used (usually in the chat template), causing poor responses from the model.
Take f.ex. a model with an incorrectly configured <|im\_start|> token as a normal token instead of a special control token, given the following prompt:
```
<|im_start|>Hello World<|im_end|>
```
This prompt would then be incorrectly tokenized as follows:
```
27 ('<')
91 ('|')
318 ('im')
4906 ('_start')
91 ('|')
29 ('>')
9707 ('Hello')
4337 (' World')
151645 ('<|im_end|>')
```
instead of:
```
151644 ('<|im_start|>')
9707 ('Hello')
4337 (' World')
151645 ('<|im_end|>')
```
Take care to also adjust the value for this token in `tokenizer.ggml.scores` (if it exists) similarly to other special control tokens.
**WARNING**: Even though you have the option to, you should never remove the `tokenizer.ggml.token_type` key!
""",
                visible = True,
            ),
            example_keys: dict(
                value = "tokenizer.ggml.token_type",
                visible = True,
            ),
            example_types: dict(
                value = GGUFValueType.INT32.name,
                visible = True,
            ),
            example_delete: dict(
                visible = True,
            ),
            example_token_select: dict(
                value = "<|im_start|>",
                info = "You can search for the token by parts of its name here, then select it from the list of options",
                visible = True,
            ),
            example_token_type: dict(
                value = TokenType.CONTROL.name,
                info = "Select the appropriate token type, in this case we set it as a special control token",
                visible = True,
            ),
        },
    ),
    dict(
        label = "Updating or adding a chat template",
        outputs = {
            example_description: dict(
                value = """## Modifying the Chat Template
The chat template is a very important part of the model metadata as this provides a template for how to format the conversation prompt to the model.
It's not uncommon for these to have bugs (or sometimes just be plain wrong), requiring you to update them to be able to prompt the model correctly.
It's also possible to have multiple chat templates for different purposes, the main ones being RAG and Tools, but you can create any additional template you want.
The standard metadata key for RAG is `tokenizer.chat_template.rag` and Tools is `tokenizer.chat_template.tool_use`, any metadata key added starting with `tokenizer.chat_template.` will be added as a custom chat template.
Any framework based on `llama-cpp-python` will let you select which chat template to use with the `chat_format` option, available as `chat_template.default`, `chat_template.rag`, `chat_template.tool_use`, etc...
""",
                visible = True,
            ),
            example_keys: dict(
                value = "tokenizer.chat_template",
                info = 'Select this or enter any key starting with "tokenizer.chat_template."',
                visible = True,
            ),
            example_types: dict(
                value = GGUFValueType.STRING.name,
                visible = True,
            ),
            example_delete: dict(
                visible = True,
            ),
            example_string: dict(
                info = "Paste in the updated chat template or make changes here. Using an external Jinja2 editor is recommended",
                value = "{%- for message in messages %}\n {{- '<|' + message['role'] + '|>\\n' }}\n {{- message['content'] + eos_token }}\n{%- endfor %}\n{%- if add_generation_prompt %}\n {{- '<|assistant|>\\n' }}\n{%- endif %}",
                visible = True,
            ),
        },
    ),
]
# Example chooser: one empty sample per example, labelled from example_properties;
# type = "index" means handlers receive the position of the chosen example.
examples = gr.Dataset(
    label = "Choose an example",
    type = "index",
    samples = [[]] * len(example_properties),
    sample_labels = [x["label"] for x in example_properties],
)
def show_example(
    value: int,
):
    """Build the widget updates that display example number ``value``.

    Every component in ``example_components`` gets an update: its defaults from
    ``example_defaults``, overlaid with the chosen example's overrides when the
    example mentions that component.
    """
    overrides = example_properties[value]["outputs"]
    updates = {}

    # Components configured by the example: defaults merged with its overrides.
    for component, props in overrides.items():
        merged = example_defaults[component] | props
        updates[component] = type(component)(**merged)

    # Remaining components are reset to their plain defaults.
    for component in example_components:
        if component in overrides:
            continue
        updates[component] = type(component)(**example_defaults[component])

    return updates
# Apply the default properties to the (not yet rendered) example widgets ...
for k, v in example_defaults.items():
    for prop, val in v.items():
        setattr(k, prop, val)
# ... then place the example group (created with render = False) on the page.
example_group.render()
# Session state holding a MetadataState; the value is assigned after construction
# because of the issue described in the BUG note below.
meta_state = gr.State() # init_state
# BUG: For some reason using gr.State initial value turns tuple to list?
meta_state.value = init_state()
# Token indices matching the choices most recently offered by the token dropdowns.
token_select_indices = gr.State([])
# Components refreshed whenever the pending changes are recomputed ...
file_change_components = [
    meta_changes,
    file_meta,
    meta_keys,
    btn_download,
]
# ... and the same list preceded by the state itself.
state_change_components = [
    meta_state,
] + file_change_components
def get_branches(
    repo: str,
    oauth_token: gr.OAuthToken | None = None,
):
    """List the branches of ``repo`` and return an update for the branch dropdown.

    Any failure while listing refs is deliberately ignored and yields an empty
    choice list. "main" is preselected when the repository has such a branch.
    """
    names = []

    try:
        # Signed-in users pass their OAuth token; otherwise ``False`` is passed.
        access = oauth_token.token if oauth_token else False
        names = [ref.name for ref in hfapi.list_repo_refs(repo, token = access).branches]
    except Exception:
        # Best effort: fall through with whatever ``names`` currently holds.
        pass

    preselected = "main" if "main" in names else None

    return {
        hf_branch: gr.Dropdown(
            names or None,
            value = preselected,
        ),
    }
def get_files(
    repo: str,
    branch: str | None,
    oauth_token: gr.OAuthToken | None = None,
):
    """Show the GGUF file browser for ``repo``/``branch`` and hide all editor widgets.

    When the access check fails, a warning with the error text is shown and the
    file browser is hidden as well. On success the file browser lists
    ``**/*.gguf`` files and the changes list, metadata table and key dropdown
    are cleared.
    """
    def hidden_editors():
        # Updates that hide every per-key editing widget (identical in both outcomes).
        return {
            meta_types: gr.Dropdown(visible = False),
            btn_delete: gr.Button(visible = False),
            meta_boolean: gr.Checkbox(visible = False),
            meta_token_select: gr.Dropdown(visible = False),
            meta_token_type: gr.Dropdown(visible = False),
            meta_lookup: gr.Dropdown(visible = False),
            meta_number: gr.Number(visible = False),
            meta_string: gr.Textbox(visible = False),
            meta_array: gr.Textbox(visible = False),
        }

    try:
        auth_check(repo, token=oauth_token.token if oauth_token else False)
    except Exception as e:
        gr.Warning(str(e))

        # No access: hide everything, leaving existing values untouched.
        return {
            hf_file: FileExplorer(root_dir = None, visible = False),
            meta_changes: gr.HighlightedText(visible = False),
            file_meta: gr.Matrix(visible = False),
            meta_keys: gr.Dropdown(visible = False),
            btn_download: gr.Button(visible = False),
            **hidden_editors(),
        }

    # Access granted: show the file browser and reset the metadata views.
    return {
        hf_file: FileExplorer(
            "**/*.gguf",
            file_count = "single",
            root_dir = repo,
            branch = branch,
            token = oauth_token.token if oauth_token else False,
            visible = True,
        ),
        meta_changes: gr.HighlightedText(None, visible = False),
        file_meta: gr.Matrix(None, visible = False),
        meta_keys: gr.Dropdown(None, visible = False),
        btn_download: gr.Button(visible = False),
        **hidden_editors(),
    }
def load_metadata(
    repo_file: str | None,
    branch: str | None,
    progress: gr.Progress = gr.Progress(),
    oauth_token: gr.OAuthToken | None = None,
):
    """Stream the metadata of ``repo_file`` into the UI (generator handler).

    Yields, in order: a reset of all widgets with a fresh state; the growing
    metadata table after every key read; and finally the completed state, the
    final table and the populated key dropdown.

    Args:
        repo_file: Path of the selected file; nothing is loaded when falsy.
        branch: Revision to read the file from.
        progress: Gradio progress tracker.
        oauth_token: Token of the signed-in user, if any.

    Raises:
        gr.Error: When opening or parsing the file fails.
    """
    m = []
    meta = init_state()

    # First update: start from a clean slate with an empty, visible table.
    yield {
        meta_state: meta,
        file_meta: gr.Matrix(None, visible = True),
        meta_changes: gr.HighlightedText(None, visible = False),
        meta_keys: gr.Dropdown(None, visible = False),
        btn_download: gr.Button(visible = False),
        meta_types: gr.Dropdown(visible = False),
        btn_delete: gr.Button(visible = False),
        meta_boolean: gr.Checkbox(visible = False),
        meta_token_select: gr.Dropdown(visible = False),
        meta_token_type: gr.Dropdown(visible = False),
        meta_lookup: gr.Dropdown(visible = False),
        meta_number: gr.Number(visible = False),
        meta_string: gr.Textbox(visible = False),
        meta_array: gr.Textbox(visible = False),
    }

    if not repo_file:
        return

    # Anonymous users get ``False`` (as in get_branches/get_files) rather than
    # ``None``, so the same "no user token" value is used throughout the flow.
    fs = HfFileSystem(
        token = oauth_token.token if oauth_token else False,
    )

    try:
        progress(0, desc = 'Loading file...')

        with fs.open(
            repo_file,
            "rb",
            revision = branch,
            block_size = 8 * 1024 * 1024,
            cache_type = "readahead",
        ) as fp:
            progress(0, desc = 'Reading header...')

            gguf = HuggingGGUFstream(fp)
            num_metadata = gguf.header['metadata'].value
            metadata = gguf.read_metadata()

            meta.var['repo_file'] = repo_file
            meta.var['branch'] = branch

            # Rows for *_token_id keys seen before the token list has been read;
            # they are re-rendered once all keys are known.
            deferred_updates = []

            for k, v in progress.tqdm(metadata, desc = 'Reading metadata...', total = num_metadata, unit = f' of {num_metadata} metadata keys...'):
                human = [*human_readable_metadata(meta, k, v.type, v.value)]

                if k.endswith('_token_id') and 'tokenizer.ggml.tokens' not in meta.key:
                    deferred_updates.append(((k, v.type, v.value), human))

                m.append(human)
                meta.key[k] = (v.type, v.value)

                yield {
                    file_meta: gr.Matrix(
                        m,
                    ),
                }

            # ``human`` is the very list stored in ``m``, so this rewrites the rows in place.
            for data, human in deferred_updates:
                human[:] = human_readable_metadata(meta, *data)
    except Exception as e:
        # gr.Error takes a message string; keep the original exception chained.
        raise gr.Error(str(e)) from e

    yield {
        meta_state: meta,
        # Re-send the table so rows rewritten by the deferred pass reach the UI.
        file_meta: gr.Matrix(
            m,
        ),
        meta_keys: gr.Dropdown(
            sorted(meta.key.keys() | standard_metadata.keys()),
            value = '',
            visible = True,
        ),
    }
def update_metakey(
    meta: MetadataState,
    key: str | None,
):
    """React to a change of the selected metadata key.

    The type dropdown is preset (and locked) when the type can be determined:
    from the file's own entry, from ``standard_metadata``, or from the key name
    (``tokenizer.chat_template.*`` -> STRING, ``*_token_id`` -> UINT32). The
    delete button is only shown for keys present in the current metadata.
    """
    known = meta.key.get(key, standard_metadata.get(key))

    if known is not None:
        typ = GGUFValueType(known[0]).name
    elif key and key.startswith('tokenizer.chat_template.'):
        typ = GGUFValueType.STRING.name
    elif key and key.endswith('_token_id'):
        typ = GGUFValueType.UINT32.name
    else:
        typ = None

    key_info = "DEPRECATED" if key in deprecated_metadata else "Search by metadata key name"

    return {
        meta_keys: gr.Dropdown(
            info = key_info,
        ),
        meta_types: gr.Dropdown(
            value = typ,
            # Only let the user pick a type when none could be inferred.
            interactive = typ is None,
            visible = bool(key),
        ),
        btn_delete: gr.Button(
            visible = key in meta.key,
        ),
    }
def update_metatype(
    meta: MetadataState,
    key: str,
    typ: int,
):
    """Show the value editor that matches the selected key and type.

    Args:
        meta: Current state (existing values and the token list are read).
        key: Selected metadata key; may be empty/``None`` when nothing is selected.
        typ: Selected value type; overridden by the stored/standard type of
            ``key`` when one exists.

    Returns:
        Updates for the boolean, token select, token type, token lookup,
        number, string and "unsupported array" widgets.
    """
    val = None
    tokens = meta.key.get('tokenizer.ggml.tokens', (-1, []))[1]

    if (data := meta.key.get(key, standard_metadata.get(key))) is not None:
        typ = data[0]
        val = data[1]
    elif not key:
        typ = None

    # Per-token arrays are edited one token at a time instead of as a whole.
    do_token_type = key == 'tokenizer.ggml.token_type'
    do_select_token = do_token_type or key == 'tokenizer.ggml.scores'
    # isinstance guard: a cleared dropdown may deliver None, which has no .endswith().
    do_lookup_token = isinstance(key, str) and not do_select_token and key.endswith('_token_id')

    if isinstance(val, list) and not do_select_token:
        # TODO: Support arrays?
        typ = GGUFValueType.ARRAY

    is_number = typ in (
        GGUFValueType.INT8,
        GGUFValueType.INT16,
        GGUFValueType.INT32,
        GGUFValueType.INT64,
        GGUFValueType.UINT8,
        GGUFValueType.UINT16,
        GGUFValueType.UINT32,
        GGUFValueType.UINT64,
        GGUFValueType.FLOAT32,
        GGUFValueType.FLOAT64,
    )

    # Same validity rule as human_readable_metadata(): only non-negative integer
    # ids inside the token list are resolved (a negative id would otherwise
    # silently index from the end, and a float id would raise).
    has_token_name = (
        is_number
        and data is not None
        and do_lookup_token
        and isinstance(val, int)
        and 0 <= val < len(tokens)
    )

    return {
        meta_boolean: gr.Checkbox(
            value = val if typ == GGUFValueType.BOOL and data is not None else False,
            visible = True if typ == GGUFValueType.BOOL else False,
        ),
        meta_token_select: gr.Dropdown(
            None,
            value = '',
            visible = True if do_select_token else False,
        ),
        meta_token_type: gr.Dropdown(
            interactive = False,
            visible = True if do_token_type else False,
        ),
        meta_lookup: gr.Dropdown(
            None,
            value = tokens[val] if has_token_name else '',
            visible = True if is_number and do_lookup_token else False,
        ),
        meta_number: gr.Number(
            value = val if is_number and data is not None and not do_select_token else 0,
            precision = 10 if typ == GGUFValueType.FLOAT32 or typ == GGUFValueType.FLOAT64 else 0,
            # For per-token arrays the number is locked until a token is chosen.
            interactive = False if do_select_token else True,
            visible = True if is_number and not do_token_type else False,
        ),
        meta_string: gr.Textbox(
            value = val if typ == GGUFValueType.STRING else '',
            visible = True if typ == GGUFValueType.STRING else False,
        ),
        meta_array: gr.Textbox(
            visible = True if typ == GGUFValueType.ARRAY else False,
        ),
    }
def select_metakey(
    evt: gr.SelectData,
):
    """Copy the key name of the selected table row into the key dropdown.

    The dropdown is cleared when the event reports a deselection.
    """
    if evt.selected:
        chosen = evt.row_value[0]
    else:
        chosen = ''

    return {
        meta_keys: gr.Dropdown(
            value = chosen,
        ),
    }
def notify_state_change(
    meta: MetadataState,
    request: gr.Request,
):
    """Refresh every widget that depends on the pending changes.

    Rebuilds the highlighted change list (removed keys, then each added key
    followed by its displayed value), the metadata table and the key choices,
    and points the download button at the ``download`` route for this session.
    """
    changes = [(removed, 'rem') for removed in meta.rem]

    for name, entry in meta.add.items():
        shown_value = human_readable_metadata(meta, name, *entry)[2]
        changes += [(name, 'add'), (str(shown_value), None)]

    rows = [
        list(human_readable_metadata(meta, name, entry[0], entry[1]))
        for name, entry in meta.key.items()
    ]

    # The route looks the state up again via the session hash and the state id.
    url = request.request.url_for('download', repo_file = meta.var['repo_file'])
    url = url.include_query_params(
        branch = meta.var['branch'],
        session = request.session_hash,
        state = str(meta_state._id),
    )
    link = str(url)

    # Upgrade a plain-http link to https.
    if link.startswith('http:'):
        link = 'https' + link[4:]

    # if meta.rem or meta.add:
    #     link += '&' + urlencode(
    #         {
    #             'rem': meta.rem,
    #             'add': [json.dumps([k, *v], ensure_ascii = False, separators = (',', ':')) for k, v in meta.add.items()],
    #         },
    #         doseq = True,
    #         safe = '[]{}:"\',',
    #     )

    has_changes = bool(changes)

    return {
        meta_state: meta,
        meta_changes: gr.HighlightedText(
            changes,
            visible = has_changes,
        ),
        file_meta: gr.Matrix(
            rows,
        ),
        meta_keys: gr.Dropdown(
            sorted(meta.key.keys() | standard_metadata.keys()),
            value = '',
        ),
        btn_download: gr.Button(
            link = link,
            visible = has_changes,
        ),
    }
def rem_metadata(
    meta: MetadataState,
    key: str,
    request: gr.Request,
):
    """Mark ``key`` for removal and refresh the UI.

    The key is dropped from the pending additions and from the current
    metadata view (when present) and recorded in the removal set.
    """
    meta.add.pop(key, None)
    meta.key.pop(key, None)
    meta.rem.add(key)

    return notify_state_change(meta, request)
def token_search(
    meta: MetadataState,
    name: str,
):
    """Find tokens whose text contains ``name`` (case-insensitive).

    Returns a dict mapping token index -> token text, in token-list order.
    The size check runs before each insert, so at most 7 matches are returned.
    """
    needle = name.lower()
    vocab = meta.key.get('tokenizer.ggml.tokens', (-1, []))[1]
    matches = {}

    for index, token in enumerate(vocab):
        if needle not in token.lower():
            continue

        # Decide whether to stop *before* recording this match, then record it anyway.
        limit_reached = len(matches) > 5
        matches.setdefault(index, token)

        if limit_reached:
            break

    return matches
def token_select(
    meta: MetadataState,
    keyup: gr.KeyUpData,
):
    """Offer tokens matching the text typed into the "Select token" dropdown.

    The matching token indices are stored alongside, in the same order as the
    offered choices.
    """
    matches = token_search(meta, keyup.input_value)
    names = [*matches.values()]
    positions = [*matches.keys()]

    return {
        meta_token_select: gr.Dropdown(
            names,
        ),
        token_select_indices: positions,
    }
def token_selected(
    meta: MetadataState,
    key: str,
    choice: int | None,
    indices: list[int],
):
    """Load the current score or type of the chosen token into its editor.

    ``choice`` is the position within the offered dropdown choices and
    ``indices`` maps that position to the real token index.

    Raises:
        gr.Error: For an unresolvable choice, a token index outside the token
            list, or a key other than the scores/token_type arrays.
    """
    if choice is None or not 0 <= choice < len(indices) or (token := indices[choice]) < 0:
        raise gr.Error('Token not found')

    vocab = meta.key.get('tokenizer.ggml.tokens', (-1, []))[1]

    if token >= len(vocab):
        raise gr.Error('Invalid token')

    data = meta.key.get(key, (-1, []))[1]

    if key == 'tokenizer.ggml.scores':
        # Fall back to 0.0 when the array is absent or too short.
        score = data[token] if data and len(data) > token else 0.0

        return {
            meta_number: gr.Number(
                value = score,
                interactive = True,
            ),
        }

    if key == 'tokenizer.ggml.token_type':
        # Fall back to NORMAL when the array is absent or too short.
        type_name = TokenType(data[token]).name if data and len(data) > token else TokenType.NORMAL.name

        return {
            meta_token_type: gr.Dropdown(
                value = type_name,
                interactive = True,
            ),
        }

    raise gr.Error('Invalid metadata key')
def token_lookup(
    meta: MetadataState,
    keyup: gr.KeyUpData,
):
    """Offer tokens matching the text typed into the "Lookup token" dropdown.

    The matching token indices are stored alongside, in the same order as the
    offered choices.
    """
    matches = token_search(meta, keyup.input_value)
    names = [*matches.values()]
    positions = [*matches.keys()]

    return {
        meta_lookup: gr.Dropdown(
            names,
        ),
        token_select_indices: positions,
    }
def add_metadata( | |
meta: MetadataState, | |
key: str, | |
typ: int | None, | |
val: Any, | |
request: gr.Request, | |
choice: int | None = None, | |
indices: list[int] | None = None, | |
): | |
if not key or typ is None: | |
if key: | |
gr.Warning('Missing required value type') | |
return { | |
meta_changes: gr.HighlightedText( | |
), | |
} | |
if key in meta.rem: | |
meta.rem.remove(key) | |
match key: | |
case 'tokenizer.ggml.scores' | 'tokenizer.ggml.token_type': | |
if choice is None or choice < 0 or choice >= len(indices) or (token := indices[choice]) < 0: | |
raise gr.Error('Token not found') | |
tok = meta.add.setdefault(key, (typ, {}))[1] | |
tok[str(token)] = val + 1 if key == 'tokenizer.ggml.token_type' else val | |
data = meta.key.setdefault(key, (typ, [0.0 if key == 'tokenizer.ggml.scores' else int(TokenType.NORMAL)] * len(meta.key.get('tokenizer.ggml.tokens', (-1, []))[1])))[1] | |
if data: | |
for k, v in tok.items(): | |
data[int(k)] = v | |
case _: | |
meta.key[key] = meta.add[key] = (typ, val) | |
if key.startswith('tokenizer.chat_template.'): | |
template = key[24:] | |
if template not in meta.key.get('tokenizer.chat_templates', []): | |
templates = [x[24:] for x in meta.key.keys() if x.startswith('tokenizer.chat_template.')] | |
meta.key['tokenizer.chat_templates'] = meta.add['tokenizer.chat_templates'] = (GGUFValueType.STRING, templates) | |
return notify_state_change( | |
meta, | |
request, | |
) | |
def token_select_to_id(
    choice: int,
    indices: list[int],
):
    """Put the token id behind the chosen lookup entry into the number field.

    Raises:
        gr.Error: When ``choice`` does not resolve to a non-negative token index.
    """
    if choice is None or not 0 <= choice < len(indices) or (token := indices[choice]) < 0:
        raise gr.Error('Token not found')

    return {
        meta_number: gr.Number(
            token,
        ),
    }
# --- Event wiring for the value editors --------------------------------------
# Every editor ends up in add_metadata(); inputs are passed positionally as
# (state, key, type, value[, chosen token, token indices]).

# Picking a token in the lookup dropdown first fills in its id, then (chained
# with .success, so presumably only if that step did not fail) stores the number.
meta_lookup.input(
    token_select_to_id,
    inputs = [
        meta_lookup,
        token_select_indices,
    ],
    outputs = [
        meta_number,
    ],
).success(
    add_metadata,
    inputs = [
        meta_state,
        meta_keys,
        meta_types,
        meta_number,
    ],
    outputs = [
    ] + state_change_components,
)
# Toggling the checkbox stores the boolean value.
meta_boolean.input(
    add_metadata,
    inputs = [
        meta_state,
        meta_keys,
        meta_types,
        meta_boolean,
    ],
    outputs = [
    ] + state_change_components,
)
# Choosing a token type stores it for the token picked in "Select token".
meta_token_type.input(
    add_metadata,
    inputs = [
        meta_state,
        meta_keys,
        meta_types,
        meta_token_type,
        meta_token_select,
        token_select_indices,
    ],
    outputs = [
    ] + state_change_components,
)
# Submitting the number stores it; the token selection is passed along too,
# which add_metadata() only uses for the per-token arrays.
meta_number.submit(
    add_metadata,
    inputs = [
        meta_state,
        meta_keys,
        meta_types,
        meta_number,
        meta_token_select,
        token_select_indices,
    ],
    outputs = [
    ] + state_change_components,
)
# Submitting the textbox stores the string value.
meta_string.submit(
    add_metadata,
    inputs = [
        meta_state,
        meta_keys,
        meta_types,
        meta_string,
    ],
    outputs = [
    ] + state_change_components,
)
# NOTE(review): meta_array is created with interactive = False, so this handler
# presumably never fires — confirm whether it is a placeholder for array support.
meta_array.input(
    add_metadata,
    inputs = [
        meta_state,
        meta_keys,
        meta_types,
        meta_array,
    ],
    outputs = [
    ] + state_change_components,
)
def stream_repo_file(
    repo_file: str,
    branch: str,
    add_meta: list[Any] | None,
    rem_meta: list[str] | None,
    token: str | None = None,
):
    """Stream ``repo_file`` with metadata changes applied (generator).

    The first item yielded is the resulting file size; every following item is
    a chunk of bytes: the header, each metadata entry, then the remainder of
    the source file in 64 KiB reads.

    Args:
        repo_file: Path of the file to open.
        branch: Revision to read from.
        add_meta: ``[key, type, value]`` lists to add; other entries are skipped.
            A dict value holds per-token edits ``{token index: value}`` that are
            merged into the existing (or a default-filled) array.
        rem_meta: Keys to remove.
        token: Access token handed to the file system.
    """
    fs = HfFileSystem(
        token = token,
    )

    with fs.open(
        repo_file,
        "rb",
        revision = branch,
        block_size = 8 * 1024 * 1024,
        cache_type = "readahead",
    ) as fp:
        rem_meta = rem_meta or []
        add_meta = add_meta or []

        gguf = HuggingGGUFstream(fp)

        # Drain the iterator so that all metadata has been read before editing.
        for _ in gguf.read_metadata():
            pass

        for removed in rem_meta:
            gguf.remove_metadata(removed)

        tokens = gguf.metadata.get('tokenizer.ggml.tokens')

        for entry in add_meta:
            if not (isinstance(entry, list) and len(entry) == 3):
                continue

            if isinstance(entry[2], dict):
                if tokens:
                    existing = gguf.metadata.get(entry[0])

                    if existing:
                        data = existing.value
                    else:
                        # No such array in the file yet: start from per-token defaults.
                        data = [0.0 if entry[0] == 'tokenizer.ggml.scores' else int(TokenType.NORMAL)] * len(tokens.value)

                    for i, v in entry[2].items():
                        data[int(i)] = v

                    entry[2] = data
                else:
                    entry[2] = []

            gguf.add_metadata(*entry)

        gguf.adjust_padding()

        # Size first, so the caller can announce it before streaming the bytes.
        yield gguf.filesize
        yield b''.join(v.data for _, v in gguf.header.items())

        for _, v in gguf.metadata.items():
            yield v.data

        # Whatever is left unread in the source file is passed through unchanged.
        while chunk := fp.read(65536):
            yield chunk
# Script entry point: start the Gradio app without blocking, register the extra
# download route on the returned app, then block the main thread.
# NOTE(review): nesting is reconstructed (the captured text had lost its
# indentation); in particular `elif add:` is assumed to pair with the
# session/state lookup — confirm.
if __name__ == "__main__":
    blocks.queue(
        max_size = 10,
        default_concurrency_limit = 10,
    )
    # prevent_thread_lock = True makes launch() return so the route can be added below.
    app, local_url, share_url = blocks.launch(
        show_api = False,
        prevent_thread_lock = True,
    )
    async def download(
        request: Request,
        repo_file: Annotated[str, Path()],
        branch: Annotated[str, Query()] = "main",
        add: Annotated[list[str] | None, Query()] = None,
        rem: Annotated[list[str] | None, Query()] = None,
        session: Annotated[str | None, Query()] = None,
        state: Annotated[int | None, Query()] = None,
    ):
        """Stream the edited GGUF file as a download.

        Changes come from the stored session state when ``session``/``state``
        identify one; otherwise from the ``add`` (JSON-encoded entries) and
        ``rem`` query parameters.

        Raises:
            HTTPException: 404 when ``repo_file`` is not a normalized relative
                path with at least two ``/`` separators.
        """
        # Access token of the signed-in user, if the session carries OAuth info.
        token = request.session.get('oauth_info', {}).get('access_token')
        # Reject non-normalized paths, backslashes, parent/absolute paths and
        # paths with fewer than two '/' separators.
        if posixpath.normpath(repo_file) != repo_file or '\\' in repo_file or repo_file.startswith('../') or repo_file.startswith('/') or repo_file.count('/') < 2:
            raise HTTPException(
                status_code = 404,
                detail = 'Invalid repository',
            )
        if session and state is not None and session in request.app.state_holder and state in request.app.state_holder[session]:
            # Stored state found: its pending changes take precedence over the query parameters.
            meta: MetadataState = request.app.state_holder[session][state]
            if meta.rem:
                rem = list(meta.rem)
            if meta.add:
                add = [[k, *v] for k, v in meta.add.items()]
        elif add:
            add = [json.loads(a) for a in add]
        stream = stream_repo_file(
            repo_file,
            branch,
            add,
            rem,
            token = token,
        )
        # The generator yields the total size first; the rest is the file content.
        size = next(stream)
        return StreamingResponse(
            stream,
            headers = {
                'Content-Length': str(size),
            },
            media_type = 'application/octet-stream',
        )
    app.add_api_route(
        "/download/{repo_file:path}",
        download,
        methods = ["GET"],
    )
    # app.openapi_schema = None
    # app.setup()
    # Keep the process alive now that the server runs in the background.
    blocks.block_thread()