# CyberEndE / aimessage.py
import os
import time
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from confluent_kafka import KafkaException, TopicPartition, Producer, Consumer
from confluent_kafka.schema_registry.avro import AvroDeserializer, AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext
from aitask import handle_message, TooManyRequestsError
from schemaregistry import SchemaClient
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration (read from environment variables)
schema_registry_url = os.getenv("SCHEMA_REGISTRY_URL")
kafka_domain = os.getenv('DOMAIN')
password = os.getenv('PASSWORD')

conf = {
    'bootstrap.servers': f"{kafka_domain}:29092",
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': "dathuynh",
    'sasl.password': password,
}
# Shutdown flag
shutdown_event = threading.Event()
def avro_deserializer():
    """Build an AvroDeserializer for the incoming scan messages."""
    schema_client = SchemaClient(schema_registry_url, "cybersentinal.avro.scan")
    schema_str = schema_client.get_schema_str()
    if schema_str is None:
        raise RuntimeError("Failed to fetch schema for subject cybersentinal.avro.scan")
    schema_registry_client = schema_client.schema_registry_client
    return AvroDeserializer(schema_registry_client, schema_str)


def avro_serializer():
    """Build an AvroSerializer for the outgoing scan-detail messages."""
    schema_client = SchemaClient(schema_registry_url, "cybersentinal.avro.scandetail")
    schema_str = schema_client.get_schema_str()
    if schema_str is None:
        raise RuntimeError("Failed to fetch schema for subject cybersentinal.avro.scandetail")
    schema_registry_client = schema_client.schema_registry_client
    return AvroSerializer(schema_registry_client, schema_str)
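
# Sketch (assumption, not part of this module): handle_message in aitask is
# expected to invoke the serializer the same way decode_message below invokes
# the deserializer, e.g.
#   payload = serializer(record, SerializationContext(out_topic, MessageField.VALUE))
#   producer.produce(out_topic, value=payload)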
def create_consumer(group_id):
    consumer_conf = {
        **conf,
        'group.id': group_id,
        'auto.offset.reset': 'latest',
        'session.timeout.ms': 60000,
        'heartbeat.interval.ms': 3000,
        'enable.auto.commit': False,  # Offsets are committed manually after each batch
        'log_level': 4
    }
    return Consumer(consumer_conf)
def create_producer():
    producer_conf = {
        **conf,
        # linger.ms is the librdkafka alias of queue.buffering.max.ms, so only
        # one of the two should be set; the original set both (10 and 1000)
        'linger.ms': 10,
        'batch.num.messages': 1000
    }
    return Producer(producer_conf)


# Shared producer instance used by all worker threads
producer = create_producer()
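
# Optional sketch (assumption, not in the original module): a delivery-report
# callback that the producing code in aitask could pass via
# producer.produce(..., on_delivery=delivery_report) to surface per-message
# send results.
def delivery_report(err, msg):
    # err is set when delivery failed; msg carries topic/partition/offset info
    if err is not None:
        logger.error(f"Delivery failed for record to {msg.topic()}: {err}")
    else:
        logger.debug(f"Record delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")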
def ensure_producer_connected(producer):
    """Verify broker connectivity, retrying a few times before giving up."""
    retries = 5
    for attempt in range(retries):
        try:
            producer.list_topics(timeout=5)
            break
        except KafkaException:
            if attempt < retries - 1:
                logger.warning(f"Producer connection check failed (attempt {attempt + 1}/{retries}), retrying in 5s...")
                time.sleep(5)
            else:
                logger.error("Max retries reached. Could not establish a producer connection.")
                raise
def decode_message(msg, avro_deserializer, topic):
    """Deserialize a raw Kafka message; return None if decoding fails."""
    try:
        byte_message = msg.value()
        return avro_deserializer(byte_message, SerializationContext(topic, MessageField.VALUE))
    except Exception as e:
        logger.error(f"Error decoding message: {e}")
        return None
def kafka_consumer(group_id, topic):
    consumer = create_consumer(group_id)
    consumer.subscribe([topic])
    deserializer = avro_deserializer()
    serializer = avro_serializer()
    logger.info(f"Consumer {group_id} is running. Waiting for messages on topic {topic}...")
    shutdown_timer = threading.Timer(14400, shutdown_event.set)  # Shut down after 4 hours
    shutdown_timer.start()
    try:
        with ThreadPoolExecutor(max_workers=10) as executor:
            while not shutdown_event.is_set():
                try:
                    msgs = consumer.consume(num_messages=10, timeout=1.0)
                    if not msgs:
                        continue
                    # Decode each message exactly once and keep a future -> raw
                    # message mapping so a failure can be traced back to its
                    # partition (the original decoded twice and referenced a
                    # comprehension variable that is not in scope)
                    futures = {}
                    for msg in msgs:
                        if msg.error():
                            logger.error(f"Consumer error: {msg.error()}")
                            continue
                        decoded = decode_message(msg, deserializer, topic)
                        if decoded is None:
                            continue
                        future = executor.submit(
                            handle_message,
                            decoded,
                            producer,
                            ensure_producer_connected,
                            serializer
                        )
                        futures[future] = msg
                    for future in as_completed(futures):
                        msg = futures[future]
                        try:
                            future.result()
                        except TooManyRequestsError as e:
                            partition = msg.partition()
                            consumer.pause([TopicPartition(topic, partition)])
                            logger.info(f"Paused partition {partition} due to TooManyRequestsError")
                            handle_retry(consumer, topic, partition, e.retry_after)
                        except Exception as e:
                            logger.error(f"Error processing message: {e}")
                    if futures:
                        # Manual offset commit for the batch (enable.auto.commit
                        # is False); note this also commits messages whose
                        # processing failed above
                        consumer.commit(asynchronous=False)
                except KafkaException as e:
                    logger.error(f"Kafka exception: {e}. Restarting consumer loop...")
                    time.sleep(5)
                except KeyboardInterrupt:
                    logger.info("Consumer interrupted. Exiting...")
                    shutdown_event.set()
    finally:
        shutdown_timer.cancel()
        consumer.close()
def handle_retry(consumer, topic, partition, retry_after):
    """Block for retry_after seconds, then resume the paused partition."""
    time.sleep(retry_after)
    consumer.resume([TopicPartition(topic, partition)])
def start_kafka_consumer_thread(group_id, topic):
    consumer_thread = threading.Thread(target=kafka_consumer, args=(group_id, topic))
    consumer_thread.daemon = True
    consumer_thread.start()
    return consumer_thread
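
# Minimal usage sketch (not in the original file): start one consumer thread
# and keep the main thread alive until shutdown. The group id and topic name
# ("ai-scan-group", "scan") are illustrative placeholders; the thread is a
# daemon, so the main thread must stay alive for it to keep consuming.
if __name__ == "__main__":
    thread = start_kafka_consumer_thread("ai-scan-group", "scan")
    try:
        while thread.is_alive():
            thread.join(timeout=1.0)
    except KeyboardInterrupt:
        shutdown_event.set()
        thread.join()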