clean_messages.py 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import datetime
  2. import time
  3. import click
  4. from werkzeug.exceptions import NotFound
  5. import app
  6. from configs import dify_config
  7. from extensions.ext_database import db
  8. from extensions.ext_redis import redis_client
  9. from models.model import (
  10. App,
  11. Message,
  12. MessageAgentThought,
  13. MessageAnnotation,
  14. MessageChain,
  15. MessageFeedback,
  16. MessageFile,
  17. )
  18. from models.web import SavedMessage
  19. from services.feature_service import FeatureService
  20. @app.celery.task(queue="dataset")
  21. def clean_messages():
  22. click.echo(click.style("Start clean messages.", fg="green"))
  23. start_at = time.perf_counter()
  24. plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(
  25. days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
  26. )
  27. while True:
  28. try:
  29. # Main query with join and filter
  30. # FIXME:for mypy no paginate method error
  31. messages = (
  32. db.session.query(Message) # type: ignore
  33. .filter(Message.created_at < plan_sandbox_clean_message_day)
  34. .order_by(Message.created_at.desc())
  35. .limit(100)
  36. .all()
  37. )
  38. except NotFound:
  39. break
  40. if not messages:
  41. break
  42. for message in messages:
  43. plan_sandbox_clean_message_day = message.created_at
  44. app = App.query.filter_by(id=message.app_id).first()
  45. features_cache_key = f"features:{app.tenant_id}"
  46. plan_cache = redis_client.get(features_cache_key)
  47. if plan_cache is None:
  48. features = FeatureService.get_features(app.tenant_id)
  49. redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
  50. plan = features.billing.subscription.plan
  51. else:
  52. plan = plan_cache.decode()
  53. if plan == "sandbox":
  54. # clean related message
  55. db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message.id).delete(
  56. synchronize_session=False
  57. )
  58. db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message.id).delete(
  59. synchronize_session=False
  60. )
  61. db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete(
  62. synchronize_session=False
  63. )
  64. db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message.id).delete(
  65. synchronize_session=False
  66. )
  67. db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete(
  68. synchronize_session=False
  69. )
  70. db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete(
  71. synchronize_session=False
  72. )
  73. db.session.query(Message).filter(Message.id == message.id).delete()
  74. db.session.commit()
  75. end_at = time.perf_counter()
  76. click.echo(click.style("Cleaned messages from db success latency: {}".format(end_at - start_at), fg="green"))