|
import os |
|
import subprocess |
|
from typing import Union |
|
# True when the SPACE_ID environment variable is set to a non-empty value.
is_spaces = bool(os.environ.get("SPACE_ID"))

if is_spaces:
    import spaces

    # Install flash-attn at startup with its CUDA build skipped.
    # The variable is merged into the current environment: passing a bare
    # dict as ``env`` would replace the whole environment of the child
    # process (PATH, HOME, proxy settings, ...) instead of adding to it.
    subprocess.run(
        "pip install flash-attn --no-build-isolation",
        env={**os.environ, "FLASH_ATTENTION_SKIP_CUDA_BUILD": "TRUE"},
        shell=True,
    )

# Set before the Hugging Face libraries below are imported.
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
import sys

from dotenv import load_dotenv

load_dotenv()

# Put the current working directory first on the module search path.
sys.path.insert(0, os.getcwd())
|
|
|
import gradio as gr |
|
from PIL import Image |
|
import torch |
|
import uuid |
|
import os |
|
import shutil |
|
import json |
|
import yaml |
|
from slugify import slugify |
|
from transformers import AutoProcessor, AutoModelForCausalLM |
|
|
|
if not is_spaces:
    # Outside of Spaces the job runner is used by start_training().
    from toolkit.job import get_job

    # Both OAuth names are overwritten with None for non-Spaces runs
    # (presumably so the Union[...] annotations below still evaluate
    # without a login flow -- confirm against the Gradio version in use).
    gr.OAuthProfile = None
    gr.OAuthToken = None

# Upper bound on uploaded images; also the number of caption rows the UI builds.
MAX_IMAGES = 150
|
|
|
|
|
def load_captioning(uploaded_images, concept_sentence):
    """Build the list of component updates shown after images are uploaded.

    The returned list is ordered to match ``output_components`` in the UI:
    one update for the captioning area, then (row, image, caption) updates
    for each of the ``MAX_IMAGES`` slots, then one update for the sample
    prompt section and one placeholder update per sample prompt box.

    Raises:
        gr.Error: if fewer than 2 or more than ``MAX_IMAGES`` images were
            uploaded.
    """
    image_count = len(uploaded_images)
    if image_count <= 1:
        raise gr.Error(
            "Please upload at least 2 images to train your model (the ideal number with default settings is between 4-30)"
        )
    if image_count > MAX_IMAGES:
        raise gr.Error(f"For now, only {MAX_IMAGES} or less images are allowed for training")

    # Reveal the captioning area.
    updates = [gr.update(visible=True)]

    for slot in range(MAX_IMAGES):
        # Slots beyond the number of uploaded images are hidden and cleared.
        in_use = slot < image_count

        # Row container.
        updates.append(gr.update(visible=in_use))

        # Image preview.
        preview = uploaded_images[slot] if in_use else None
        updates.append(gr.update(value=preview, visible=in_use))

        # Caption textbox, pre-filled with the trigger token when a concept
        # sentence was provided.
        initial_caption = "[trigger]" if in_use and concept_sentence else None
        updates.append(gr.update(value=initial_caption, visible=in_use))

    # Reveal the sample prompt section and suggest prompts built from the
    # concept sentence.
    updates.append(gr.update(visible=True))
    sample_placeholders = (
        f'A photo of {concept_sentence} holding a sign that reads "Hello friend"',
        f"A mountainous landscape in the style of {concept_sentence}",
        f"A {concept_sentence} in a mall",
    )
    for placeholder_text in sample_placeholders:
        updates.append(gr.update(placeholder=placeholder_text))
    return updates
