ext_celery.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. from datetime import timedelta
  2. from celery import Celery, Task
  3. from flask import Flask
  4. from configs import dify_config
  5. def init_app(app: Flask) -> Celery:
  6. class FlaskTask(Task):
  7. def __call__(self, *args: object, **kwargs: object) -> object:
  8. with app.app_context():
  9. return self.run(*args, **kwargs)
  10. broker_transport_options = {}
  11. if dify_config.CELERY_USE_SENTINEL:
  12. broker_transport_options = {
  13. "master_name": dify_config.CELERY_SENTINEL_MASTER_NAME,
  14. "sentinel_kwargs": {
  15. "socket_timeout": dify_config.CELERY_SENTINEL_SOCKET_TIMEOUT,
  16. },
  17. }
  18. celery_app = Celery(
  19. app.name,
  20. task_cls=FlaskTask,
  21. broker=dify_config.CELERY_BROKER_URL,
  22. backend=dify_config.CELERY_BACKEND,
  23. task_ignore_result=True,
  24. )
  25. # Add SSL options to the Celery configuration
  26. ssl_options = {
  27. "ssl_cert_reqs": None,
  28. "ssl_ca_certs": None,
  29. "ssl_certfile": None,
  30. "ssl_keyfile": None,
  31. }
  32. celery_app.conf.update(
  33. result_backend=dify_config.CELERY_RESULT_BACKEND,
  34. broker_transport_options=broker_transport_options,
  35. broker_connection_retry_on_startup=True,
  36. )
  37. if dify_config.BROKER_USE_SSL:
  38. celery_app.conf.update(
  39. broker_use_ssl=ssl_options, # Add the SSL options to the broker configuration
  40. )
  41. celery_app.set_default()
  42. app.extensions["celery"] = celery_app
  43. imports = [
  44. "schedule.clean_embedding_cache_task",
  45. "schedule.clean_unused_datasets_task",
  46. ]
  47. day = dify_config.CELERY_BEAT_SCHEDULER_TIME
  48. beat_schedule = {
  49. "clean_embedding_cache_task": {
  50. "task": "schedule.clean_embedding_cache_task.clean_embedding_cache_task",
  51. "schedule": timedelta(days=day),
  52. },
  53. "clean_unused_datasets_task": {
  54. "task": "schedule.clean_unused_datasets_task.clean_unused_datasets_task",
  55. "schedule": timedelta(days=day),
  56. },
  57. }
  58. celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)
  59. return celery_app