"""clear_free_plan_tenant_expired_logs.py

Archive expired logs of free-plan (sandbox) tenants to storage and delete them
from the database.
"""
import datetime
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor

import click
from flask import Flask, current_app
from sqlalchemy.orm import Session

from configs import dify_config
from core.model_runtime.utils.encoders import jsonable_encoder
from extensions.ext_database import db
from extensions.ext_storage import storage
from models.account import Tenant
from models.model import App, Conversation, Message
from models.workflow import WorkflowNodeExecution, WorkflowRun
from services.billing_service import BillingService

logger = logging.getLogger(__name__)
  18. class ClearFreePlanTenantExpiredLogs:
  19. @classmethod
  20. def process_tenant(cls, flask_app: Flask, tenant_id: str, days: int, batch: int):
  21. with flask_app.app_context():
  22. apps = db.session.query(App).filter(App.tenant_id == tenant_id).all()
  23. app_ids = [app.id for app in apps]
  24. while True:
  25. with Session(db.engine).no_autoflush as session:
  26. messages = (
  27. session.query(Message)
  28. .filter(
  29. Message.app_id.in_(app_ids),
  30. Message.created_at < datetime.datetime.now() - datetime.timedelta(days=days),
  31. )
  32. .limit(batch)
  33. .all()
  34. )
  35. if len(messages) == 0:
  36. break
  37. storage.save(
  38. f"free_plan_tenant_expired_logs/"
  39. f"{tenant_id}/messages/{datetime.datetime.now().strftime('%Y-%m-%d')}"
  40. f"-{time.time()}.json",
  41. json.dumps(
  42. jsonable_encoder(
  43. [message.to_dict() for message in messages],
  44. ),
  45. ).encode("utf-8"),
  46. )
  47. message_ids = [message.id for message in messages]
  48. # delete messages
  49. session.query(Message).filter(
  50. Message.id.in_(message_ids),
  51. ).delete(synchronize_session=False)
  52. session.commit()
  53. click.echo(
  54. click.style(
  55. f"[{datetime.datetime.now()}] Processed {len(message_ids)} messages for tenant {tenant_id} "
  56. )
  57. )
  58. while True:
  59. with Session(db.engine).no_autoflush as session:
  60. conversations = (
  61. session.query(Conversation)
  62. .filter(
  63. Conversation.app_id.in_(app_ids),
  64. Conversation.updated_at < datetime.datetime.now() - datetime.timedelta(days=days),
  65. )
  66. .limit(batch)
  67. .all()
  68. )
  69. if len(conversations) == 0:
  70. break
  71. storage.save(
  72. f"free_plan_tenant_expired_logs/"
  73. f"{tenant_id}/conversations/{datetime.datetime.now().strftime('%Y-%m-%d')}"
  74. f"-{time.time()}.json",
  75. json.dumps(
  76. jsonable_encoder(
  77. [conversation.to_dict() for conversation in conversations],
  78. ),
  79. ).encode("utf-8"),
  80. )
  81. conversation_ids = [conversation.id for conversation in conversations]
  82. session.query(Conversation).filter(
  83. Conversation.id.in_(conversation_ids),
  84. ).delete(synchronize_session=False)
  85. session.commit()
  86. click.echo(
  87. click.style(
  88. f"[{datetime.datetime.now()}] Processed {len(conversation_ids)}"
  89. f" conversations for tenant {tenant_id}"
  90. )
  91. )
  92. while True:
  93. with Session(db.engine).no_autoflush as session:
  94. workflow_node_executions = (
  95. session.query(WorkflowNodeExecution)
  96. .filter(
  97. WorkflowNodeExecution.tenant_id == tenant_id,
  98. WorkflowNodeExecution.created_at < datetime.datetime.now() - datetime.timedelta(days=days),
  99. )
  100. .limit(batch)
  101. .all()
  102. )
  103. if len(workflow_node_executions) == 0:
  104. break
  105. # save workflow node executions
  106. storage.save(
  107. f"free_plan_tenant_expired_logs/"
  108. f"{tenant_id}/workflow_node_executions/{datetime.datetime.now().strftime('%Y-%m-%d')}"
  109. f"-{time.time()}.json",
  110. json.dumps(
  111. jsonable_encoder(workflow_node_executions),
  112. ).encode("utf-8"),
  113. )
  114. workflow_node_execution_ids = [
  115. workflow_node_execution.id for workflow_node_execution in workflow_node_executions
  116. ]
  117. # delete workflow node executions
  118. session.query(WorkflowNodeExecution).filter(
  119. WorkflowNodeExecution.id.in_(workflow_node_execution_ids),
  120. ).delete(synchronize_session=False)
  121. session.commit()
  122. click.echo(
  123. click.style(
  124. f"[{datetime.datetime.now()}] Processed {len(workflow_node_execution_ids)}"
  125. f" workflow node executions for tenant {tenant_id}"
  126. )
  127. )
  128. while True:
  129. with Session(db.engine).no_autoflush as session:
  130. workflow_runs = (
  131. session.query(WorkflowRun)
  132. .filter(
  133. WorkflowRun.tenant_id == tenant_id,
  134. WorkflowRun.created_at < datetime.datetime.now() - datetime.timedelta(days=days),
  135. )
  136. .limit(batch)
  137. .all()
  138. )
  139. if len(workflow_runs) == 0:
  140. break
  141. # save workflow runs
  142. storage.save(
  143. f"free_plan_tenant_expired_logs/"
  144. f"{tenant_id}/workflow_runs/{datetime.datetime.now().strftime('%Y-%m-%d')}"
  145. f"-{time.time()}.json",
  146. json.dumps(
  147. jsonable_encoder(
  148. [workflow_run.to_dict() for workflow_run in workflow_runs],
  149. ),
  150. ).encode("utf-8"),
  151. )
  152. workflow_run_ids = [workflow_run.id for workflow_run in workflow_runs]
  153. # delete workflow runs
  154. session.query(WorkflowRun).filter(
  155. WorkflowRun.id.in_(workflow_run_ids),
  156. ).delete(synchronize_session=False)
  157. session.commit()
  158. @classmethod
  159. def process(cls, days: int, batch: int, tenant_ids: list[str]):
  160. """
  161. Clear free plan tenant expired logs.
  162. """
  163. click.echo(click.style("Clearing free plan tenant expired logs", fg="white"))
  164. ended_at = datetime.datetime.now()
  165. started_at = datetime.datetime(2023, 4, 3, 8, 59, 24)
  166. current_time = started_at
  167. with Session(db.engine) as session:
  168. total_tenant_count = session.query(Tenant.id).count()
  169. click.echo(click.style(f"Total tenant count: {total_tenant_count}", fg="white"))
  170. handled_tenant_count = 0
  171. thread_pool = ThreadPoolExecutor(max_workers=10)
  172. def process_tenant(flask_app: Flask, tenant_id: str) -> None:
  173. try:
  174. if (
  175. not dify_config.BILLING_ENABLED
  176. or BillingService.get_info(tenant_id)["subscription"]["plan"] == "sandbox"
  177. ):
  178. # only process sandbox tenant
  179. cls.process_tenant(flask_app, tenant_id, days, batch)
  180. except Exception:
  181. logger.exception(f"Failed to process tenant {tenant_id}")
  182. finally:
  183. nonlocal handled_tenant_count
  184. handled_tenant_count += 1
  185. if handled_tenant_count % 100 == 0:
  186. click.echo(
  187. click.style(
  188. f"[{datetime.datetime.now()}] "
  189. f"Processed {handled_tenant_count} tenants "
  190. f"({(handled_tenant_count / total_tenant_count) * 100:.1f}%), "
  191. f"{handled_tenant_count}/{total_tenant_count}",
  192. fg="green",
  193. )
  194. )
  195. futures = []
  196. if tenant_ids:
  197. for tenant_id in tenant_ids:
  198. futures.append(
  199. thread_pool.submit(
  200. process_tenant,
  201. current_app._get_current_object(), # type: ignore[attr-defined]
  202. tenant_id,
  203. )
  204. )
  205. else:
  206. while current_time < ended_at:
  207. click.echo(
  208. click.style(f"Current time: {current_time}, Started at: {datetime.datetime.now()}", fg="white")
  209. )
  210. # Initial interval of 1 day, will be dynamically adjusted based on tenant count
  211. interval = datetime.timedelta(days=1)
  212. # Process tenants in this batch
  213. with Session(db.engine) as session:
  214. # Calculate tenant count in next batch with current interval
  215. # Try different intervals until we find one with a reasonable tenant count
  216. test_intervals = [
  217. datetime.timedelta(days=1),
  218. datetime.timedelta(hours=12),
  219. datetime.timedelta(hours=6),
  220. datetime.timedelta(hours=3),
  221. datetime.timedelta(hours=1),
  222. ]
  223. for test_interval in test_intervals:
  224. tenant_count = (
  225. session.query(Tenant.id)
  226. .filter(Tenant.created_at.between(current_time, current_time + test_interval))
  227. .count()
  228. )
  229. if tenant_count <= 100:
  230. interval = test_interval
  231. break
  232. else:
  233. # If all intervals have too many tenants, use minimum interval
  234. interval = datetime.timedelta(hours=1)
  235. # Adjust interval to target ~100 tenants per batch
  236. if tenant_count > 0:
  237. # Scale interval based on ratio to target count
  238. interval = min(
  239. datetime.timedelta(days=1), # Max 1 day
  240. max(
  241. datetime.timedelta(hours=1), # Min 1 hour
  242. interval * (100 / tenant_count), # Scale to target 100
  243. ),
  244. )
  245. batch_end = min(current_time + interval, ended_at)
  246. rs = (
  247. session.query(Tenant.id)
  248. .filter(Tenant.created_at.between(current_time, batch_end))
  249. .order_by(Tenant.created_at)
  250. )
  251. tenants = []
  252. for row in rs:
  253. tenant_id = str(row.id)
  254. try:
  255. tenants.append(tenant_id)
  256. except Exception:
  257. logger.exception(f"Failed to process tenant {tenant_id}")
  258. continue
  259. futures.append(
  260. thread_pool.submit(
  261. process_tenant,
  262. current_app._get_current_object(), # type: ignore[attr-defined]
  263. tenant_id,
  264. )
  265. )
  266. current_time = batch_end
  267. # wait for all threads to finish
  268. for future in futures:
  269. future.result()