import logging
import gradio as gr
import os
from roboflow import Roboflow
from dotenv import load_dotenv
from openai import OpenAI
import tempfile
import numpy as np
from PIL import Image, ImageDraw
import base64

load_dotenv()
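
# Configure application-wide logging so each processing step is traceable.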
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
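
# Fail fast if either API key is missing from the environment.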
roboflow_key = os.getenv("ROBOFLOW_API_KEY")
if not roboflow_key:
    raise ValueError("ROBOFLOW_API_KEY is missing. Please add it to the .env file.")

openai_key = os.getenv("OPENAI_API_KEY")
if not openai_key:
    raise ValueError("OPENAI_API_KEY is missing. Please add it to the .env file.")
rf = Roboflow(api_key=roboflow_key)
project = rf.workspace("alat-pelindung-diri").project("nescafe-4base")
model = project.version(16).model
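
# OpenAI client used for the GPT-4o counting step.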
client_openai = OpenAI(api_key=openai_key)


def detect_and_estimate_objects(image):
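    """Detect cans with the Roboflow YOLO model, draw bounding boxes, then ask GPT-4o
    to count the Nestlé products (including partially occluded cans).

    Returns the path of the annotated image and GPT-4o's text result.
    """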
    try:
        # JPEG cannot store an alpha channel, so convert to RGB first
        # (Gradio may hand over an RGBA image from a transparent PNG).
        if image.mode != "RGB":
            image = image.convert("RGB")

        # Save the uploaded image to a temporary JPEG so the Roboflow SDK can read it from disk.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
            image.save(temp_file, format="JPEG")
            temp_file_path = temp_file.name

        logger.info("Image saved successfully for processing.")
        predictions = model.predict(temp_file_path, confidence=70, overlap=80).json()
        class_count = {}
        object_positions = []
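
        # Draw a red bounding box on the image for every detection.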
        draw = ImageDraw.Draw(image)
        for prediction in predictions['predictions']:
            class_name = prediction['class']
            x, y, width, height = prediction['x'], prediction['y'], prediction['width'], prediction['height']
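
            # Roboflow returns box centers; convert to corner coordinates for PIL.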
            left = int(x - width / 2)
            top = int(y - height / 2)
            right = int(x + width / 2)
            bottom = int(y + height / 2)

            draw.rectangle([left, top, right, bottom], outline="red", width=4)

            class_count[class_name] = class_count.get(class_name, 0) + 1
            object_positions.append((left, top, right, bottom))

        logger.info(f"YOLO detected objects: {class_count}")
        os.remove(temp_file_path)  # discard the first temporary file before reusing the variable
        with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
            image.save(temp_file, format="JPEG")
            temp_file_path = temp_file.name

        with open(temp_file_path, "rb") as image_file:
            base64_image = base64.b64encode(image_file.read()).decode("utf-8")
            logger.info(f"Base64 encoding successful. Length: {len(base64_image)}")
        response = client_openai.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": """Count the cans of the following Nestlé products in the image:
- Nescafé Mocha
- Nescafé Latte
- Nescafé Original
- Bear Brand
- Nescafé Cappuccino
- Nescafé Ice Black
- Nescafé Coconut Latte
- Nescafé Caramel

Count both fully visible cans and cans that are partially obstructed or hidden. For a partially visible can, assume it is the same product as the can in front of it and estimate its presence from the visible portion.

Provide your response in the format:
Nescafé Mocha: [number]
Nescafé Latte: [number]
Nescafé Original: [number]
Bear Brand: [number]
Nescafé Cappuccino: [number]
Nescafé Ice Black: [number]
Nescafé Coconut Latte: [number]
Nescafé Caramel: [number]
Total Nestlé Products: [Total number of Nestlé products]""",
                        },
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"},
                        },
                    ],
                }
            ],
        )
        gpt_estimation = response.choices[0].message.content.strip()
        print(response.choices[0].message.content)

        logger.info(f"GPT-4o estimation: {gpt_estimation}")

        result_text = f"Results from GPT-4o:\n{gpt_estimation}"
        output_path = "/tmp/prediction_result.jpg"
        image.save(output_path)

        logger.info("Processed image saved successfully.")

        os.remove(temp_file_path)

        return output_path, result_text

    except Exception as e:
        logger.error(f"Error during processing: {e}")
        return None, f"Error: {e}"
with gr.Blocks() as iface:
    gr.Markdown("### Object Detection and Counting with YOLO and GPT-4o Assistance")
    with gr.Row():
        input_image = gr.Image(type="pil", label="Upload Image")
        output_image = gr.Image(label="Processed Image")
        output_text = gr.Textbox(label="Results", interactive=False)

    detect_button = gr.Button("Process Image")
    detect_button.click(
        fn=detect_and_estimate_objects,
        inputs=[input_image],
        outputs=[output_image, output_text]
    )

iface.launch(debug=True)