ops_trace_manager.py 30 KB


  1. import json
  2. import logging
  3. import os
  4. import queue
  5. import threading
  6. import time
  7. from datetime import timedelta
  8. from typing import Any, Optional, Union
  9. from uuid import UUID, uuid4
  10. from flask import current_app
  11. from sqlalchemy import select
  12. from sqlalchemy.orm import Session
  13. from core.helper.encrypter import decrypt_token, encrypt_token, obfuscated_token
  14. from core.ops.entities.config_entity import (
  15. OPS_FILE_PATH,
  16. LangfuseConfig,
  17. LangSmithConfig,
  18. TracingProviderEnum,
  19. )
  20. from core.ops.entities.trace_entity import (
  21. DatasetRetrievalTraceInfo,
  22. GenerateNameTraceInfo,
  23. MessageTraceInfo,
  24. ModerationTraceInfo,
  25. SuggestedQuestionTraceInfo,
  26. TaskData,
  27. ToolTraceInfo,
  28. TraceTaskName,
  29. WorkflowTraceInfo,
  30. )
  31. from core.ops.langfuse_trace.langfuse_trace import LangFuseDataTrace
  32. from core.ops.langsmith_trace.langsmith_trace import LangSmithDataTrace
  33. from core.ops.utils import get_message_data
  34. from extensions.ext_database import db
  35. from extensions.ext_storage import storage
  36. from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig
  37. from models.workflow import WorkflowAppLog, WorkflowRun
  38. from tasks.ops_trace_task import process_trace_tasks
  39. provider_config_map: dict[str, dict[str, Any]] = {
  40. TracingProviderEnum.LANGFUSE.value: {
  41. "config_class": LangfuseConfig,
  42. "secret_keys": ["public_key", "secret_key"],
  43. "other_keys": ["host", "project_key"],
  44. "trace_instance": LangFuseDataTrace,
  45. },
  46. TracingProviderEnum.LANGSMITH.value: {
  47. "config_class": LangSmithConfig,
  48. "secret_keys": ["api_key"],
  49. "other_keys": ["project", "endpoint"],
  50. "trace_instance": LangSmithDataTrace,
  51. },
  52. }
  53. class OpsTraceManager:
  54. @classmethod
  55. def encrypt_tracing_config(
  56. cls, tenant_id: str, tracing_provider: str, tracing_config: dict, current_trace_config=None
  57. ):
  58. """
  59. Encrypt tracing config.
  60. :param tenant_id: tenant id
  61. :param tracing_provider: tracing provider
  62. :param tracing_config: tracing config dictionary to be encrypted
  63. :param current_trace_config: current tracing configuration for keeping existing values
  64. :return: encrypted tracing configuration
  65. """
  66. # Get the configuration class and the keys that require encryption
  67. config_class, secret_keys, other_keys = (
  68. provider_config_map[tracing_provider]["config_class"],
  69. provider_config_map[tracing_provider]["secret_keys"],
  70. provider_config_map[tracing_provider]["other_keys"],
  71. )
  72. new_config = {}
  73. # Encrypt necessary keys
  74. for key in secret_keys:
  75. if key in tracing_config:
  76. if "*" in tracing_config[key]:
  77. # If the key contains '*', retain the original value from the current config
  78. new_config[key] = current_trace_config.get(key, tracing_config[key])
  79. else:
  80. # Otherwise, encrypt the key
  81. new_config[key] = encrypt_token(tenant_id, tracing_config[key])
  82. for key in other_keys:
  83. new_config[key] = tracing_config.get(key, "")
  84. # Create a new instance of the config class with the new configuration
  85. encrypted_config = config_class(**new_config)
  86. return encrypted_config.model_dump()
  87. @classmethod
  88. def decrypt_tracing_config(cls, tenant_id: str, tracing_provider: str, tracing_config: dict):
  89. """
  90. Decrypt tracing config
  91. :param tenant_id: tenant id
  92. :param tracing_provider: tracing provider
  93. :param tracing_config: tracing config
  94. :return:
  95. """
  96. config_class, secret_keys, other_keys = (
  97. provider_config_map[tracing_provider]["config_class"],
  98. provider_config_map[tracing_provider]["secret_keys"],
  99. provider_config_map[tracing_provider]["other_keys"],
  100. )
  101. new_config = {}
  102. for key in secret_keys:
  103. if key in tracing_config:
  104. new_config[key] = decrypt_token(tenant_id, tracing_config[key])
  105. for key in other_keys:
  106. new_config[key] = tracing_config.get(key, "")
  107. return config_class(**new_config).model_dump()
  108. @classmethod
  109. def obfuscated_decrypt_token(cls, tracing_provider: str, decrypt_tracing_config: dict):
  110. """
  111. Decrypt tracing config
  112. :param tracing_provider: tracing provider
  113. :param decrypt_tracing_config: tracing config
  114. :return:
  115. """
  116. config_class, secret_keys, other_keys = (
  117. provider_config_map[tracing_provider]["config_class"],
  118. provider_config_map[tracing_provider]["secret_keys"],
  119. provider_config_map[tracing_provider]["other_keys"],
  120. )
  121. new_config = {}
  122. for key in secret_keys:
  123. if key in decrypt_tracing_config:
  124. new_config[key] = obfuscated_token(decrypt_tracing_config[key])
  125. for key in other_keys:
  126. new_config[key] = decrypt_tracing_config.get(key, "")
  127. return config_class(**new_config).model_dump()
  128. @classmethod
  129. def get_decrypted_tracing_config(cls, app_id: str, tracing_provider: str):
  130. """
  131. Get decrypted tracing config
  132. :param app_id: app id
  133. :param tracing_provider: tracing provider
  134. :return:
  135. """
  136. trace_config_data: Optional[TraceAppConfig] = (
  137. db.session.query(TraceAppConfig)
  138. .filter(TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider)
  139. .first()
  140. )
  141. if not trace_config_data:
  142. return None
  143. # decrypt_token
  144. app = db.session.query(App).filter(App.id == app_id).first()
  145. if not app:
  146. raise ValueError("App not found")
  147. tenant_id = app.tenant_id
  148. decrypt_tracing_config = cls.decrypt_tracing_config(
  149. tenant_id, tracing_provider, trace_config_data.tracing_config
  150. )
  151. return decrypt_tracing_config
  152. @classmethod
  153. def get_ops_trace_instance(
  154. cls,
  155. app_id: Optional[Union[UUID, str]] = None,
  156. ):
  157. """
  158. Get ops trace through model config
  159. :param app_id: app_id
  160. :return:
  161. """
  162. if isinstance(app_id, UUID):
  163. app_id = str(app_id)
  164. if app_id is None:
  165. return None
  166. app: Optional[App] = db.session.query(App).filter(App.id == app_id).first()
  167. if app is None:
  168. return None
  169. app_ops_trace_config = json.loads(app.tracing) if app.tracing else None
  170. if app_ops_trace_config is None:
  171. return None
  172. tracing_provider = app_ops_trace_config.get("tracing_provider")
  173. if tracing_provider is None or tracing_provider not in provider_config_map:
  174. return None
  175. # decrypt_token
  176. decrypt_trace_config = cls.get_decrypted_tracing_config(app_id, tracing_provider)
  177. if app_ops_trace_config.get("enabled"):
  178. trace_instance, config_class = (
  179. provider_config_map[tracing_provider]["trace_instance"],
  180. provider_config_map[tracing_provider]["config_class"],
  181. )
  182. tracing_instance = trace_instance(config_class(**decrypt_trace_config))
  183. return tracing_instance
  184. return None
  185. @classmethod
  186. def get_app_config_through_message_id(cls, message_id: str):
  187. app_model_config = None
  188. message_data = db.session.query(Message).filter(Message.id == message_id).first()
  189. if not message_data:
  190. return None
  191. conversation_id = message_data.conversation_id
  192. conversation_data = db.session.query(Conversation).filter(Conversation.id == conversation_id).first()
  193. if not conversation_data:
  194. return None
  195. if conversation_data.app_model_config_id:
  196. app_model_config = (
  197. db.session.query(AppModelConfig)
  198. .filter(AppModelConfig.id == conversation_data.app_model_config_id)
  199. .first()
  200. )
  201. elif conversation_data.app_model_config_id is None and conversation_data.override_model_configs:
  202. app_model_config = conversation_data.override_model_configs
  203. return app_model_config
  204. @classmethod
  205. def update_app_tracing_config(cls, app_id: str, enabled: bool, tracing_provider: str):
  206. """
  207. Update app tracing config
  208. :param app_id: app id
  209. :param enabled: enabled
  210. :param tracing_provider: tracing provider
  211. :return:
  212. """
  213. # auth check
  214. if tracing_provider not in provider_config_map and tracing_provider is not None:
  215. raise ValueError(f"Invalid tracing provider: {tracing_provider}")
  216. app_config: Optional[App] = db.session.query(App).filter(App.id == app_id).first()
  217. if not app_config:
  218. raise ValueError("App not found")
  219. app_config.tracing = json.dumps(
  220. {
  221. "enabled": enabled,
  222. "tracing_provider": tracing_provider,
  223. }
  224. )
  225. db.session.commit()
  226. @classmethod
  227. def get_app_tracing_config(cls, app_id: str):
  228. """
  229. Get app tracing config
  230. :param app_id: app id
  231. :return:
  232. """
  233. app: Optional[App] = db.session.query(App).filter(App.id == app_id).first()
  234. if not app:
  235. raise ValueError("App not found")
  236. if not app.tracing:
  237. return {"enabled": False, "tracing_provider": None}
  238. app_trace_config = json.loads(app.tracing)
  239. return app_trace_config
  240. @staticmethod
  241. def check_trace_config_is_effective(tracing_config: dict, tracing_provider: str):
  242. """
  243. Check trace config is effective
  244. :param tracing_config: tracing config
  245. :param tracing_provider: tracing provider
  246. :return:
  247. """
  248. config_type, trace_instance = (
  249. provider_config_map[tracing_provider]["config_class"],
  250. provider_config_map[tracing_provider]["trace_instance"],
  251. )
  252. tracing_config = config_type(**tracing_config)
  253. return trace_instance(tracing_config).api_check()
  254. @staticmethod
  255. def get_trace_config_project_key(tracing_config: dict, tracing_provider: str):
  256. """
  257. get trace config is project key
  258. :param tracing_config: tracing config
  259. :param tracing_provider: tracing provider
  260. :return:
  261. """
  262. config_type, trace_instance = (
  263. provider_config_map[tracing_provider]["config_class"],
  264. provider_config_map[tracing_provider]["trace_instance"],
  265. )
  266. tracing_config = config_type(**tracing_config)
  267. return trace_instance(tracing_config).get_project_key()
  268. @staticmethod
  269. def get_trace_config_project_url(tracing_config: dict, tracing_provider: str):
  270. """
  271. get trace config is project key
  272. :param tracing_config: tracing config
  273. :param tracing_provider: tracing provider
  274. :return:
  275. """
  276. config_type, trace_instance = (
  277. provider_config_map[tracing_provider]["config_class"],
  278. provider_config_map[tracing_provider]["trace_instance"],
  279. )
  280. tracing_config = config_type(**tracing_config)
  281. return trace_instance(tracing_config).get_project_url()
  282. class TraceTask:
  283. def __init__(
  284. self,
  285. trace_type: Any,
  286. message_id: Optional[str] = None,
  287. workflow_run: Optional[WorkflowRun] = None,
  288. conversation_id: Optional[str] = None,
  289. user_id: Optional[str] = None,
  290. timer: Optional[Any] = None,
  291. **kwargs,
  292. ):
  293. self.trace_type = trace_type
  294. self.message_id = message_id
  295. self.workflow_run_id = workflow_run.id if workflow_run else None
  296. self.conversation_id = conversation_id
  297. self.user_id = user_id
  298. self.timer = timer
  299. self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
  300. self.app_id = None
  301. self.kwargs = kwargs
  302. def execute(self):
  303. return self.preprocess()
  304. def preprocess(self):
  305. preprocess_map = {
  306. TraceTaskName.CONVERSATION_TRACE: lambda: self.conversation_trace(**self.kwargs),
  307. TraceTaskName.WORKFLOW_TRACE: lambda: self.workflow_trace(
  308. workflow_run_id=self.workflow_run_id, conversation_id=self.conversation_id, user_id=self.user_id
  309. ),
  310. TraceTaskName.MESSAGE_TRACE: lambda: self.message_trace(message_id=self.message_id),
  311. TraceTaskName.MODERATION_TRACE: lambda: self.moderation_trace(
  312. message_id=self.message_id, timer=self.timer, **self.kwargs
  313. ),
  314. TraceTaskName.SUGGESTED_QUESTION_TRACE: lambda: self.suggested_question_trace(
  315. message_id=self.message_id, timer=self.timer, **self.kwargs
  316. ),
  317. TraceTaskName.DATASET_RETRIEVAL_TRACE: lambda: self.dataset_retrieval_trace(
  318. message_id=self.message_id, timer=self.timer, **self.kwargs
  319. ),
  320. TraceTaskName.TOOL_TRACE: lambda: self.tool_trace(
  321. message_id=self.message_id, timer=self.timer, **self.kwargs
  322. ),
  323. TraceTaskName.GENERATE_NAME_TRACE: lambda: self.generate_name_trace(
  324. conversation_id=self.conversation_id, timer=self.timer, **self.kwargs
  325. ),
  326. }
  327. return preprocess_map.get(self.trace_type, lambda: None)()
  328. # process methods for different trace types
  329. def conversation_trace(self, **kwargs):
  330. return kwargs
  331. def workflow_trace(
  332. self,
  333. *,
  334. workflow_run_id: str | None,
  335. conversation_id: str | None,
  336. user_id: str | None,
  337. ):
  338. if not workflow_run_id:
  339. return {}
  340. with Session(db.engine) as session:
  341. workflow_run_stmt = select(WorkflowRun).where(WorkflowRun.id == workflow_run_id)
  342. workflow_run = session.scalars(workflow_run_stmt).first()
  343. if not workflow_run:
  344. raise ValueError("Workflow run not found")
  345. workflow_id = workflow_run.workflow_id
  346. tenant_id = workflow_run.tenant_id
  347. workflow_run_id = workflow_run.id
  348. workflow_run_elapsed_time = workflow_run.elapsed_time
  349. workflow_run_status = workflow_run.status
  350. workflow_run_inputs = workflow_run.inputs_dict
  351. workflow_run_outputs = workflow_run.outputs_dict
  352. workflow_run_version = workflow_run.version
  353. error = workflow_run.error or ""
  354. total_tokens = workflow_run.total_tokens
  355. file_list = workflow_run_inputs.get("sys.file") or []
  356. query = workflow_run_inputs.get("query") or workflow_run_inputs.get("sys.query") or ""
  357. # get workflow_app_log_id
  358. workflow_app_log_data_stmt = select(WorkflowAppLog.id).where(
  359. WorkflowAppLog.tenant_id == tenant_id,
  360. WorkflowAppLog.app_id == workflow_run.app_id,
  361. WorkflowAppLog.workflow_run_id == workflow_run.id,
  362. )
  363. workflow_app_log_id = session.scalar(workflow_app_log_data_stmt)
  364. # get message_id
  365. message_id = None
  366. if conversation_id:
  367. message_data_stmt = select(Message.id).where(
  368. Message.conversation_id == conversation_id,
  369. Message.workflow_run_id == workflow_run_id,
  370. )
  371. message_id = session.scalar(message_data_stmt)
  372. metadata = {
  373. "workflow_id": workflow_id,
  374. "conversation_id": conversation_id,
  375. "workflow_run_id": workflow_run_id,
  376. "tenant_id": tenant_id,
  377. "elapsed_time": workflow_run_elapsed_time,
  378. "status": workflow_run_status,
  379. "version": workflow_run_version,
  380. "total_tokens": total_tokens,
  381. "file_list": file_list,
  382. "triggered_form": workflow_run.triggered_from,
  383. "user_id": user_id,
  384. }
  385. workflow_trace_info = WorkflowTraceInfo(
  386. workflow_data=workflow_run.to_dict(),
  387. conversation_id=conversation_id,
  388. workflow_id=workflow_id,
  389. tenant_id=tenant_id,
  390. workflow_run_id=workflow_run_id,
  391. workflow_run_elapsed_time=workflow_run_elapsed_time,
  392. workflow_run_status=workflow_run_status,
  393. workflow_run_inputs=workflow_run_inputs,
  394. workflow_run_outputs=workflow_run_outputs,
  395. workflow_run_version=workflow_run_version,
  396. error=error,
  397. total_tokens=total_tokens,
  398. file_list=file_list,
  399. query=query,
  400. metadata=metadata,
  401. workflow_app_log_id=workflow_app_log_id,
  402. message_id=message_id,
  403. start_time=workflow_run.created_at,
  404. end_time=workflow_run.finished_at,
  405. )
  406. return workflow_trace_info
  407. def message_trace(self, message_id: str | None):
  408. if not message_id:
  409. return {}
  410. message_data = get_message_data(message_id)
  411. if not message_data:
  412. return {}
  413. conversation_mode_stmt = select(Conversation.mode).where(Conversation.id == message_data.conversation_id)
  414. conversation_mode = db.session.scalars(conversation_mode_stmt).all()
  415. if not conversation_mode or len(conversation_mode) == 0:
  416. return {}
  417. conversation_mode = conversation_mode[0]
  418. created_at = message_data.created_at
  419. inputs = message_data.message
  420. # get message file data
  421. message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
  422. file_list = []
  423. if message_file_data and message_file_data.url is not None:
  424. file_url = f"{self.file_base_url}/{message_file_data.url}" if message_file_data else ""
  425. file_list.append(file_url)
  426. metadata = {
  427. "conversation_id": message_data.conversation_id,
  428. "ls_provider": message_data.model_provider,
  429. "ls_model_name": message_data.model_id,
  430. "status": message_data.status,
  431. "from_end_user_id": message_data.from_end_user_id,
  432. "from_account_id": message_data.from_account_id,
  433. "agent_based": message_data.agent_based,
  434. "workflow_run_id": message_data.workflow_run_id,
  435. "from_source": message_data.from_source,
  436. "message_id": message_id,
  437. }
  438. message_tokens = message_data.message_tokens
  439. message_trace_info = MessageTraceInfo(
  440. message_id=message_id,
  441. message_data=message_data.to_dict(),
  442. conversation_model=conversation_mode,
  443. message_tokens=message_tokens,
  444. answer_tokens=message_data.answer_tokens,
  445. total_tokens=message_tokens + message_data.answer_tokens,
  446. error=message_data.error or "",
  447. inputs=inputs,
  448. outputs=message_data.answer,
  449. file_list=file_list,
  450. start_time=created_at,
  451. end_time=created_at + timedelta(seconds=message_data.provider_response_latency),
  452. metadata=metadata,
  453. message_file_data=message_file_data,
  454. conversation_mode=conversation_mode,
  455. )
  456. return message_trace_info
  457. def moderation_trace(self, message_id, timer, **kwargs):
  458. moderation_result = kwargs.get("moderation_result")
  459. if not moderation_result:
  460. return {}
  461. inputs = kwargs.get("inputs")
  462. message_data = get_message_data(message_id)
  463. if not message_data:
  464. return {}
  465. metadata = {
  466. "message_id": message_id,
  467. "action": moderation_result.action,
  468. "preset_response": moderation_result.preset_response,
  469. "query": moderation_result.query,
  470. }
  471. # get workflow_app_log_id
  472. workflow_app_log_id = None
  473. if message_data.workflow_run_id:
  474. workflow_app_log_data = (
  475. db.session.query(WorkflowAppLog).filter_by(workflow_run_id=message_data.workflow_run_id).first()
  476. )
  477. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  478. moderation_trace_info = ModerationTraceInfo(
  479. message_id=workflow_app_log_id or message_id,
  480. inputs=inputs,
  481. message_data=message_data.to_dict(),
  482. flagged=moderation_result.flagged,
  483. action=moderation_result.action,
  484. preset_response=moderation_result.preset_response,
  485. query=moderation_result.query,
  486. start_time=timer.get("start"),
  487. end_time=timer.get("end"),
  488. metadata=metadata,
  489. )
  490. return moderation_trace_info
  491. def suggested_question_trace(self, message_id, timer, **kwargs):
  492. suggested_question = kwargs.get("suggested_question", [])
  493. message_data = get_message_data(message_id)
  494. if not message_data:
  495. return {}
  496. metadata = {
  497. "message_id": message_id,
  498. "ls_provider": message_data.model_provider,
  499. "ls_model_name": message_data.model_id,
  500. "status": message_data.status,
  501. "from_end_user_id": message_data.from_end_user_id,
  502. "from_account_id": message_data.from_account_id,
  503. "agent_based": message_data.agent_based,
  504. "workflow_run_id": message_data.workflow_run_id,
  505. "from_source": message_data.from_source,
  506. }
  507. # get workflow_app_log_id
  508. workflow_app_log_id = None
  509. if message_data.workflow_run_id:
  510. workflow_app_log_data = (
  511. db.session.query(WorkflowAppLog).filter_by(workflow_run_id=message_data.workflow_run_id).first()
  512. )
  513. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  514. suggested_question_trace_info = SuggestedQuestionTraceInfo(
  515. message_id=workflow_app_log_id or message_id,
  516. message_data=message_data.to_dict(),
  517. inputs=message_data.message,
  518. outputs=message_data.answer,
  519. start_time=timer.get("start"),
  520. end_time=timer.get("end"),
  521. metadata=metadata,
  522. total_tokens=message_data.message_tokens + message_data.answer_tokens,
  523. status=message_data.status,
  524. error=message_data.error,
  525. from_account_id=message_data.from_account_id,
  526. agent_based=message_data.agent_based,
  527. from_source=message_data.from_source,
  528. model_provider=message_data.model_provider,
  529. model_id=message_data.model_id,
  530. suggested_question=suggested_question,
  531. level=message_data.status,
  532. status_message=message_data.error,
  533. )
  534. return suggested_question_trace_info
  535. def dataset_retrieval_trace(self, message_id, timer, **kwargs):
  536. documents = kwargs.get("documents")
  537. message_data = get_message_data(message_id)
  538. if not message_data:
  539. return {}
  540. metadata = {
  541. "message_id": message_id,
  542. "ls_provider": message_data.model_provider,
  543. "ls_model_name": message_data.model_id,
  544. "status": message_data.status,
  545. "from_end_user_id": message_data.from_end_user_id,
  546. "from_account_id": message_data.from_account_id,
  547. "agent_based": message_data.agent_based,
  548. "workflow_run_id": message_data.workflow_run_id,
  549. "from_source": message_data.from_source,
  550. }
  551. dataset_retrieval_trace_info = DatasetRetrievalTraceInfo(
  552. message_id=message_id,
  553. inputs=message_data.query or message_data.inputs,
  554. documents=[doc.model_dump() for doc in documents] if documents else [],
  555. start_time=timer.get("start"),
  556. end_time=timer.get("end"),
  557. metadata=metadata,
  558. message_data=message_data.to_dict(),
  559. )
  560. return dataset_retrieval_trace_info
  561. def tool_trace(self, message_id, timer, **kwargs):
  562. tool_name = kwargs.get("tool_name", "")
  563. tool_inputs = kwargs.get("tool_inputs", {})
  564. tool_outputs = kwargs.get("tool_outputs", {})
  565. message_data = get_message_data(message_id)
  566. if not message_data:
  567. return {}
  568. tool_config = {}
  569. time_cost = 0
  570. error = None
  571. tool_parameters = {}
  572. created_time = message_data.created_at
  573. end_time = message_data.updated_at
  574. agent_thoughts = message_data.agent_thoughts
  575. for agent_thought in agent_thoughts:
  576. if tool_name in agent_thought.tools:
  577. created_time = agent_thought.created_at
  578. tool_meta_data = agent_thought.tool_meta.get(tool_name, {})
  579. tool_config = tool_meta_data.get("tool_config", {})
  580. time_cost = tool_meta_data.get("time_cost", 0)
  581. end_time = created_time + timedelta(seconds=time_cost)
  582. error = tool_meta_data.get("error", "")
  583. tool_parameters = tool_meta_data.get("tool_parameters", {})
  584. metadata = {
  585. "message_id": message_id,
  586. "tool_name": tool_name,
  587. "tool_inputs": tool_inputs,
  588. "tool_outputs": tool_outputs,
  589. "tool_config": tool_config,
  590. "time_cost": time_cost,
  591. "error": error,
  592. "tool_parameters": tool_parameters,
  593. }
  594. file_url = ""
  595. message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
  596. if message_file_data:
  597. message_file_id = message_file_data.id if message_file_data else None
  598. type = message_file_data.type
  599. created_by_role = message_file_data.created_by_role
  600. created_user_id = message_file_data.created_by
  601. file_url = f"{self.file_base_url}/{message_file_data.url}"
  602. metadata.update(
  603. {
  604. "message_file_id": message_file_id,
  605. "created_by_role": created_by_role,
  606. "created_user_id": created_user_id,
  607. "type": type,
  608. }
  609. )
  610. tool_trace_info = ToolTraceInfo(
  611. message_id=message_id,
  612. message_data=message_data.to_dict(),
  613. tool_name=tool_name,
  614. start_time=timer.get("start") if timer else created_time,
  615. end_time=timer.get("end") if timer else end_time,
  616. tool_inputs=tool_inputs,
  617. tool_outputs=tool_outputs,
  618. metadata=metadata,
  619. message_file_data=message_file_data,
  620. error=error,
  621. inputs=message_data.message,
  622. outputs=message_data.answer,
  623. tool_config=tool_config,
  624. time_cost=time_cost,
  625. tool_parameters=tool_parameters,
  626. file_url=file_url,
  627. )
  628. return tool_trace_info
  629. def generate_name_trace(self, conversation_id, timer, **kwargs):
  630. generate_conversation_name = kwargs.get("generate_conversation_name")
  631. inputs = kwargs.get("inputs")
  632. tenant_id = kwargs.get("tenant_id")
  633. if not tenant_id:
  634. return {}
  635. start_time = timer.get("start")
  636. end_time = timer.get("end")
  637. metadata = {
  638. "conversation_id": conversation_id,
  639. "tenant_id": tenant_id,
  640. }
  641. generate_name_trace_info = GenerateNameTraceInfo(
  642. conversation_id=conversation_id,
  643. inputs=inputs,
  644. outputs=generate_conversation_name,
  645. start_time=start_time,
  646. end_time=end_time,
  647. metadata=metadata,
  648. tenant_id=tenant_id,
  649. )
  650. return generate_name_trace_info
  651. trace_manager_timer: Optional[threading.Timer] = None
  652. trace_manager_queue: queue.Queue = queue.Queue()
  653. trace_manager_interval = int(os.getenv("TRACE_QUEUE_MANAGER_INTERVAL", 5))
  654. trace_manager_batch_size = int(os.getenv("TRACE_QUEUE_MANAGER_BATCH_SIZE", 100))
  655. class TraceQueueManager:
  656. def __init__(self, app_id=None, user_id=None):
  657. global trace_manager_timer
  658. self.app_id = app_id
  659. self.user_id = user_id
  660. self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
  661. self.flask_app = current_app._get_current_object() # type: ignore
  662. if trace_manager_timer is None:
  663. self.start_timer()
  664. def add_trace_task(self, trace_task: TraceTask):
  665. global trace_manager_timer, trace_manager_queue
  666. try:
  667. if self.trace_instance:
  668. trace_task.app_id = self.app_id
  669. trace_manager_queue.put(trace_task)
  670. except Exception as e:
  671. logging.exception(f"Error adding trace task, trace_type {trace_task.trace_type}")
  672. finally:
  673. self.start_timer()
  674. def collect_tasks(self):
  675. global trace_manager_queue
  676. tasks: list[TraceTask] = []
  677. while len(tasks) < trace_manager_batch_size and not trace_manager_queue.empty():
  678. task = trace_manager_queue.get_nowait()
  679. tasks.append(task)
  680. trace_manager_queue.task_done()
  681. return tasks
  682. def run(self):
  683. try:
  684. tasks = self.collect_tasks()
  685. if tasks:
  686. self.send_to_celery(tasks)
  687. except Exception as e:
  688. logging.exception("Error processing trace tasks")
  689. def start_timer(self):
  690. global trace_manager_timer
  691. if trace_manager_timer is None or not trace_manager_timer.is_alive():
  692. trace_manager_timer = threading.Timer(trace_manager_interval, self.run)
  693. trace_manager_timer.name = f"trace_manager_timer_{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}"
  694. trace_manager_timer.daemon = False
  695. trace_manager_timer.start()
  696. def send_to_celery(self, tasks: list[TraceTask]):
  697. with self.flask_app.app_context():
  698. for task in tasks:
  699. if task.app_id is None:
  700. continue
  701. file_id = uuid4().hex
  702. trace_info = task.execute()
  703. task_data = TaskData(
  704. app_id=task.app_id,
  705. trace_info_type=type(trace_info).__name__,
  706. trace_info=trace_info.model_dump() if trace_info else None,
  707. )
  708. file_path = f"{OPS_FILE_PATH}{task.app_id}/{file_id}.json"
  709. storage.save(file_path, task_data.model_dump_json().encode("utf-8"))
  710. file_info = {
  711. "file_id": file_id,
  712. "app_id": task.app_id,
  713. }
  714. process_trace_tasks.delay(file_info)