|
|
|
def create_dataset(*inputs):
    """Copy the uploaded images into a new dataset folder with their captions.

    ``inputs[0]`` is the sequence of image paths; the remaining positional
    arguments are captions, matched to the images by position. Every image
    is copied into ``datasets/<uuid4>`` and one JSON line per image
    (``file_name`` and ``prompt``) is appended to ``metadata.jsonl`` there.

    Returns:
        The path of the created dataset folder.
    """
    print("Creating dataset")
    images = inputs[0]
    captions = inputs[1:]

    destination_folder = f"datasets/{uuid.uuid4()}"
    if not os.path.exists(destination_folder):
        os.makedirs(destination_folder)

    metadata_path = os.path.join(destination_folder, "metadata.jsonl")
    with open(metadata_path, "a") as metadata_file:
        for position, source_image in enumerate(images):
            copied_path = shutil.copy(source_image, destination_folder)
            record = {
                "file_name": os.path.basename(copied_path),
                "prompt": captions[position],
            }
            metadata_file.write(json.dumps(record) + "\n")

    return destination_folder
|
|
|
|
|
def run_captioning(images, concept_sentence, *captions):
    """Generate a caption for every image with Florence-2, yielding as it goes.

    This is a generator: after each image is captioned it yields the full
    caption list (the same list object each time, updated in place), so the
    caller can refresh all caption boxes progressively.

    Args:
        images: iterable of image sources accepted by ``PIL.Image.open``.
        concept_sentence: when truthy, " [trigger]" is appended to every
            generated caption.
        *captions: current caption values, one per caption slot; entries are
            overwritten by position as images are processed.

    Yields:
        The list of captions after each image has been processed.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    torch_dtype = torch.float16
    model = AutoModelForCausalLM.from_pretrained(
        "microsoft/Florence-2-large", torch_dtype=torch_dtype, trust_remote_code=True
    ).to(device)
    processor = AutoProcessor.from_pretrained("microsoft/Florence-2-large", trust_remote_code=True)

    prompt = "<DETAILED_CAPTION>"
    captions = list(captions)
    try:
        for i, image_path in enumerate(images):
            print(captions[i])
            # Always open the current item. Previously the image was only
            # loaded for ``str`` inputs, so any other input either raised
            # NameError or silently reused the previous iteration's image.
            image = Image.open(image_path).convert("RGB")

            inputs = processor(text=prompt, images=image, return_tensors="pt").to(device, torch_dtype)

            generated_ids = model.generate(
                input_ids=inputs["input_ids"], pixel_values=inputs["pixel_values"], max_new_tokens=1024, num_beams=3
            )

            generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
            parsed_answer = processor.post_process_generation(
                generated_text, task=prompt, image_size=(image.width, image.height)
            )
            caption_text = parsed_answer["<DETAILED_CAPTION>"].replace("The image shows ", "")
            if concept_sentence:
                caption_text = f"{caption_text} [trigger]"
            captions[i] = caption_text

            yield captions
    finally:
        # Runs on normal completion, on errors, and when the consumer closes
        # the generator early, so the model is always moved off the device.
        model.to("cpu")
        del model
        del processor


if is_spaces:
    # On Spaces, wrap the captioning generator with the ``spaces.GPU`` decorator.
    run_captioning = spaces.GPU()(run_captioning)
|
|
|
def start_training(
    profile: Union[gr.OAuthProfile, None],
    oauth_token: Union[gr.OAuthToken, None],
    lora_name,
    concept_sentence,
    steps,
    lr,
    rank,
    dataset_folder,
    sample_1,
    sample_2,
    sample_3,
):
    """Write a training config from the UI inputs and start the training run.

    The template YAML is loaded, overridden with the user's settings, and
    saved under ``/tmp``. On Spaces the dataset folder is turned into an
    ``autotrain spacerunner`` project (config, ``script.py`` and
    requirements are copied in) and the command is launched; otherwise the
    job is run in-process through ``get_job``.

    Args:
        profile: OAuth profile; ``profile.username`` names the target Hub repo.
        oauth_token: OAuth token; ``oauth_token.token`` is passed to autotrain.
        lora_name: display name of the LoRA; slugified for config/repo names.
        concept_sentence: optional trigger word/sentence.
        steps, lr, rank: training hyper-parameters (cast to int/float/int).
        dataset_folder: folder produced by ``create_dataset``.
        sample_1, sample_2, sample_3: optional sample prompts.

    Returns:
        A Markdown status message.

    Raises:
        gr.Error: if ``lora_name`` is empty or the autotrain command fails.
    """
    if not lora_name:
        raise gr.Error("You forgot to insert your LoRA name! This name has to be unique.")
    print("Started training")
    slugged_lora_name = slugify(lora_name)

    # Load the template config.
    template_path = "train_lora_flux_24gb.yaml" if is_spaces else "config/examples/train_lora_flux_24gb.yaml"
    with open(template_path, "r") as f:
        config = yaml.safe_load(f)

    # Apply the user's settings; ``process`` aliases the first process entry,
    # so these assignments mutate ``config`` in place.
    config["config"]["name"] = slugged_lora_name
    process = config["config"]["process"][0]
    process["model"]["low_vram"] = True
    process["train"]["skip_first_sample"] = True
    process["train"]["steps"] = int(steps)
    process["train"]["lr"] = float(lr)
    process["network"]["linear"] = int(rank)
    process["network"]["linear_alpha"] = int(rank)
    process["datasets"][0]["folder_path"] = dataset_folder
    process["save"]["push_to_hub"] = True
    process["save"]["hf_repo_id"] = f"{profile.username}/{slugged_lora_name}"
    process["save"]["hf_private"] = True
    if concept_sentence:
        process["trigger_word"] = concept_sentence

    # Sampling is enabled when ANY of the three prompts is filled in (the
    # previous condition tested sample_2 twice and never sample_3).
    sample_prompts = [prompt for prompt in (sample_1, sample_2, sample_3) if prompt]
    if sample_prompts:
        process["train"]["disable_sampling"] = False
        # Same integer value as train.steps above.
        process["sample"]["sample_every"] = int(steps)
        process["sample"]["prompts"] = sample_prompts
    else:
        process["train"]["disable_sampling"] = True

    # Save the final config under a unique name.
    random_config_name = str(uuid.uuid4())
    config_path = f"/tmp/{random_config_name}-{slugged_lora_name}.yaml"
    with open(config_path, "w") as f:
        yaml.dump(config, f)

    if not is_spaces:
        # Run the job locally, in this process.
        job = get_job(config_path)
        job.run()
        job.cleanup()
        return f"Training completed successfully. Model saved as {slugged_lora_name}"

    print("Started training with spacerunner...")

    # Assemble the spacerunner project inside the dataset folder.
    script_location = os.path.dirname(os.path.abspath(__file__))
    shutil.copy(config_path, dataset_folder + "/config.yaml")
    shutil.copy(script_location + "/script.py", dataset_folder)
    shutil.copy(script_location + "/requirements.autotrain", dataset_folder + "/requirements.txt")

    # Argument list rather than a formatted string that is split on
    # whitespace, so no value can be broken into several arguments.
    # NOTE(review): the token is passed on the command line -- confirm
    # whether autotrain offers a way to supply it that is not visible in
    # the process list.
    cmd = [
        "autotrain",
        "spacerunner",
        "--project-name",
        slugged_lora_name,
        "--script-path",
        dataset_folder,
        "--username",
        profile.username,
        "--token",
        oauth_token.token,
        "--backend",
        "spaces-l4x1",
    ]
    outcome = subprocess.run(cmd)
    if outcome.returncode != 0:
        # The child's output is not captured (it goes straight to this
        # process's streams), so ``outcome.stderr`` is always None here;
        # log the exit status instead.
        print("Error: autotrain exited with return code", outcome.returncode)
        raise gr.Error("Something went wrong. Make sure the name of your LoRA is unique and try again")

    return f"""# Your training has started.
