# Standard library
import base64
import concurrent.futures
import os
import time
from urllib.request import urlopen

# Third-party
import matplotlib.pyplot as plt
import numpy as np
import psutil
import requests
from dotenv import load_dotenv, find_dotenv
from loguru import logger

# Load environment variables (e.g. SERVER_URL) from a .env file, if present
load_dotenv(find_dotenv())
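# A .env file in the project root might look like this (the value shown is
# just the script's own default; only SERVER_URL is read here):
#
#   SERVER_URL=http://localhost:8080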
# Try importing `gpustat` for GPU monitoring
try:
    import gpustat

    GPU_AVAILABLE = True
except ImportError:
    GPU_AVAILABLE = False
# Constants
SERVER_URL = os.getenv("SERVER_URL", "http://localhost:8080")
TEST_IMAGE_URL = "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/beignets-task-guide.png"
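# The script assumes the server exposes a POST {SERVER_URL}/predict endpoint
# that accepts either a single JSON object or a JSON list of objects:
#
#   single: {"image": "<base64-encoded image bytes>"}
#   batch:  [{"image": "..."}, {"image": "..."}, ...]
#
# and returns predictions as JSON with HTTP 200 on success. This contract is
# inferred from send_request() below, not enforced anywhere in this script.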
def fetch_and_prepare_payload():
    """
    Fetch the test image and prepare a base64 payload.
    """
    try:
        img_data = urlopen(TEST_IMAGE_URL).read()
        return base64.b64encode(img_data).decode("utf-8")
    except Exception as e:
        logger.error(f"Error fetching the image: {e}")
        return None
def send_request(payload, batch=False):
    """
    Send a single or batch request and measure response time.
    """
    start_time = time.time()
    endpoint = f"{SERVER_URL}/predict"
    try:
        if batch:
            response = requests.post(
                endpoint, json=[{"image": img} for img in payload]
            )
        else:
            response = requests.post(endpoint, json={"image": payload})
        response_time = time.time() - start_time
        predictions = response.json() if response.status_code == 200 else None
        return response_time, response.status_code, predictions
    except Exception as e:
        logger.error(f"Error sending request: {e}")
        return None, None, None
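# A quick smoke test against a running server might look like this (the
# prediction format depends entirely on the model behind /predict):
#
#   elapsed, status, preds = send_request(fetch_and_prepare_payload())
#   logger.info(f"HTTP {status} in {elapsed:.3f}s -> {preds}")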
def get_system_metrics():
    """
    Get current CPU and GPU usage.
    """
    metrics = {"cpu_usage": psutil.cpu_percent(interval=0.1)}
    if GPU_AVAILABLE:
        try:
            gpu_stats = gpustat.GPUStatCollection.new_query()
            metrics["gpu_usage"] = sum(gpu.utilization for gpu in gpu_stats.gpus)
        except Exception:
            metrics["gpu_usage"] = -1
    else:
        metrics["gpu_usage"] = -1
    return metrics
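# Note: gpu_usage sums utilization across all visible GPUs, so it can exceed
# 100% on multi-GPU machines; -1 is the sentinel for "no GPU data available".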
def benchmark_api(num_requests=100, concurrency_level=10, batch=False):
    """
    Benchmark the API server.
    """
    payload = fetch_and_prepare_payload()
    if not payload:
        logger.error("Error preparing payload. Benchmark aborted.")
        return
    payloads = [payload] * num_requests if batch else [payload]
    system_metrics = []
    response_times = []
    status_codes = []
    predictions = []
    # Start benchmark timer
    start_benchmark_time = time.time()
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=concurrency_level
    ) as executor:
        futures = [
            executor.submit(send_request, payloads if batch else payload, batch)
            for _ in range(num_requests)
        ]
        # Sample system metrics every 100 ms while requests are in flight
        while any(not f.done() for f in futures):
            system_metrics.append(get_system_metrics())
            time.sleep(0.1)
        for future in futures:
            response_time, status_code, prediction = future.result()
            # Failed requests return (None, None, None); skip them so the
            # aggregates below only cover completed round trips
            if response_time is not None:
                response_times.append(response_time)
                status_codes.append(status_code)
                predictions.append(prediction)
    # Stop benchmark timer
    total_benchmark_time = time.time() - start_benchmark_time
    avg_cpu = np.mean([m["cpu_usage"] for m in system_metrics])
    avg_gpu = np.mean([m["gpu_usage"] for m in system_metrics]) if GPU_AVAILABLE else -1
    success_rate = (status_codes.count(200) / num_requests) * 100 if status_codes else 0
    avg_response_time = np.mean(response_times) * 1000 if response_times else 0  # ms
    requests_per_second = num_requests / total_benchmark_time
    logger.info("\n--- Sample Predictions ---")
    # Show predictions for the first 5 requests
    for i, prediction in enumerate(predictions[:5]):
        logger.info(f"Request {i + 1}: {prediction}")
    return {
        "total_requests": num_requests,
        "concurrency_level": concurrency_level,
        "total_time": total_benchmark_time,
        "avg_response_time": avg_response_time,
        "success_rate": success_rate,
        "requests_per_second": requests_per_second,
        "avg_cpu_usage": avg_cpu,
        "avg_gpu_usage": avg_gpu,
    }
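# A standalone run might look like this (numbers are illustrative):
#
#   result = benchmark_api(num_requests=20, concurrency_level=4)
#   if result:
#       logger.info(f"{result['requests_per_second']:.2f} reqs/sec, "
#                   f"{result['avg_response_time']:.0f} ms avg latency")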
def run_benchmarks():
    """
    Run comprehensive benchmarks and create separate plots for CPU and GPU usage.
    """
    concurrency_levels = [1, 8, 16, 32]
    metrics = []
    logger.info("Running API benchmarks...")
    for concurrency in concurrency_levels:
        logger.info(f"\nTesting concurrency level: {concurrency}")
        result = benchmark_api(
            num_requests=50, concurrency_level=concurrency, batch=False
        )
        if result:
            metrics.append(result)
            logger.info(
                f"Concurrency {concurrency}: "
                f"{result['requests_per_second']:.2f} reqs/sec, "
                f"CPU: {result['avg_cpu_usage']:.1f}%, "
                f"GPU: {result['avg_gpu_usage']:.1f}%"
            )
    # Plot against the levels that actually produced results, so the x and y
    # series stay the same length even if a benchmark run was aborted
    tested_levels = [m["concurrency_level"] for m in metrics]
    # Ensure the output directory exists before saving plots
    os.makedirs("artifacts", exist_ok=True)
    # Generate CPU Usage Plot
    plt.figure(figsize=(10, 5))
    plt.plot(
        tested_levels,
        [m["avg_cpu_usage"] for m in metrics],
        "b-o",
        label="CPU Usage",
    )
    plt.xlabel("Concurrency Level")
    plt.ylabel("CPU Usage (%)")
    plt.title("CPU Usage vs. Concurrency Level")
    plt.legend()
    plt.grid(True)
    plt.savefig("artifacts/cpu_usage.png")
    logger.info("CPU usage plot saved as 'artifacts/cpu_usage.png'.")
    # Generate GPU Usage Plot
    if GPU_AVAILABLE:
        plt.figure(figsize=(10, 5))
        plt.plot(
            tested_levels,
            [m["avg_gpu_usage"] for m in metrics],
            "g-o",
            label="GPU Usage",
        )
        plt.xlabel("Concurrency Level")
        plt.ylabel("GPU Usage (%)")
        plt.title("GPU Usage vs. Concurrency Level")
        plt.legend()
        plt.grid(True)
        plt.savefig("artifacts/gpu_usage.png")
        logger.info("GPU usage plot saved as 'artifacts/gpu_usage.png'.")
if __name__ == "__main__":
    run_benchmarks()
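# To run the benchmark (the filename is hypothetical): start the model server,
# set SERVER_URL in the environment or a .env file, then:
#
#   python benchmark_api.py
#
# Metrics are logged via loguru and the plots land in artifacts/.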