from datetime import timedelta

import pytz
from celery import Celery, Task
from celery.schedules import crontab
from flask import Flask

from configs import dify_config


def init_app(app: Flask) -> Celery:
    """Create the Celery application for *app* and register it as an extension.

    The returned Celery instance is configured from ``dify_config``, made the
    default Celery app, stored under ``app.extensions["celery"]`` and given a
    beat schedule for the periodic tasks in the ``schedule`` package.

    Args:
        app: The Flask application whose name and application context are used.

    Returns:
        The configured Celery application.
    """

    class FlaskTask(Task):
        """Task base class that runs the task body inside ``app``'s app context."""

        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    # Sentinel settings are only supplied when sentinel mode is switched on;
    # otherwise the transport options stay empty.
    sentinel_options = (
        {
            "master_name": dify_config.CELERY_SENTINEL_MASTER_NAME,
            "sentinel_kwargs": {
                "socket_timeout": dify_config.CELERY_SENTINEL_SOCKET_TIMEOUT,
            },
        }
        if dify_config.CELERY_USE_SENTINEL
        else {}
    )

    celery_app = Celery(
        app.name,
        task_cls=FlaskTask,
        broker=dify_config.CELERY_BROKER_URL,
        backend=dify_config.CELERY_BACKEND,
        task_ignore_result=True,
    )

    # Collect every setting first, then apply them in a single update.
    # NOTE(review): pytz.timezone() receives LOG_TZ as-is; presumably the
    # config always provides a valid zone name -- confirm.
    settings = {
        "result_backend": dify_config.CELERY_RESULT_BACKEND,
        "broker_transport_options": sentinel_options,
        "broker_connection_retry_on_startup": True,
        "worker_log_format": dify_config.LOG_FORMAT,
        "worker_task_log_format": dify_config.LOG_FORMAT,
        "worker_hijack_root_logger": False,
        "timezone": pytz.timezone(dify_config.LOG_TZ),
    }

    if dify_config.BROKER_USE_SSL:
        # SSL is enabled for the broker with every certificate option left
        # as None.
        # NOTE(review): no certificate material is configured here -- confirm
        # that this is intended.
        settings["broker_use_ssl"] = {
            "ssl_cert_reqs": None,
            "ssl_ca_certs": None,
            "ssl_certfile": None,
            "ssl_keyfile": None,
        }

    if dify_config.LOG_FILE:
        settings["worker_logfile"] = dify_config.LOG_FILE

    celery_app.conf.update(**settings)

    celery_app.set_default()
    app.extensions["celery"] = celery_app

    # One entry per periodic task. Each task lives in module
    # ``schedule.<name>`` and is registered as ``schedule.<name>.<name>``.
    cleanup_interval = timedelta(days=dify_config.CELERY_BEAT_SCHEDULER_TIME)
    task_schedules = {
        "clean_embedding_cache_task": cleanup_interval,
        "clean_unused_datasets_task": cleanup_interval,
        "create_tidb_serverless_task": crontab(minute="0", hour="*"),
        "update_tidb_serverless_status_task": crontab(minute="30", hour="*"),
    }

    task_modules = [f"schedule.{task_name}" for task_name in task_schedules]
    beat_entries = {
        task_name: {
            "task": f"schedule.{task_name}.{task_name}",
            "schedule": when,
        }
        for task_name, when in task_schedules.items()
    }

    celery_app.conf.update(beat_schedule=beat_entries, imports=task_modules)

    return celery_app