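"""Kafka consumer service: consumes Avro-encoded scan messages from a topic,
processes them on a thread pool via handle_message, and pauses/resumes
partitions when the downstream handler signals rate limiting
(TooManyRequestsError)."""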
import os
import time
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

from confluent_kafka import KafkaException, TopicPartition, Producer, Consumer
from confluent_kafka.schema_registry.avro import AvroDeserializer, AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext

from aitask import handle_message, TooManyRequestsError
from schemaregistry import SchemaClient

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration
schema_registry_url = os.getenv("SCHEMA_REGISTRY_URL")
kafka_domain = os.getenv('DOMAIN')
password = os.getenv('PASSWORD')

conf = {
    'bootstrap.servers': f"{kafka_domain}:29092",
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': "dathuynh",
    'sasl.password': password,
}

# Shutdown flag
shutdown_event = threading.Event()
def avro_deserializer():
    schema_client = SchemaClient(schema_registry_url, "cybersentinal.avro.scan")
    schema_str = schema_client.get_schema_str()
    if schema_str is None:
        raise RuntimeError("Failed to fetch schema for cybersentinal.avro.scan")
    schema_registry_client = schema_client.schema_registry_client
    return AvroDeserializer(schema_registry_client, schema_str)


def avro_serializer():
    schema_client = SchemaClient(schema_registry_url, "cybersentinal.avro.scandetail")
    schema_str = schema_client.get_schema_str()
    if schema_str is None:
        raise RuntimeError("Failed to fetch schema for cybersentinal.avro.scandetail")
    schema_registry_client = schema_client.schema_registry_client
    return AvroSerializer(schema_registry_client, schema_str)
def create_consumer(group_id):
    consumer_conf = {
        **conf,
        'group.id': group_id,
        'auto.offset.reset': 'latest',
        'session.timeout.ms': 60000,
        'heartbeat.interval.ms': 3000,
        'enable.auto.commit': False,  # Offsets are committed manually after processing
        'log_level': 4,
    }
    # confluent_kafka's Consumer takes the config dict as a positional argument
    return Consumer(consumer_conf)
def create_producer():
    producer_conf = {
        **conf,
        'linger.ms': 10,
        'batch.num.messages': 1000,
        'queue.buffering.max.ms': 1000,
    }
    return Producer(producer_conf)


# Shared producer instance used by all worker threads
producer = create_producer()
def ensure_producer_connected(producer):
    """Verify broker connectivity by requesting metadata, retrying on failure."""
    retries = 5
    for attempt in range(retries):
        try:
            producer.list_topics(timeout=5)
            break
        except KafkaException as e:
            if attempt < retries - 1:
                logger.warning(f"Producer connection attempt {attempt + 1} failed: {e}. Retrying in 5s...")
                time.sleep(5)
            else:
                logger.error("Max retries reached. Could not establish a producer connection.")
                raise
def decode_message(msg, avro_deserializer, topic):
    try:
        byte_message = msg.value()
        return avro_deserializer(byte_message, SerializationContext(topic, MessageField.VALUE))
    except Exception as e:
        logger.error(f"Error decoding message: {e}")
        return None
def kafka_consumer(group_id, topic):
    consumer = create_consumer(group_id)
    consumer.subscribe([topic])
    deserializer = avro_deserializer()
    serializer = avro_serializer()
    logger.info(f"Consumer {group_id} is running. Waiting for messages on topic {topic}...")
    with ThreadPoolExecutor(max_workers=10) as executor:
        shutdown_timer = threading.Timer(14400, shutdown_event.set)  # Shut down after 4 hours
        shutdown_timer.start()
        try:
            while not shutdown_event.is_set():
                try:
                    msgs = consumer.consume(num_messages=10, timeout=1.0)
                    if not msgs:
                        continue
                    # Decode each message exactly once and remember which message
                    # produced each future, so a failure can be traced back to its partition.
                    futures = {}
                    for msg in msgs:
                        decoded = decode_message(msg, deserializer, topic)
                        if decoded is not None:
                            future = executor.submit(
                                handle_message,
                                decoded,
                                producer,
                                ensure_producer_connected,
                                serializer,
                            )
                            futures[future] = msg
                    for future in as_completed(futures):
                        try:
                            future.result()
                        except TooManyRequestsError as e:
                            partition = futures[future].partition()
                            consumer.pause([TopicPartition(topic, partition)])
                            logger.info(f"Paused partition {partition} due to TooManyRequestsError")
                            handle_retry(consumer, topic, partition, e.retry_after)
                        except Exception as e:
                            logger.error(f"Error processing message: {e}")
                            raise
                    # enable.auto.commit is off, so commit the batch's offsets
                    # only after every handler has finished.
                    consumer.commit(asynchronous=False)
                except KafkaException as e:
                    logger.error(f"Kafka exception: {e}. Restarting consumer loop...")
                    time.sleep(5)
                except KeyboardInterrupt:
                    logger.info("Consumer interrupted. Exiting...")
                    shutdown_event.set()
        finally:
            shutdown_timer.cancel()
            consumer.close()
def handle_retry(consumer, topic, partition, retry_after):
    # Wait out the server-suggested backoff, then resume the paused partition.
    # Note: this sleeps on the consumer loop thread, so polling stops until
    # the backoff elapses.
    time.sleep(retry_after)
    consumer.resume([TopicPartition(topic, partition)])
def start_kafka_consumer_thread(group_id, topic):
    consumer_thread = threading.Thread(target=kafka_consumer, args=(group_id, topic), daemon=True)
    consumer_thread.start()
    return consumer_thread
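

# Example entrypoint (a minimal sketch): the group id and topic below are
# illustrative assumptions, not values defined elsewhere in this module.
if __name__ == "__main__":
    thread = start_kafka_consumer_thread("scan-consumer-group", "cybersentinal.scan")
    # Keep the main thread alive until the 4-hour timer (or Ctrl+C) sets the
    # shutdown event; the consumer thread is a daemon and exits with the process.
    try:
        while thread.is_alive() and not shutdown_event.is_set():
            time.sleep(1)
    except KeyboardInterrupt:
        shutdown_event.set()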