## - Training Status: <a href='https://huggingface.co/spaces/{profile.username}/autotrain-{slugged_lora_name}?logs=container'>{profile.username}/autotrain-{slugged_lora_name}</a> <small>(in the logs tab)</small>
## - Model page: <a href='https://huggingface.co/{profile.username}/{slugged_lora_name}'>{profile.username}/{slugged_lora_name}</a> <small>(will be available when training finishes)</small>"""
|
|
|
|
|
# Monochrome base theme with custom text sizes and a font fallback chain.
theme = gr.themes.Monochrome(
    text_size=gr.themes.Size(
        xxs="9px",
        xs="12px",
        sm="13px",
        md="15px",
        lg="18px",
        xl="22px",
        xxl="24px",
    ),
    font=[
        gr.themes.GoogleFont("Source Sans Pro"),
        "ui-sans-serif",
        "system-ui",
        "sans-serif",
    ],
)

# Extra CSS for the app; ``main_ui_logged_out`` is the class applied by
# swap_visibilty() when no profile is available on Spaces.
css = """
#component-1{text-align:center}
.main_ui_logged_out{opacity: 0.3; pointer-events: none}
.tabitem{border: 0px}
"""
|
|
|
def swap_visibilty(profile: Union[gr.OAuthProfile, None]):
    """Return an update that sets the main UI's CSS class from login state.

    On Spaces without a profile the ``main_ui_logged_out`` class is applied;
    in every other case (logged in on Spaces, or not on Spaces at all) the
    ``main_ui_logged_in`` class is applied.
    """
    print(profile)
    if is_spaces and profile is None:
        return gr.update(elem_classes=["main_ui_logged_out"])
    if is_spaces:
        print(profile.name)
    return gr.update(elem_classes=["main_ui_logged_in"])
