OmAgent / app.py
韩宇
init req
c6e903b
raw
history blame
4.91 kB
# Import required modules and components
import base64
import hashlib
import json
import os
from pathlib import Path
from Crypto.Cipher import AES
class Encrypt(object):
@staticmethod
def pad(s):
AES_BLOCK_SIZE = 16 # Bytes
return s + (AES_BLOCK_SIZE - len(s) % AES_BLOCK_SIZE) * \
chr(AES_BLOCK_SIZE - len(s) % AES_BLOCK_SIZE)
@staticmethod
def unpad(s):
return s[:-ord(s[len(s) - 1:])]
# hashlib md5加密
@staticmethod
def hash_md5_encrypt(data: (str, bytes), salt=None) -> str:
if isinstance(data, str):
data = data.encode('utf-8')
md5 = hashlib.md5()
if salt:
if isinstance(salt, str):
salt = salt.encode('utf-8')
md5.update(salt)
md5.update(data)
return md5.hexdigest()
@staticmethod
# @catch_exc()
def aes_decrypt(key: str, data: str) -> str:
'''
:param key: 密钥
:param data: 加密后的数据(密文)
:return:明文
'''
key = key.encode('utf8')
data = base64.b64decode(data)
cipher = AES.new(key, AES.MODE_ECB)
# 去补位
text_decrypted = Encrypt.unpad(cipher.decrypt(data))
text_decrypted = text_decrypted.decode('utf8')
return text_decrypted
secret = 'FwALd7BY8IUrbnrigH3YYlhGD/XvMVX7'
encrypt = 'sJWveD1LIxIxYGZvZMRlb+8vJjq5yJmXnqSKfHM6Ahi0Olw0EVkJNY3I4B5boUjPbaDtAoF7X8V5vHmdOFr6q7tVZL1xLkoZ/IDX5XHsewaG91ipkITGxEdER1vXWBYvNrs3WqSMWi1QXzHsKDThXkeBAHibjjsNPLOD6c+UptqzPsll+/chUFwJeMvxJ7dnVMWShkOfiVi3BYhavhLSFwq3Y/zQae27f8Cqufqd+bWXr1sLhPg38EMtaM+TK2W7qCRZs4XdNsUOA3lbHQKW7iKC0hRtqrSSWuAxorCwrvdiCXI8HeS6N3RNGUPVILQm9uR8bE3ruMU2PFs/h8Gk6rQE3VrcEhkZtw4QD0+wIqc='
env = json.loads(Encrypt.aes_decrypt(secret, encrypt))
print(env)
for k, v in env.items():
os.environ.setdefault(k, v)
from agent.conclude.webpage_conclude import WebpageConclude
from agent.video_preprocessor.webpage_vp import WebpageVideoPreprocessor
from agent.video_qa.webpage_qa import WebpageVideoQA
from webpage import WebpageClient
from omagent_core.advanced_components.workflow.dnc.workflow import DnCWorkflow
from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow
from omagent_core.engine.workflow.task.simple_task import simple_task
from omagent_core.utils.container import container
from omagent_core.utils.logger import logging
from omagent_core.utils.registry import registry
def app():
logging.init_logger("omagent", "omagent", level="INFO")
# Set current working directory path
CURRENT_PATH = root_path = Path(__file__).parents[0]
# Import registered modules
registry.import_module(project_path=CURRENT_PATH.joinpath("agent"))
# Load container configuration from YAML file
container.register_stm("SharedMemSTM")
container.register_ltm(ltm="VideoMilvusLTM")
container.from_config(CURRENT_PATH.joinpath("container.yaml"))
# Initialize simple VQA workflow
workflow = ConductorWorkflow(name="webpage_video_understanding")
process_workflow = ConductorWorkflow(name="webpage_process_video_understanding")
# 1. Video preprocess task for video preprocessing
video_preprocess_task = simple_task(
task_def_name=WebpageVideoPreprocessor,
task_reference_name="webpage_video_preprocess",
inputs={"video_path": process_workflow.input("video_path")}
)
# 2. Video QA task for video QA
video_qa_task = simple_task(
task_def_name=WebpageVideoQA,
task_reference_name="webpage_video_qa",
inputs={
"video_md5": workflow.input("video_md5"),
"video_path": workflow.input("video_path"),
"instance_id": workflow.input("instance_id"),
"question": workflow.input("question"),
},
)
dnc_workflow = DnCWorkflow()
dnc_workflow.set_input(query=video_qa_task.output("query"))
# 7. Conclude task for task conclusion
conclude_task = simple_task(
task_def_name=WebpageConclude,
task_reference_name="webpage_task_conclude",
inputs={
"dnc_structure": dnc_workflow.dnc_structure,
"last_output": dnc_workflow.last_output,
},
)
# Configure workflow execution flow: Input -> Initialize global variables -> DnC Loop -> Conclude
process_workflow >> video_preprocess_task
workflow >> video_preprocess_task >> video_qa_task >> dnc_workflow >> conclude_task
# Register workflow
workflow.register(overwrite=True)
process_workflow.register(overwrite=True)
# Initialize and start app client with workflow configuration
cli_client = WebpageClient(
interactor=workflow, processor=process_workflow, config_path="webpage_configs"
)
cli_client.start_interactor()
if __name__ == '__main__':
app()