import streamlit as st # x = st.slider("Select a value") # st.write(x, "squared is", x * x) st.title('Welcome to the Labelbox custom classifier training application!') st.header('In this module you will be able to add data to your instance if you dont already have it, and then use bulk classification to train a custom image classification model') st.subheader("If you don't have data in your org, enter your API Click the button below! Otherwise, Skip to section 2") # -*- coding: utf-8 -*- """ Original file is located at https://colab.research.google.com/drive/1nOSff67KXhNgX_XSfnv3xnddobRoaK0d """ api_key = st.text_input("Enter your api key:", type="password") import labelbox import labelpandas as lp import os import pandas as pd from tensorflow.python.lib.io import file_io import io from pandas import read_csv # read csv file from google cloud storage def read_data(gcs_path): file_stream = file_io.FileIO(gcs_path, mode='r') csv_data = read_csv(io.StringIO(file_stream.read())) return csv_data def freedatatolb(amount_of_data): client = lp.Client(api_key) gcs_path = 'https://storage.googleapis.com/solution_accelerator_datasets/images_styles.csv' df = pd.read_csv(gcs_path) df = df.drop(['id', 'season', 'usage', 'year',"gender", "masterCategory", "subCategory", "articleType","baseColour"], axis =1) fields ={"row_data":["link"], # Column containing URL to asset (single) "global_key": ['filename'], # Column containing globalkey value (single, unique) "external_id": ["productDisplayName"], # Column containing external ID value (single) "metadata_string": [], # Column containing string metadata values (multiple) "metadata_number": [], # Column containing number metadata values (multiple) "metadata_datetime": [] # Column containing datetime metadata values (multiple, must be ISO 8601) } columns = {} for field in fields.keys(): for name in fields[field]: if field.startswith('metadata'): columns[name] = f"{field.split('_')[0]}///{field.split('_')[1]}///{name}" else: columns[name] = field new_df = df.rename(columns=(columns)) testdf = new_df.head(amount_of_data) dataset_id = client.lb_client.create_dataset(name = str(gcs_path.split('/')[-1])).uid # dataset_id = client.lb_client.get_dataset("c4b7prd6207850000lljx2hr8").uid results = client.create_data_rows_from_table( table = testdf, dataset_id = dataset_id, skip_duplicates = True, # If True, will skip data rows where a global key is already in use, verbose = True, # If True, prints information about code execution ) return results data_amount = st.slider("choose amout of data to add to labelbox", 250, 1000) if st.button("Add data to your Labelbox"): st.write(f"adding {data_amount} datarows to Labelbox instance") bing = freedatatolb(data_amount) st.write(bing) st.title("SECTION 2") st.title("Auto Image classifier training and inference: Imagnet Weights") # -*- coding: utf-8 -*- """ Original file is located at https://colab.research.google.com/drive/1CSyAE9DhwGTl7bLaSoo7QSyMuoEqJpCj """ def train_and_inference(api_key, ontology_id, model_run_id): # st.write('thisisstarting') api_key = api_key # insert Labelbox API key ontology_id = ontology_id # get the ontology ID from the Settings tab at the top left of your model run model_run_id = model_run_id #get the model run ID from the settings gear icon on the right side of your Model Run # st.write('1') import pydantic st.write(pydantic.__version__) import numpy as np # st.write('2') import tensorflow as tf # st.write('3') from tensorflow.keras import layers # st.write('4') from tensorflow.keras.models import Sequential # st.write('5') from tensorflow.keras.preprocessing.image import ImageDataGenerator # st.write('6') import os # st.write('7') import labelbox # st.write('zat') from labelbox import Client # st.write('8') # st.write('9') import numpy as np import tensorflow as tf from tensorflow.keras import layers from tensorflow.keras.models import Sequential from tensorflow.keras.preprocessing.image import ImageDataGenerator import os from labelbox.schema.ontology import OntologyBuilder, Tool, Classification, Option from labelbox import Client, LabelingFrontend, LabelImport, MALPredictionImport from labelbox.data.annotation_types import ( Label, ImageData, ObjectAnnotation, MaskData, Rectangle, Point, Line, Mask, Polygon, Radio, Checklist, Text, ClassificationAnnotation, ClassificationAnswer ) from labelbox import MediaType from labelbox.data.serialization import NDJsonConverter import pandas as pd import shutil import labelbox.data import scipy import json import uuid import time import requests import pandas as pd import shutil import json import uuid import time import requests # st.write('imports') """Connect to labelbox client Define Model Variables """ client = Client(api_key) EPOCHS = 10 """#Setup Training Export Classifications from Model Run """ model_run = client.get_model_run(model_run_id) client.enable_experimental = True data_json = model_run.export_labels(download=True) print(data_json) """Separate datarows into folders.""" import requests import os from urllib.parse import unquote def download_and_save_image(url, destination_folder, filename): try: # Decode the URL url = unquote(url) # Ensure destination directory exists if not os.path.exists(destination_folder): os.makedirs(destination_folder) # Start the download process response = requests.get(url, stream=True) # Check if the request was successful if response.status_code == 200: file_path = os.path.join(destination_folder, filename) with open(file_path, 'wb') as file: for chunk in response.iter_content(8192): file.write(chunk) # st.write(f"Image downloaded and saved: {file_path}") # else: # st.write(f"Failed to download the image. Status code: {response.status_code}") except Exception as e: st.write(f"An error occurred: {e}") BASE_DIR = 'dataset' labeldict = {} for entry in data_json: data_split = entry['Data Split'] if data_split not in ['training', 'validation']: # we are skipping 'test' for now continue image_url = f"{entry['Labeled Data']}" label = entry['Label']['classifications'][0]['answer']['value'] labeldict[label] = entry['Label']['classifications'][0]['answer']['title'] destination_folder = os.path.join(BASE_DIR, data_split, label) filename = os.path.basename(image_url) # st.write(filename) download_and_save_image(image_url, destination_folder, filename) """#Train Model""" st.write(labeldict) import tensorflow as tf from tensorflow.keras.preprocessing.image import ImageDataGenerator from tensorflow.keras.applications import MobileNetV2 from tensorflow.keras.layers import Dense, GlobalAveragePooling2D from tensorflow.keras.models import Model from tensorflow.keras.optimizers import Adam TRAIN_DIR = 'dataset/training' VALIDATION_DIR = 'dataset/validation' IMG_HEIGHT, IMG_WIDTH = 224, 224 # default size for MobileNetV2 BATCH_SIZE = 32 train_datagen = ImageDataGenerator( rescale=1./255, rotation_range=20, width_shift_range=0.2, height_shift_range=0.2, shear_range=0.2, zoom_range=0.2, horizontal_flip=True, fill_mode='nearest' ) validation_datagen = ImageDataGenerator(rescale=1./255) train_ds = train_datagen.flow_from_directory( TRAIN_DIR, target_size=(IMG_HEIGHT, IMG_WIDTH), batch_size=BATCH_SIZE, class_mode='categorical' ) validation_ds = validation_datagen.flow_from_directory( VALIDATION_DIR, target_size=(IMG_HEIGHT, IMG_WIDTH), batch_size=BATCH_SIZE, class_mode='categorical' ) base_model = MobileNetV2(input_shape=(IMG_HEIGHT, IMG_WIDTH, 3), include_top=False, weights='imagenet') # Freeze the base model for layer in base_model.layers: layer.trainable = False # Create custom classification head x = base_model.output x = GlobalAveragePooling2D()(x) x = Dense(1024, activation='relu')(x) predictions = Dense(train_ds.num_classes, activation='softmax')(x) model = Model(inputs=base_model.input, outputs=predictions) model.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy']) st.write("training") history = model.fit( train_ds, validation_data=validation_ds, epochs=EPOCHS ) """Run Inference on Model run Datarows""" st.write('running Inference') import numpy as np import requests from tensorflow.keras.preprocessing import image from PIL import Image from io import BytesIO # Fetch the image from the URL def load_image_from_url(img_url, target_size=(224, 224)): response = requests.get(img_url) img = Image.open(BytesIO(response.content)) img = img.resize(target_size) img_array = image.img_to_array(img) return np.expand_dims(img_array, axis=0) def make_prediction(img_url): # Image URL img_url = img_url # Load and preprocess the image img_data = load_image_from_url(img_url) img_data = img_data / 255.0 # Normalize the image data to [0,1] # Make predictions predictions = model.predict(img_data) predicted_class = np.argmax(predictions[0]) # Retrieve the confidence score (probability) for the predicted class confidence = predictions[0][predicted_class] # Map the predicted class index to its corresponding label class_map = train_ds.class_indices inverse_map = {v: k for k, v in class_map.items()} predicted_label = inverse_map[predicted_class] return predicted_label, confidence from tensorflow.errors import InvalidArgumentError # Add this import ontology = client.get_ontology(ontology_id) label_list = [] st.write(ontology) for datarow in model_run.export_labels(download=True): try: label, confidence = make_prediction(datarow['Labeled Data']) except InvalidArgumentError as e: print(f"InvalidArgumentError: {e}. Skipping this data row.") continue # Skip to the next datarow if an exception occurs my_checklist_answer = ClassificationAnswer( name = labeldict[label.lower()], confidence=confidence) checklist_prediction = ClassificationAnnotation( name=ontology.classifications()[0].instructions, value=Radio( answer = my_checklist_answer )) # print(datarow["DataRow ID"]) label_prediction = Label( data=ImageData(uid=datarow['DataRow ID']), annotations = [checklist_prediction]) label_list.append(label_prediction) prediction_import = model_run.add_predictions( name="prediction_upload_job"+str(uuid.uuid4()), predictions=label_list) prediction_import.wait_until_done() st.write(prediction_import.errors == []) if prediction_import.errors == []: return "Model Trained and inference ran successfully" else: return prediction_import.errors st.title("Enter Applicable IDs and keys below") model_run_id = st.text_input("Enter your model run ID:") ontology_id = st.text_input("Enter your ontology ID:") if st.button("Train and run inference"): st.write('Starting Up...') # Check if the key is not empty if api_key + model_run_id + ontology_id: result = train_and_inference(api_key, ontology_id, model_run_id) st.write(result) else: st.warning("Please enter all keys.")