import logging from typing import Optional from pydantic import BaseModel from obsei.analyzer.base_analyzer import BaseAnalyzer, BaseAnalyzerConfig from obsei.sink.base_sink import BaseSink, BaseSinkConfig from obsei.source.base_source import BaseSource, BaseSourceConfig from obsei.workflow.workflow import Workflow logger = logging.getLogger(__name__) class Processor(BaseModel): analyzer: BaseAnalyzer analyzer_config: Optional[BaseAnalyzerConfig] = None source: Optional[BaseSource] = None source_config: Optional[BaseSourceConfig] = None sink: Optional[BaseSink] = None sink_config: Optional[BaseSinkConfig] = None def process( self, workflow: Optional[Workflow] = None, source: Optional[BaseSource] = None, source_config: Optional[BaseSourceConfig] = None, sink: Optional[BaseSink] = None, sink_config: Optional[BaseSinkConfig] = None, analyzer: Optional[BaseAnalyzer] = None, analyzer_config: Optional[BaseAnalyzerConfig] = None, ) -> None: source = source or self.source sink = sink or self.sink analyzer = analyzer or self.analyzer id: Optional[str] = None if workflow: sink_config = workflow.config.sink_config source_config = workflow.config.source_config analyzer_config = workflow.config.analyzer_config id = workflow.id else: sink_config = sink_config or self.sink_config source_config = source_config or self.source_config analyzer_config = analyzer_config or self.analyzer_config if source is None or source_config is None: return if sink is None or sink_config is None: return source_response_list = source.lookup(config=source_config, id=id) for idx, source_response in enumerate(source_response_list): logger.info(f"source_response#'{idx}'='{source_response}'") analyzer_response_list = analyzer.analyze_input( source_response_list=source_response_list, analyzer_config=analyzer_config, id=id, ) for idx, analyzer_response in enumerate(analyzer_response_list): logger.info(f"source_response#'{idx}'='{analyzer_response}'") sink_response_list = sink.send_data( analyzer_responses=analyzer_response_list, config=sink_config, id=id ) for idx, sink_response in enumerate(sink_response_list): logger.info(f"source_response#'{idx}'='{sink_response}'")