|
|
|
|
|
# Build the Gradio UI and wire its events.
with gr.Blocks(theme=theme, css=css) as demo:
    gr.Markdown(
        """# LoRA Ease for FLUX 🧞♂️
### Train a high quality FLUX LoRA in a breeze ༄ using [Ostris' AI Toolkit](https://github.com/ostris/ai-toolkit) and [AutoTrain Advanced](https://github.com/huggingface/autotrain-advanced)"""
    )
    if is_spaces:
        gr.LoginButton("Sign in with Hugging Face to train your LoRA on Spaces", visible=is_spaces)
    with gr.Tab("Train on Spaces" if is_spaces else "Train locally"):
        with gr.Column() as main_ui:
            with gr.Row():
                lora_name = gr.Textbox(
                    label="The name of your LoRA",
                    info="This has to be a unique name",
                    placeholder="e.g.: Persian Miniature Painting style, Cat Toy",
                )
                concept_sentence = gr.Textbox(
                    label="Trigger word/sentence",
                    info="Trigger word or sentence to be used",
                    placeholder="uncommon word like p3rs0n or trtcrd, or sentence like 'in the style of CNSTLL'",
                    interactive=True,
                )
            with gr.Group(visible=True) as image_upload:
                with gr.Row():
                    images = gr.File(
                        file_types=["image"],
                        label="Upload your images",
                        file_count="multiple",
                        interactive=True,
                        visible=True,
                        scale=1,
                    )
                    with gr.Column(scale=3, visible=False) as captioning_area:
                        with gr.Column():
                            gr.Markdown(
                                """# Custom captioning
You can optionally add a custom caption for each image (or use an AI model for this). [trigger] will represent your concept sentence/trigger word.
"""
                            )
                            do_captioning = gr.Button("Add AI captions with Florence-2")
                            # output_components must stay in the order that
                            # load_captioning() builds its updates in.
                            output_components = [captioning_area]
                            caption_list = []
                            # Pre-build one hidden (row, image, caption) triple
                            # per possible upload. Plain variables replace the
                            # former writes into locals().
                            for i in range(1, MAX_IMAGES + 1):
                                captioning_row = gr.Row(visible=False)
                                with captioning_row:
                                    caption_image = gr.Image(
                                        type="filepath",
                                        width=111,
                                        height=111,
                                        min_width=111,
                                        interactive=False,
                                        scale=2,
                                        show_label=False,
                                        show_share_button=False,
                                        show_download_button=False,
                                    )
                                    caption_box = gr.Textbox(
                                        label=f"Caption {i}", scale=15, interactive=True
                                    )

                                output_components.append(captioning_row)
                                output_components.append(caption_image)
                                output_components.append(caption_box)
                                caption_list.append(caption_box)

            with gr.Accordion("Advanced options", open=False):
                steps = gr.Number(label="Steps", value=1000, minimum=1, maximum=10000, step=1)
                lr = gr.Number(label="Learning Rate", value=4e-4, minimum=1e-6, maximum=1e-3, step=1e-6)
                rank = gr.Number(label="LoRA Rank", value=16, minimum=4, maximum=128, step=4)

            with gr.Accordion("Sample prompts", visible=False) as sample:
                gr.Markdown(
                    "Include sample prompts to test out your trained model. Don't forget to include your trigger word/sentence (optional)"
                )
                sample_1 = gr.Textbox(label="Test prompt 1")
                sample_2 = gr.Textbox(label="Test prompt 2")
                sample_3 = gr.Textbox(label="Test prompt 3")

            # Trailing outputs of load_captioning(): the sample section and
            # the three prompt boxes whose placeholders it fills in.
            output_components.append(sample)
            output_components.append(sample_1)
            output_components.append(sample_2)
            output_components.append(sample_3)
            start = gr.Button("Start training")
            progress_area = gr.Markdown("")

    with gr.Tab("Train locally" if is_spaces else "Instructions"):
        # Fixes in this text: ``pip install`` needs ``-r`` to read a
        # requirements file; the backslashes of the Windows path are escaped
        # (``\v`` and ``\a`` are control-character escapes in a normal
        # string); the ``python app.py`` fence is a shell command.
        gr.Markdown(
            """To use FLUX LoRA Ease locally with this UI, you can clone this repository (yes, HF Spaces are git repos!)
```bash
git clone https://huggingface.co/spaces/flux-train/flux-lora-trainer
cd flux-lora-trainer
pip install -r requirements_local.txt
```

Then you can install ai-toolkit
```bash
git clone https://github.com/ostris/ai-toolkit.git
cd ai-toolkit
git submodule update --init --recursive
python3 -m venv venv
source venv/bin/activate
# .\\venv\\Scripts\\activate on windows
# install torch first
pip3 install torch
pip3 install -r requirements.txt
cd ..
```

Login with Hugging Face to access FLUX.1 [dev], choose a token with `write` permissions to push your LoRAs to the HF Hub
```bash
huggingface-cli login
```

Now you can run FLUX LoRA Ease locally by doing a simple
```bash
python app.py
```
If you prefer command line, you can run Ostris' [AI Toolkit](https://github.com/ostris/ai-toolkit) yourself directly.
"""
        )

    # Holds the dataset folder path between create_dataset and start_training.
    dataset_folder = gr.State()

    images.upload(load_captioning, inputs=[images, concept_sentence], outputs=output_components)

    # The profile/oauth_token parameters of start_training are not listed in
    # ``inputs`` (presumably supplied by Gradio from the type annotations --
    # confirm against the Gradio version in use).
    start.click(fn=create_dataset, inputs=[images] + caption_list, outputs=dataset_folder).then(
        fn=start_training,
        inputs=[
            lora_name,
            concept_sentence,
            steps,
            lr,
            rank,
            dataset_folder,
            sample_1,
            sample_2,
            sample_3,
        ],
        outputs=progress_area,
    )

    do_captioning.click(fn=run_captioning, inputs=[images, concept_sentence] + caption_list, outputs=caption_list)
    demo.load(fn=swap_visibilty, outputs=main_ui)
|
|
|
if __name__ == "__main__":
    # Script entry point: enable the request queue, then start the app
    # with ``share=True``.
    demo.queue()
    demo.launch(share=True)
|
|