import base64
import concurrent.futures
import time
import numpy as np
import requests
import psutil
from urllib.request import urlopen
import matplotlib.pyplot as plt
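# Third-party dependencies used below: requests, psutil, numpy, matplotlib,
# and optionally gpustat for GPU metrics
# (e.g. `pip install requests psutil numpy matplotlib gpustat`).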
# Try importing `gpustat` for GPU monitoring
try:
    import gpustat

    GPU_AVAILABLE = True
except ImportError:
    GPU_AVAILABLE = False
# Constants
SERVER_URL = "http://localhost:8080" # Base server URL
TEST_IMAGE_URL = "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/beignets-task-guide.png"
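# NOTE: this script assumes an inference server is already listening at
# SERVER_URL and exposes a POST /predict endpoint that accepts
# {"image": <base64-encoded bytes>}. That schema is an assumption about the
# serving framework; adjust the URL and payload shape to match your server.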
def fetch_and_prepare_payload():
"""
Fetch the test image and prepare a base64 payload.
"""
try:
img_data = urlopen(TEST_IMAGE_URL).read()
return base64.b64encode(img_data).decode("utf-8")
except Exception as e:
print(f"Error fetching the image: {e}")
return None
def send_request(payload, batch=False):
"""
Send a single or batch request and measure response time.
"""
start_time = time.time()
endpoint = f"{SERVER_URL}/predict"
try:
if batch:
response = requests.post(endpoint, json=[{"image": img} for img in payload])
else:
response = requests.post(endpoint, json={"image": payload})
response_time = time.time() - start_time
predictions = response.json() if response.status_code == 200 else None
return response_time, response.status_code, predictions
except Exception as e:
print(f"Error sending request: {e}")
return None, None, None
def get_system_metrics():
"""
Get current CPU and GPU usage.
"""
metrics = {"cpu_usage": psutil.cpu_percent(0.1)}
if GPU_AVAILABLE:
try:
gpu_stats = gpustat.GPUStatCollection.new_query()
metrics["gpu_usage"] = sum([gpu.utilization for gpu in gpu_stats.gpus])
except Exception:
metrics["gpu_usage"] = -1
else:
metrics["gpu_usage"] = -1
return metrics
def benchmark_api(num_requests=100, concurrency_level=10, batch=False):
"""
Benchmark the API server.
"""
payload = fetch_and_prepare_payload()
if not payload:
print("Error preparing payload. Benchmark aborted.")
return
payloads = [payload] * num_requests if batch else [payload]
system_metrics = []
response_times = []
status_codes = []
predictions = []
    # Start benchmark timer
    start_benchmark_time = time.time()
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=concurrency_level
    ) as executor:
        futures = [
            executor.submit(send_request, payloads if batch else payload, batch)
            for _ in range(num_requests)
        ]
        # Sample CPU/GPU usage while requests are in flight; take at least one
        # sample so the averages below are never computed over an empty list
        system_metrics.append(get_system_metrics())
        while any(not f.done() for f in futures):
            system_metrics.append(get_system_metrics())
            time.sleep(0.1)
    for future in futures:
        response_time, status_code, prediction = future.result()
        # send_request returns (None, None, None) on failure; skip those
        if response_time is not None:
            response_times.append(response_time)
            status_codes.append(status_code)
            predictions.append(prediction)
    # Stop benchmark timer
    total_benchmark_time = time.time() - start_benchmark_time
    avg_cpu = np.mean([m["cpu_usage"] for m in system_metrics])
    avg_gpu = np.mean([m["gpu_usage"] for m in system_metrics]) if GPU_AVAILABLE else -1
    success_rate = (status_codes.count(200) / num_requests) * 100 if status_codes else 0
    avg_response_time = np.mean(response_times) * 1000 if response_times else 0  # ms
    requests_per_second = num_requests / total_benchmark_time
print("\n--- Sample Predictions ---")
for i, prediction in enumerate(
predictions[:5]
): # Show predictions for the first 5 requests
print(f"Request {i + 1}: {prediction}")
return {
"total_requests": num_requests,
"concurrency_level": concurrency_level,
"total_time": total_benchmark_time,
"avg_response_time": avg_response_time,
"success_rate": success_rate,
"requests_per_second": requests_per_second,
"avg_cpu_usage": avg_cpu,
"avg_gpu_usage": avg_gpu,
}
def run_benchmarks():
"""
Run comprehensive benchmarks and create plots.
"""
concurrency_levels = [1, 8, 16, 32]
metrics = []
print("Running API benchmarks...")
for concurrency in concurrency_levels:
print(f"\nTesting concurrency level: {concurrency}")
result = benchmark_api(
num_requests=50, concurrency_level=concurrency, batch=False
)
if result:
metrics.append(result)
print(
f"Concurrency {concurrency}: "
f"{result['requests_per_second']:.2f} reqs/sec, "
f"CPU: {result['avg_cpu_usage']:.1f}%, "
f"GPU: {result['avg_gpu_usage']:.1f}%"
)
    if not metrics:
        print("No successful benchmark runs; skipping plots.")
        return
    # Plot against the concurrency levels that actually produced results, so
    # the x and y series always have the same length even if a run failed
    tested_levels = [m["concurrency_level"] for m in metrics]
    # Generate plots
    plt.figure(figsize=(12, 6))
    # Throughput
    plt.subplot(1, 2, 1)
    plt.plot(
        tested_levels,
        [m["requests_per_second"] for m in metrics],
        "r-o",
        label="Throughput",
    )
    plt.xlabel("Concurrency Level")
    plt.ylabel("Requests per Second")
    plt.title("API Throughput")
    plt.grid(True)
    # Resource Usage
    plt.subplot(1, 2, 2)
    plt.plot(
        tested_levels,
        [m["avg_cpu_usage"] for m in metrics],
        "b-o",
        label="CPU Usage",
    )
    if GPU_AVAILABLE:
        plt.plot(
            tested_levels,
            [m["avg_gpu_usage"] for m in metrics],
            "g-o",
            label="GPU Usage",
        )
    plt.xlabel("Concurrency Level")
    plt.ylabel("Resource Usage (%)")
    plt.title("Resource Usage")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig("benchmark_results.png")
    print("Benchmark results saved as 'benchmark_results.png'.")
if __name__ == "__main__":
    run_benchmarks()
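# Usage sketch (assumes a compatible server is already serving on SERVER_URL,
# e.g. started in another terminal): running `python benchmark.py` prints
# per-concurrency throughput and writes the plots to benchmark_results.png.
# The file name `benchmark.py` is illustrative; use whatever name this script
# is saved under.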