Spaces:
Sleeping
Sleeping
import functools | |
import time | |
from groq import RateLimitError | |
from httpx import ReadTimeout | |
from loguru import logger | |
def rate_limit_bypass(sleep_time: int = 1, max_retries: int = 10):
    """Build a decorator that retries a call on rate-limit or read-timeout errors.

    The wrapped function is called repeatedly until it returns without raising
    ``RateLimitError`` or ``ReadTimeout``. After each such failure the wrapper
    sleeps ``sleep_time`` seconds and tries again, up to ``max_retries``
    retries (so at most ``max_retries + 1`` calls in total). Any other
    exception propagates immediately.

    Parameters
    ----------
    sleep_time : int, optional, default 1
        Seconds to sleep between a failed attempt and the next retry.
    max_retries : int, optional, default 10
        Number of retries allowed after the initial attempt.

    Returns
    -------
    callable
        A decorator that wraps a function with the retry loop.

    Raises
    ------
    RateLimitError, ReadTimeout
        Re-raised from the wrapped function once the retry budget is spent.
    """

    def decorate_rate_limit(func):
        # Preserve the wrapped function's name, docstring and __wrapped__.
        @functools.wraps(func)
        def wrapper_rate_limit(*args, **kwargs):
            retries = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except (RateLimitError, ReadTimeout) as e:
                    logger.debug(repr(e))
                    retries += 1
                    if retries > max_retries:
                        # Budget exhausted: re-raise right away instead of
                        # sleeping once more for a retry that never happens.
                        raise
                    logger.info(f"Rate limit exceeded, sleeping for {sleep_time} seconds")
                    time.sleep(sleep_time)

        return wrapper_rate_limit

    return decorate_rate_limit