|
from datetime import timedelta |
|
|
|
from celery import Celery, Task |
|
from celery.schedules import crontab |
|
from flask import Flask |
|
|
|
from configs import dify_config |
|
|
|
|
|
def init_app(app: Flask) -> Celery:
    """Create the Celery application for ``app`` and register it on the Flask app.

    The returned Celery instance is configured from ``dify_config`` (broker,
    result backend, optional Redis Sentinel transport options, optional broker
    SSL), is installed as the default Celery app, is stored under
    ``app.extensions["celery"]``, and has the periodic (beat) schedule and the
    list of task modules to import set on its configuration.

    Args:
        app: The Flask application whose name is used for the Celery app and
            whose application context wraps every task invocation.

    Returns:
        The configured Celery application.
    """

    class FlaskTask(Task):
        """Task base class that runs the task body inside ``app``'s app context."""

        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    # Sentinel-specific transport options are only supplied when Sentinel is
    # enabled; otherwise the broker gets an empty options mapping.
    transport_options = (
        {
            "master_name": dify_config.CELERY_SENTINEL_MASTER_NAME,
            "sentinel_kwargs": {
                "socket_timeout": dify_config.CELERY_SENTINEL_SOCKET_TIMEOUT,
            },
        }
        if dify_config.CELERY_USE_SENTINEL
        else {}
    )

    celery_app = Celery(
        app.name,
        task_cls=FlaskTask,
        broker=dify_config.CELERY_BROKER_URL,
        backend=dify_config.CELERY_BACKEND,
        task_ignore_result=True,
    )

    celery_app.conf.update(
        result_backend=dify_config.CELERY_RESULT_BACKEND,
        broker_transport_options=transport_options,
        broker_connection_retry_on_startup=True,
    )

    if dify_config.BROKER_USE_SSL:
        # NOTE(review): every SSL option is explicitly None — presumably this
        # leaves certificate verification to the transport's defaults (or
        # disables it); confirm this is intended for production brokers.
        celery_app.conf.update(
            broker_use_ssl={
                "ssl_cert_reqs": None,
                "ssl_ca_certs": None,
                "ssl_certfile": None,
                "ssl_keyfile": None,
            },
        )

    # Make this instance the default Celery app and expose it via Flask.
    celery_app.set_default()
    app.extensions["celery"] = celery_app

    # Modules that define the scheduled tasks referenced below.
    task_modules = [
        "schedule.clean_embedding_cache_task",
        "schedule.clean_unused_datasets_task",
        "schedule.create_tidb_serverless_task",
        "schedule.update_tidb_serverless_status_task",
    ]

    # Both cleanup tasks share one interval, expressed in days by configuration.
    cleanup_interval = timedelta(days=dify_config.CELERY_BEAT_SCHEDULER_TIME)
    # The TiDB tasks run hourly, offset by half an hour from each other.
    hourly_on_the_hour = crontab(minute="0", hour="*")
    hourly_on_the_half_hour = crontab(minute="30", hour="*")

    periodic_tasks = {
        "clean_embedding_cache_task": {
            "task": "schedule.clean_embedding_cache_task.clean_embedding_cache_task",
            "schedule": cleanup_interval,
        },
        "clean_unused_datasets_task": {
            "task": "schedule.clean_unused_datasets_task.clean_unused_datasets_task",
            "schedule": cleanup_interval,
        },
        "create_tidb_serverless_task": {
            "task": "schedule.create_tidb_serverless_task.create_tidb_serverless_task",
            "schedule": hourly_on_the_hour,
        },
        "update_tidb_serverless_status_task": {
            "task": "schedule.update_tidb_serverless_status_task.update_tidb_serverless_status_task",
            "schedule": hourly_on_the_half_hour,
        },
    }
    celery_app.conf.update(beat_schedule=periodic_tasks, imports=task_modules)

    return celery_app
|
|