Spaces:
Sleeping
Sleeping
import os | |
import logging | |
import torch | |
import datetime | |
import requests | |
from google.cloud import storage | |
from transformers import AutoImageProcessor, AutoModelForObjectDetection | |
from label_studio_ml.model import LabelStudioMLBase | |
from lxml import etree | |
from uuid import uuid4 | |
from PIL import Image | |
from creds import get_credentials | |
from io import BytesIO | |
def generate_download_signed_url_v4(blob_name): | |
"""Generates a v4 signed URL for downloading a blob. | |
Note that this method requires a service account key file. You can not use | |
this if you are using Application Default Credentials from Google Compute | |
Engine or from the Google Cloud SDK. | |
""" | |
bucket_name = os.getenv("bucket") | |
storage_client = storage.Client() | |
bucket = storage_client.bucket(bucket_name) | |
blob = bucket.blob(blob_name.replace(f"gs://{bucket_name}/", "")) | |
url = blob.generate_signed_url( | |
version="v4", | |
# This URL is valid for 15 minutes | |
expiration=datetime.timedelta(minutes=15), | |
# Allow GET requests using this URL. | |
method="GET", | |
) | |
return url | |
class Model(LabelStudioMLBase): | |
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = get_credentials() | |
image_processor = AutoImageProcessor.from_pretrained("diegokauer/conditional-detr-coe-int") | |
model = AutoModelForObjectDetection.from_pretrained("diegokauer/conditional-detr-coe-int") | |
def predict(self, tasks, **kwargs): | |
""" This is where inference happens: model returns | |
the list of predictions based on input list of tasks | |
""" | |
predictions = [] | |
for task in tasks: | |
url = task["data"]["image"] | |
response = requests.get(generate_download_signed_url_v4(url)) | |
print(response) | |
image_data = BytesIO(response.content) | |
image = Image.open(image_data) | |
original_width, original_height = image.size | |
with torch.no_grad(): | |
inputs = self.image_processor(images=image, return_tensors="pt") | |
outputs = self.model(**inputs) | |
target_sizes = torch.tensor([image.size[::-1]]) | |
results = self.image_processor.post_process_object_detection(outputs, threshold=0.5, target_sizes=target_sizes)[0] | |
result_list = [] | |
for score, label, box in zip(results['scores'], results['labels'], results['boxes']): | |
label_id = str(uuid4())[:4] | |
x, y, x2, y2 = tuple(box) | |
result_list.append({ | |
'id': label_id, | |
'original_width': original_width, | |
'original_height': original_height, | |
'from_name': "label", | |
'to_name': "image", | |
'type': 'labels', | |
'score': score, # per-region score, visible in the editor | |
'value': { | |
'x': x, | |
'y': y, | |
'width': x2-x, | |
'height': y2-y, | |
'rotation': 0, | |
'labels': [self.id2label[label]] | |
} | |
}) | |
predictions.append({ | |
'score': results['scores'].mean(), # prediction overall score, visible in the data manager columns | |
'model_version': 'diegokauer/conditional-detr-coe-int', # all predictions will be differentiated by model version | |
'result': result_list | |
}) | |
return predictions | |
def fit(self, event, annotations, **kwargs): | |
""" This is where training happens: train your model given list of annotations, | |
then returns dict with created links and resources | |
""" | |
return {'path/to/created/model': 'my/model.bin'} |