clean_messages.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. import datetime
  2. import time
  3. import click
  4. from werkzeug.exceptions import NotFound
  5. import app
  6. from configs import dify_config
  7. from extensions.ext_database import db
  8. from extensions.ext_redis import redis_client
  9. from models.model import (
  10. App,
  11. Message,
  12. MessageAgentThought,
  13. MessageAnnotation,
  14. MessageChain,
  15. MessageFeedback,
  16. MessageFile,
  17. )
  18. from models.web import SavedMessage
  19. from services.feature_service import FeatureService
  20. @app.celery.task(queue="dataset")
  21. def clean_messages():
  22. click.echo(click.style("Start clean messages.", fg="green"))
  23. start_at = time.perf_counter()
  24. plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(
  25. days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
  26. )
  27. page = 1
  28. while True:
  29. try:
  30. # Main query with join and filter
  31. # FIXME:for mypy no paginate method error
  32. messages = (
  33. db.session.query(Message) # type: ignore
  34. .filter(Message.created_at < plan_sandbox_clean_message_day)
  35. .order_by(Message.created_at.desc())
  36. .limit(100)
  37. .all()
  38. )
  39. except NotFound:
  40. break
  41. if not messages:
  42. break
  43. for message in messages:
  44. plan_sandbox_clean_message_day = message.created_at
  45. app = App.query.filter_by(id=message.app_id).first()
  46. features_cache_key = f"features:{app.tenant_id}"
  47. plan_cache = redis_client.get(features_cache_key)
  48. if plan_cache is None:
  49. features = FeatureService.get_features(app.tenant_id)
  50. redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
  51. plan = features.billing.subscription.plan
  52. else:
  53. plan = plan_cache.decode()
  54. if plan == "sandbox":
  55. # clean related message
  56. db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message.id).delete(
  57. synchronize_session=False
  58. )
  59. db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message.id).delete(
  60. synchronize_session=False
  61. )
  62. db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete(
  63. synchronize_session=False
  64. )
  65. db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message.id).delete(
  66. synchronize_session=False
  67. )
  68. db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete(
  69. synchronize_session=False
  70. )
  71. db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete(
  72. synchronize_session=False
  73. )
  74. db.session.query(Message).filter(Message.id == message.id).delete()
  75. db.session.commit()
  76. end_at = time.perf_counter()
  77. click.echo(click.style("Cleaned unused dataset from db success latency: {}".format(end_at - start_at), fg="green"))