Spaces:
Runtime error
Runtime error
import gradio as gr | |
import os | |
import cv2 | |
from encoded_video import EncodedVideo, write_video | |
import torch | |
import numpy as np | |
from torchvision.datasets import ImageFolder | |
import transformers | |
from transformers import ViTFeatureExtractor, ViTForImageClassification, AutoFeatureExtractor, ViTMSNForImageClassification | |
from pathlib import Path | |
import pytorch_lightning as pl | |
from torch.utils.data import DataLoader | |
from torchmetrics import Accuracy | |
from torchvision import transforms | |
from PIL import Image | |
import PIL | |
os.environ['SHM_SIZE'] = '2G' | |
HF_DATASETS_CACHE="./" | |
class ImageClassificationCollator: | |
def __init__(self, feature_extractor): | |
self.feature_extractor = feature_extractor | |
def __call__(self, batch): | |
encodings = self.feature_extractor([x[0] for x in batch], return_tensors='pt') | |
encodings['labels'] = torch.tensor([x[1] for x in batch], dtype=torch.long) | |
return encodings | |
class Classifier(pl.LightningModule): | |
def __init__(self, model, lr: float = 2e-5, **kwargs): | |
super().__init__() | |
self.save_hyperparameters('lr', *list(kwargs)) | |
self.model = model | |
self.forward = self.model.forward | |
self.val_acc = Accuracy( | |
task='multiclass' if model.config.num_labels > 2 else 'binary', | |
num_classes=model.config.num_labels | |
) | |
def training_step(self, batch, batch_idx): | |
outputs = self(**batch) | |
self.log(f"train_loss", outputs.loss) | |
return outputs.loss | |
def validation_step(self, batch, batch_idx): | |
outputs = self(**batch) | |
self.log(f"val_loss", outputs.loss) | |
acc = self.val_acc(outputs.logits.argmax(1), batch['labels']) | |
self.log(f"val_acc", acc, prog_bar=True) | |
return outputs.loss | |
def configure_optimizers(self): | |
return torch.optim.Adam(self.parameters(), lr=self.hparams.lr) | |
def video_identity(video,user_name,class_name,trainortest,ready): | |
if ready=='yes': | |
data_dir = Path(str(user_name)+'/train') | |
train_ds = ImageFolder(data_dir) | |
test_dir = Path(str(user_name)+'/test') | |
test_ds = ImageFolder(test_dir) | |
label2id = {} | |
id2label = {} | |
for i, class_name in enumerate(train_ds.classes): | |
label2id[class_name] = str(i) | |
id2label[str(i)] = class_name | |
feature_extractor = ViTFeatureExtractor.from_pretrained('google/vit-base-patch16-224-in21k') | |
model = ViTForImageClassification.from_pretrained( | |
'google/vit-base-patch16-224-in21k', | |
num_labels=len(label2id), | |
label2id=label2id, | |
id2label=id2label | |
) | |
collator = ImageClassificationCollator(feature_extractor) | |
train_loader = DataLoader(train_ds, batch_size=8, collate_fn=collator, num_workers=8, shuffle=True) | |
test_loader = DataLoader(test_ds, batch_size=8, collate_fn=collator, num_workers=8) | |
val_batch = next(iter(test_loader)) | |
outputs = model(**val_batch) | |
preds=outputs.logits.softmax(1).argmax(1) | |
for name, param in model.named_parameters(): | |
param.requires_grad = False | |
if name.startswith("classifier"): # choose whatever you like here | |
param.requires_grad = True | |
pl.seed_everything(42) | |
classifier = Classifier(model, lr=2e-5) | |
trainer = pl.Trainer(accelerator='gpu', devices=1, precision=16, max_epochs=30) | |
trainer.fit(classifier, train_loader, test_loader) | |
threshold = 0.7 # set the score threshold | |
for batch_idx, data in enumerate(test_loader): | |
outputs = model(**data) | |
scores = outputs.logits.softmax(1) | |
print(scores) | |
preds = [] | |
for score in scores: | |
if score.max() > threshold: | |
preds.append(str(score.argmax().item())) | |
else: | |
preds.append('None') | |
print(preds) | |
labels = str(data['labels']) | |
return outputs, preds, preds | |
else: | |
capture = cv2.VideoCapture(video) | |
user_d=str(user_name)+'/'+str(trainortest) | |
class_d=str(user_name)+'/'+str(trainortest)+'/'+str(class_name) | |
if not os.path.exists(user_d): | |
os.makedirs(user_d) | |
if not os.path.exists(class_d): | |
os.makedirs(class_d) | |
frameNr = 0 | |
while (True): | |
success, frame = capture.read() | |
if success: | |
cv2.imwrite(f'{class_d}/frame_{frameNr}.jpg', frame) | |
else: | |
break | |
frameNr = frameNr+10 | |
a=pl.__version__ | |
img=cv2.imread(class_d+'/frame_0.jpg') | |
return img, a, class_d | |
demo = gr.Interface(video_identity, | |
inputs=[gr.Video(source='upload'), | |
gr.Text(), | |
gr.Text(), | |
gr.Text(label='Which set is this? (type train or test)'), | |
gr.Text(label='Are you ready? (type yes or no)')], | |
outputs=[gr.Text(), | |
gr.Text(), | |
gr.Text()], | |
cache_examples=True) | |
demo.launch(debug=True) |