clean_messages.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. import datetime
  2. import time
  3. import click
  4. from werkzeug.exceptions import NotFound
  5. import app
  6. from configs import dify_config
  7. from extensions.ext_database import db
  8. from extensions.ext_redis import redis_client
  9. from models.model import (
  10. App,
  11. Message,
  12. MessageAgentThought,
  13. MessageAnnotation,
  14. MessageChain,
  15. MessageFeedback,
  16. MessageFile,
  17. )
  18. from models.web import SavedMessage
  19. from services.feature_service import FeatureService
  # Number of messages fetched per query.
  _BATCH_SIZE = 100
  # Lifetime, in seconds, of the per-tenant plan string cached in Redis.
  _PLAN_CACHE_TTL_SECONDS = 600
  # Tables whose rows reference a message through ``message_id``; they are
  # cleared before the message row itself.
  _MESSAGE_RELATED_MODELS = (
      MessageFeedback,
      MessageAnnotation,
      MessageChain,
      MessageAgentThought,
      MessageFile,
      SavedMessage,
  )


  def _resolve_tenant_plan(tenant_id):
      """Return the billing plan name of *tenant_id*, using a short-lived Redis cache.

      The plan string is cached under ``features:{tenant_id}`` for
      ``_PLAN_CACHE_TTL_SECONDS``; on a cache miss it is read from
      ``FeatureService.get_features`` and written back.
      """
      cache_key = f"features:{tenant_id}"
      cached_plan = redis_client.get(cache_key)
      if cached_plan is not None:
          # The client returns bytes for this key, so decode to compare with str.
          return cached_plan.decode()
      plan = FeatureService.get_features(tenant_id).billing.subscription.plan
      redis_client.setex(cache_key, _PLAN_CACHE_TTL_SECONDS, plan)
      return plan


  def _delete_message_with_related_records(message_id):
      """Delete one message plus every row in ``_MESSAGE_RELATED_MODELS`` that points to it, then commit."""
      for related_model in _MESSAGE_RELATED_MODELS:
          db.session.query(related_model).filter(related_model.message_id == message_id).delete(
              synchronize_session=False
          )
      db.session.query(Message).filter(Message.id == message_id).delete()
      db.session.commit()


  @app.celery.task(queue="dataset")
  def clean_messages():
      """Delete old messages that belong to tenants on the ``"sandbox"`` plan.

      Messages created more than ``PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING`` days
      ago are scanned newest-first in batches of ``_BATCH_SIZE``. For each one,
      the owning app's tenant plan is resolved; sandbox messages are deleted
      together with their feedback, annotations, chains, agent thoughts, files
      and saved-message rows. Messages of other plans are left untouched, and
      messages whose app no longer exists are skipped with a warning.
      """
      click.echo(click.style("Start clean messages.", fg="green"))
      start_at = time.perf_counter()
      # NOTE(review): naive local time; confirm it matches how Message.created_at is stored.
      cursor = datetime.datetime.now() - datetime.timedelta(
          days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
      )
      # app_id -> tenant_id (None when the app is missing), so each app is queried once per run.
      tenant_by_app_id = {}
      while True:
          # Keyset pagination: every query only sees messages strictly older than
          # the last one processed. Re-reading a fixed page never advances when
          # none of its messages get deleted (non-sandbox tenants), which would
          # loop forever. Caveat: rows sharing the exact created_at of a full
          # batch's last row are deferred to the next run.
          messages = (
              db.session.query(Message)
              .filter(Message.created_at < cursor)
              .order_by(Message.created_at.desc())
              .limit(_BATCH_SIZE)
              .all()
          )
          if not messages:
              break
          for message in messages:
              cursor = message.created_at
              if message.app_id not in tenant_by_app_id:
                  owner_app = App.query.filter_by(id=message.app_id).first()
                  if owner_app is None:
                      click.echo(
                          click.style(
                              f"App {message.app_id} not found, skip cleaning its messages (e.g. {message.id}).",
                              fg="yellow",
                          )
                      )
                  tenant_by_app_id[message.app_id] = owner_app.tenant_id if owner_app else None
              tenant_id = tenant_by_app_id[message.app_id]
              if tenant_id is None:
                  continue
              if _resolve_tenant_plan(tenant_id) == "sandbox":
                  _delete_message_with_related_records(message.id)
      end_at = time.perf_counter()
      click.echo(click.style(f"Cleaned messages from db success latency: {end_at - start_at}", fg="green"))