clean_embedding_cache_task.py 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. import datetime
  2. import time
  3. import click
  4. from sqlalchemy import text
  5. from werkzeug.exceptions import NotFound
  6. import app
  7. from configs import dify_config
  8. from extensions.ext_database import db
  9. from models.dataset import Embedding
  10. @app.celery.task(queue='dataset')
  11. def clean_embedding_cache_task():
  12. click.echo(click.style('Start clean embedding cache.', fg='green'))
  13. clean_days = int(dify_config.CLEAN_DAY_SETTING)
  14. start_at = time.perf_counter()
  15. thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days)
  16. while True:
  17. try:
  18. embedding_ids = db.session.query(Embedding.id).filter(Embedding.created_at < thirty_days_ago) \
  19. .order_by(Embedding.created_at.desc()).limit(100).all()
  20. embedding_ids = [embedding_id[0] for embedding_id in embedding_ids]
  21. except NotFound:
  22. break
  23. if embedding_ids:
  24. for embedding_id in embedding_ids:
  25. db.session.execute(text(
  26. "DELETE FROM embeddings WHERE id = :embedding_id"
  27. ), {'embedding_id': embedding_id})
  28. db.session.commit()
  29. else:
  30. break
  31. end_at = time.perf_counter()
  32. click.echo(click.style('Cleaned embedding cache from db success latency: {}'.format(end_at - start_at), fg='green'))