# Hugging Face Space (status: Running)
import argparse | |
import re | |
from threading import Thread | |
import gradio as gr | |
import torch | |
from PIL import ImageDraw | |
from torchvision.transforms.v2 import Resize | |
from transformers import AutoTokenizer, TextIteratorStreamer | |
from moondream.hf import LATEST_REVISION, Moondream, detect_device | |
# Command-line interface: a single `--cpu` switch that forces CPU execution.
parser = argparse.ArgumentParser()
parser.add_argument("--cpu", action="store_true")
args = parser.parse_args()

# Choose the device/dtype pair. Without `--cpu`, detect_device() supplies both
# values; when it picks something other than the CPU, tell the user how to
# fall back.
if not args.cpu:
    device, dtype = detect_device()
    if device != torch.device("cpu"):
        print("Using device:", device)
        print("If you run into issues, pass the `--cpu` flag to this script.")
        print()
else:
    device, dtype = torch.device("cpu"), torch.float32

# Load the tokenizer and model weights at the pinned revision, move the model
# to the selected device, and switch it to evaluation mode.
model_id = "vikhyatk/moondream2"
tokenizer = AutoTokenizer.from_pretrained(model_id, revision=LATEST_REVISION)
moondream = Moondream.from_pretrained(
    model_id,
    revision=LATEST_REVISION,
    torch_dtype=dtype,
)
moondream = moondream.to(device=device)
moondream.eval()
def answer_question(img, prompt):
    """Stream the model's answer to ``prompt`` about ``img``.

    The image is encoded once, then ``moondream.answer_question`` runs on a
    background thread that feeds a ``TextIteratorStreamer``. This generator
    yields the accumulated answer text each time the streamer produces a new
    piece, so the UI can update progressively.

    Args:
        img: The image from the upload component; ``None`` when nothing has
            been uploaded.
        prompt: The question to ask about the image.

    Yields:
        The answer text accumulated so far.

    Raises:
        gr.Error: If no image has been provided.
    """
    # Gradio passes None for an empty image component; fail with a readable
    # message instead of an opaque error from inside the image encoder.
    if img is None:
        raise gr.Error("Please upload an image before submitting a prompt.")

    image_embeds = moondream.encode_image(img)
    streamer = TextIteratorStreamer(tokenizer, skip_special_tokens=True)

    # Generation runs on a worker thread so this generator can consume the
    # streamer while text is still being produced.
    worker = Thread(
        target=moondream.answer_question,
        kwargs={
            "image_embeds": image_embeds,
            "question": prompt,
            "tokenizer": tokenizer,
            "streamer": streamer,
        },
    )
    worker.start()

    buffer = ""
    for new_text in streamer:
        buffer += new_text
        yield buffer

    # The streamer is exhausted, so generation is finished; reap the thread.
    worker.join()
# Matches a bracketed array of exactly four decimal numbers, e.g.
# "[0.1, 0.2, 0.3, 0.4]". Each number needs digits on both sides of the
# decimal point and may carry a leading minus sign.
_FOUR_FLOATS_PATTERN = re.compile(
    r"\[\s*(-?\d+\.\d+)\s*,\s*(-?\d+\.\d+)\s*,\s*(-?\d+\.\d+)\s*,\s*(-?\d+\.\d+)\s*\]"
)


def extract_floats(text):
    """Return the first bracketed array of four floats found in ``text``.

    Args:
        text: The string to search. ``None`` and the empty string are
            accepted and treated as containing no match.

    Returns:
        A list of four floats taken from the first match, or ``None`` when
        ``text`` is empty/``None`` or contains no such array.
    """
    # Guard first: re.search would raise TypeError on None.
    if not text:
        return None

    match = _FOUR_FLOATS_PATTERN.search(text)
    if match is None:
        return None
    return [float(num) for num in match.groups()]
def extract_bbox(text):
    """Return the bounding box embedded in ``text`` as a 4-tuple.

    Args:
        text: The string to search for a four-float array.

    Returns:
        ``(x1, y1, x2, y2)`` as floats when ``extract_floats`` finds a match,
        otherwise ``None``.
    """
    # Parse once and reuse the result rather than scanning the text twice.
    coords = extract_floats(text)
    if coords is None:
        return None
    x1, y1, x2, y2 = coords
    return (x1, y1, x2, y2)
def process_answer(img, answer):
    """Build the update for the annotated-image component.

    When ``answer`` contains a bounding box, the image is resized with
    ``Resize(768)``, the box coordinates are scaled by the resized image's
    width and height, and a red rectangle is drawn on it. Otherwise the
    annotated-image component is hidden and cleared.

    Args:
        img: The uploaded image, or ``None`` if nothing is uploaded.
        answer: The answer text that may contain a bounding box.

    Returns:
        A ``gr.update`` that shows the annotated image, or one that hides the
        component when there is nothing to draw.
    """
    hidden = gr.update(visible=False, value=None)

    # Nothing to annotate without both an image and some answer text.
    if img is None or not answer:
        return hidden

    # Parse once and reuse the result rather than scanning the answer twice.
    bbox = extract_bbox(answer)
    if bbox is None:
        return hidden
    x1, y1, x2, y2 = bbox

    draw_image = Resize(768)(img)
    width, height = draw_image.size

    # Scale the box by the resized image's dimensions. The corners are put in
    # ascending order because recent Pillow releases reject a rectangle whose
    # second corner lies before the first.
    left, right = sorted((int(x1 * width), int(x2 * width)))
    top, bottom = sorted((int(y1 * height), int(y2 * height)))

    ImageDraw.Draw(draw_image).rectangle(
        (left, top, right, bottom), outline="red", width=3
    )
    return gr.update(visible=True, value=draw_image)
# Gradio UI: a prompt row, then the uploaded image beside the streamed answer
# and the (initially hidden) annotated image.
with gr.Blocks() as demo:
    # NOTE(review): the "π" in this heading looks like a mis-decoded emoji;
    # confirm the intended character against the upstream file.
    gr.Markdown("\n# π moondream\n")

    with gr.Row():
        prompt = gr.Textbox(label="Input Prompt", value="Describe this image.", scale=4)
        submit = gr.Button("Submit")

    with gr.Row():
        img = gr.Image(type="pil", label="Upload an Image")
        with gr.Column():
            output = gr.Markdown(label="Response")
            ann = gr.Image(visible=False, label="Annotated Image")

    # Both the Submit button and pressing Enter in the prompt box run the
    # same streaming handler.
    submit.click(fn=answer_question, inputs=[img, prompt], outputs=output)
    prompt.submit(fn=answer_question, inputs=[img, prompt], outputs=output)

    # Each change to the answer text re-runs process_answer to refresh the
    # annotated image.
    output.change(
        fn=process_answer,
        inputs=[img, output],
        outputs=ann,
        show_progress=False,
    )

demo.queue()
demo.launch(debug=True)