ops_trace_manager.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784
  1. import json
  2. import logging
  3. import os
  4. import queue
  5. import threading
  6. import time
  7. from datetime import timedelta
  8. from typing import Any, Optional, Union
  9. from uuid import UUID, uuid4
  10. from flask import current_app
  11. from core.helper.encrypter import decrypt_token, encrypt_token, obfuscated_token
  12. from core.ops.entities.config_entity import (
  13. OPS_FILE_PATH,
  14. LangfuseConfig,
  15. LangSmithConfig,
  16. TracingProviderEnum,
  17. )
  18. from core.ops.entities.trace_entity import (
  19. DatasetRetrievalTraceInfo,
  20. GenerateNameTraceInfo,
  21. MessageTraceInfo,
  22. ModerationTraceInfo,
  23. SuggestedQuestionTraceInfo,
  24. TaskData,
  25. ToolTraceInfo,
  26. TraceTaskName,
  27. WorkflowTraceInfo,
  28. )
  29. from core.ops.langfuse_trace.langfuse_trace import LangFuseDataTrace
  30. from core.ops.langsmith_trace.langsmith_trace import LangSmithDataTrace
  31. from core.ops.utils import get_message_data
  32. from extensions.ext_database import db
  33. from extensions.ext_storage import storage
  34. from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig
  35. from models.workflow import WorkflowAppLog, WorkflowRun
  36. from tasks.ops_trace_task import process_trace_tasks
  37. provider_config_map: dict[str, dict[str, Any]] = {
  38. TracingProviderEnum.LANGFUSE.value: {
  39. "config_class": LangfuseConfig,
  40. "secret_keys": ["public_key", "secret_key"],
  41. "other_keys": ["host", "project_key"],
  42. "trace_instance": LangFuseDataTrace,
  43. },
  44. TracingProviderEnum.LANGSMITH.value: {
  45. "config_class": LangSmithConfig,
  46. "secret_keys": ["api_key"],
  47. "other_keys": ["project", "endpoint"],
  48. "trace_instance": LangSmithDataTrace,
  49. },
  50. }
  51. class OpsTraceManager:
  52. @classmethod
  53. def encrypt_tracing_config(
  54. cls, tenant_id: str, tracing_provider: str, tracing_config: dict, current_trace_config=None
  55. ):
  56. """
  57. Encrypt tracing config.
  58. :param tenant_id: tenant id
  59. :param tracing_provider: tracing provider
  60. :param tracing_config: tracing config dictionary to be encrypted
  61. :param current_trace_config: current tracing configuration for keeping existing values
  62. :return: encrypted tracing configuration
  63. """
  64. # Get the configuration class and the keys that require encryption
  65. config_class, secret_keys, other_keys = (
  66. provider_config_map[tracing_provider]["config_class"],
  67. provider_config_map[tracing_provider]["secret_keys"],
  68. provider_config_map[tracing_provider]["other_keys"],
  69. )
  70. new_config = {}
  71. # Encrypt necessary keys
  72. for key in secret_keys:
  73. if key in tracing_config:
  74. if "*" in tracing_config[key]:
  75. # If the key contains '*', retain the original value from the current config
  76. new_config[key] = current_trace_config.get(key, tracing_config[key])
  77. else:
  78. # Otherwise, encrypt the key
  79. new_config[key] = encrypt_token(tenant_id, tracing_config[key])
  80. for key in other_keys:
  81. new_config[key] = tracing_config.get(key, "")
  82. # Create a new instance of the config class with the new configuration
  83. encrypted_config = config_class(**new_config)
  84. return encrypted_config.model_dump()
  85. @classmethod
  86. def decrypt_tracing_config(cls, tenant_id: str, tracing_provider: str, tracing_config: dict):
  87. """
  88. Decrypt tracing config
  89. :param tenant_id: tenant id
  90. :param tracing_provider: tracing provider
  91. :param tracing_config: tracing config
  92. :return:
  93. """
  94. config_class, secret_keys, other_keys = (
  95. provider_config_map[tracing_provider]["config_class"],
  96. provider_config_map[tracing_provider]["secret_keys"],
  97. provider_config_map[tracing_provider]["other_keys"],
  98. )
  99. new_config = {}
  100. for key in secret_keys:
  101. if key in tracing_config:
  102. new_config[key] = decrypt_token(tenant_id, tracing_config[key])
  103. for key in other_keys:
  104. new_config[key] = tracing_config.get(key, "")
  105. return config_class(**new_config).model_dump()
  106. @classmethod
  107. def obfuscated_decrypt_token(cls, tracing_provider: str, decrypt_tracing_config: dict):
  108. """
  109. Decrypt tracing config
  110. :param tracing_provider: tracing provider
  111. :param decrypt_tracing_config: tracing config
  112. :return:
  113. """
  114. config_class, secret_keys, other_keys = (
  115. provider_config_map[tracing_provider]["config_class"],
  116. provider_config_map[tracing_provider]["secret_keys"],
  117. provider_config_map[tracing_provider]["other_keys"],
  118. )
  119. new_config = {}
  120. for key in secret_keys:
  121. if key in decrypt_tracing_config:
  122. new_config[key] = obfuscated_token(decrypt_tracing_config[key])
  123. for key in other_keys:
  124. new_config[key] = decrypt_tracing_config.get(key, "")
  125. return config_class(**new_config).model_dump()
  126. @classmethod
  127. def get_decrypted_tracing_config(cls, app_id: str, tracing_provider: str):
  128. """
  129. Get decrypted tracing config
  130. :param app_id: app id
  131. :param tracing_provider: tracing provider
  132. :return:
  133. """
  134. trace_config_data: Optional[TraceAppConfig] = (
  135. db.session.query(TraceAppConfig)
  136. .filter(TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider)
  137. .first()
  138. )
  139. if not trace_config_data:
  140. return None
  141. # decrypt_token
  142. app = db.session.query(App).filter(App.id == app_id).first()
  143. if not app:
  144. raise ValueError("App not found")
  145. tenant_id = app.tenant_id
  146. decrypt_tracing_config = cls.decrypt_tracing_config(
  147. tenant_id, tracing_provider, trace_config_data.tracing_config
  148. )
  149. return decrypt_tracing_config
  150. @classmethod
  151. def get_ops_trace_instance(
  152. cls,
  153. app_id: Optional[Union[UUID, str]] = None,
  154. ):
  155. """
  156. Get ops trace through model config
  157. :param app_id: app_id
  158. :return:
  159. """
  160. if isinstance(app_id, UUID):
  161. app_id = str(app_id)
  162. if app_id is None:
  163. return None
  164. app: Optional[App] = db.session.query(App).filter(App.id == app_id).first()
  165. if app is None:
  166. return None
  167. app_ops_trace_config = json.loads(app.tracing) if app.tracing else None
  168. if app_ops_trace_config is None:
  169. return None
  170. tracing_provider = app_ops_trace_config.get("tracing_provider")
  171. if tracing_provider is None or tracing_provider not in provider_config_map:
  172. return None
  173. # decrypt_token
  174. decrypt_trace_config = cls.get_decrypted_tracing_config(app_id, tracing_provider)
  175. if app_ops_trace_config.get("enabled"):
  176. trace_instance, config_class = (
  177. provider_config_map[tracing_provider]["trace_instance"],
  178. provider_config_map[tracing_provider]["config_class"],
  179. )
  180. tracing_instance = trace_instance(config_class(**decrypt_trace_config))
  181. return tracing_instance
  182. return None
  183. @classmethod
  184. def get_app_config_through_message_id(cls, message_id: str):
  185. app_model_config = None
  186. message_data = db.session.query(Message).filter(Message.id == message_id).first()
  187. if not message_data:
  188. return None
  189. conversation_id = message_data.conversation_id
  190. conversation_data = db.session.query(Conversation).filter(Conversation.id == conversation_id).first()
  191. if not conversation_data:
  192. return None
  193. if conversation_data.app_model_config_id:
  194. app_model_config = (
  195. db.session.query(AppModelConfig)
  196. .filter(AppModelConfig.id == conversation_data.app_model_config_id)
  197. .first()
  198. )
  199. elif conversation_data.app_model_config_id is None and conversation_data.override_model_configs:
  200. app_model_config = conversation_data.override_model_configs
  201. return app_model_config
  202. @classmethod
  203. def update_app_tracing_config(cls, app_id: str, enabled: bool, tracing_provider: str):
  204. """
  205. Update app tracing config
  206. :param app_id: app id
  207. :param enabled: enabled
  208. :param tracing_provider: tracing provider
  209. :return:
  210. """
  211. # auth check
  212. if tracing_provider not in provider_config_map and tracing_provider is not None:
  213. raise ValueError(f"Invalid tracing provider: {tracing_provider}")
  214. app_config: Optional[App] = db.session.query(App).filter(App.id == app_id).first()
  215. if not app_config:
  216. raise ValueError("App not found")
  217. app_config.tracing = json.dumps(
  218. {
  219. "enabled": enabled,
  220. "tracing_provider": tracing_provider,
  221. }
  222. )
  223. db.session.commit()
  224. @classmethod
  225. def get_app_tracing_config(cls, app_id: str):
  226. """
  227. Get app tracing config
  228. :param app_id: app id
  229. :return:
  230. """
  231. app: Optional[App] = db.session.query(App).filter(App.id == app_id).first()
  232. if not app:
  233. raise ValueError("App not found")
  234. if not app.tracing:
  235. return {"enabled": False, "tracing_provider": None}
  236. app_trace_config = json.loads(app.tracing)
  237. return app_trace_config
  238. @staticmethod
  239. def check_trace_config_is_effective(tracing_config: dict, tracing_provider: str):
  240. """
  241. Check trace config is effective
  242. :param tracing_config: tracing config
  243. :param tracing_provider: tracing provider
  244. :return:
  245. """
  246. config_type, trace_instance = (
  247. provider_config_map[tracing_provider]["config_class"],
  248. provider_config_map[tracing_provider]["trace_instance"],
  249. )
  250. tracing_config = config_type(**tracing_config)
  251. return trace_instance(tracing_config).api_check()
  252. @staticmethod
  253. def get_trace_config_project_key(tracing_config: dict, tracing_provider: str):
  254. """
  255. get trace config is project key
  256. :param tracing_config: tracing config
  257. :param tracing_provider: tracing provider
  258. :return:
  259. """
  260. config_type, trace_instance = (
  261. provider_config_map[tracing_provider]["config_class"],
  262. provider_config_map[tracing_provider]["trace_instance"],
  263. )
  264. tracing_config = config_type(**tracing_config)
  265. return trace_instance(tracing_config).get_project_key()
  266. @staticmethod
  267. def get_trace_config_project_url(tracing_config: dict, tracing_provider: str):
  268. """
  269. get trace config is project key
  270. :param tracing_config: tracing config
  271. :param tracing_provider: tracing provider
  272. :return:
  273. """
  274. config_type, trace_instance = (
  275. provider_config_map[tracing_provider]["config_class"],
  276. provider_config_map[tracing_provider]["trace_instance"],
  277. )
  278. tracing_config = config_type(**tracing_config)
  279. return trace_instance(tracing_config).get_project_url()
  280. class TraceTask:
  281. def __init__(
  282. self,
  283. trace_type: Any,
  284. message_id: Optional[str] = None,
  285. workflow_run: Optional[WorkflowRun] = None,
  286. conversation_id: Optional[str] = None,
  287. user_id: Optional[str] = None,
  288. timer: Optional[Any] = None,
  289. **kwargs,
  290. ):
  291. self.trace_type = trace_type
  292. self.message_id = message_id
  293. self.workflow_run = workflow_run
  294. self.conversation_id = conversation_id
  295. self.user_id = user_id
  296. self.timer = timer
  297. self.kwargs = kwargs
  298. self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
  299. self.app_id = None
  300. def execute(self):
  301. return self.preprocess()
  302. def preprocess(self):
  303. preprocess_map = {
  304. TraceTaskName.CONVERSATION_TRACE: lambda: self.conversation_trace(**self.kwargs),
  305. TraceTaskName.WORKFLOW_TRACE: lambda: self.workflow_trace(
  306. self.workflow_run, self.conversation_id, self.user_id
  307. ),
  308. TraceTaskName.MESSAGE_TRACE: lambda: self.message_trace(self.message_id),
  309. TraceTaskName.MODERATION_TRACE: lambda: self.moderation_trace(self.message_id, self.timer, **self.kwargs),
  310. TraceTaskName.SUGGESTED_QUESTION_TRACE: lambda: self.suggested_question_trace(
  311. self.message_id, self.timer, **self.kwargs
  312. ),
  313. TraceTaskName.DATASET_RETRIEVAL_TRACE: lambda: self.dataset_retrieval_trace(
  314. self.message_id, self.timer, **self.kwargs
  315. ),
  316. TraceTaskName.TOOL_TRACE: lambda: self.tool_trace(self.message_id, self.timer, **self.kwargs),
  317. TraceTaskName.GENERATE_NAME_TRACE: lambda: self.generate_name_trace(
  318. self.conversation_id, self.timer, **self.kwargs
  319. ),
  320. }
  321. return preprocess_map.get(self.trace_type, lambda: None)()
  322. # process methods for different trace types
  323. def conversation_trace(self, **kwargs):
  324. return kwargs
  325. def workflow_trace(self, workflow_run: WorkflowRun | None, conversation_id, user_id):
  326. if not workflow_run:
  327. raise ValueError("Workflow run not found")
  328. db.session.merge(workflow_run)
  329. db.session.refresh(workflow_run)
  330. workflow_id = workflow_run.workflow_id
  331. tenant_id = workflow_run.tenant_id
  332. workflow_run_id = workflow_run.id
  333. workflow_run_elapsed_time = workflow_run.elapsed_time
  334. workflow_run_status = workflow_run.status
  335. workflow_run_inputs = workflow_run.inputs_dict
  336. workflow_run_outputs = workflow_run.outputs_dict
  337. workflow_run_version = workflow_run.version
  338. error = workflow_run.error or ""
  339. total_tokens = workflow_run.total_tokens
  340. file_list = workflow_run_inputs.get("sys.file") or []
  341. query = workflow_run_inputs.get("query") or workflow_run_inputs.get("sys.query") or ""
  342. # get workflow_app_log_id
  343. workflow_app_log_data = (
  344. db.session.query(WorkflowAppLog)
  345. .filter_by(tenant_id=tenant_id, app_id=workflow_run.app_id, workflow_run_id=workflow_run.id)
  346. .first()
  347. )
  348. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  349. # get message_id
  350. message_data = (
  351. db.session.query(Message.id)
  352. .filter_by(conversation_id=conversation_id, workflow_run_id=workflow_run_id)
  353. .first()
  354. )
  355. message_id = str(message_data.id) if message_data else None
  356. metadata = {
  357. "workflow_id": workflow_id,
  358. "conversation_id": conversation_id,
  359. "workflow_run_id": workflow_run_id,
  360. "tenant_id": tenant_id,
  361. "elapsed_time": workflow_run_elapsed_time,
  362. "status": workflow_run_status,
  363. "version": workflow_run_version,
  364. "total_tokens": total_tokens,
  365. "file_list": file_list,
  366. "triggered_form": workflow_run.triggered_from,
  367. "user_id": user_id,
  368. }
  369. workflow_trace_info = WorkflowTraceInfo(
  370. workflow_data=workflow_run.to_dict(),
  371. conversation_id=conversation_id,
  372. workflow_id=workflow_id,
  373. tenant_id=tenant_id,
  374. workflow_run_id=workflow_run_id,
  375. workflow_run_elapsed_time=workflow_run_elapsed_time,
  376. workflow_run_status=workflow_run_status,
  377. workflow_run_inputs=workflow_run_inputs,
  378. workflow_run_outputs=workflow_run_outputs,
  379. workflow_run_version=workflow_run_version,
  380. error=error,
  381. total_tokens=total_tokens,
  382. file_list=file_list,
  383. query=query,
  384. metadata=metadata,
  385. workflow_app_log_id=workflow_app_log_id,
  386. message_id=message_id,
  387. start_time=workflow_run.created_at,
  388. end_time=workflow_run.finished_at,
  389. )
  390. return workflow_trace_info
  391. def message_trace(self, message_id):
  392. message_data = get_message_data(message_id)
  393. if not message_data:
  394. return {}
  395. conversation_mode = db.session.query(Conversation.mode).filter_by(id=message_data.conversation_id).first()
  396. conversation_mode = conversation_mode[0]
  397. created_at = message_data.created_at
  398. inputs = message_data.message
  399. # get message file data
  400. message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
  401. file_list = []
  402. if message_file_data and message_file_data.url is not None:
  403. file_url = f"{self.file_base_url}/{message_file_data.url}" if message_file_data else ""
  404. file_list.append(file_url)
  405. metadata = {
  406. "conversation_id": message_data.conversation_id,
  407. "ls_provider": message_data.model_provider,
  408. "ls_model_name": message_data.model_id,
  409. "status": message_data.status,
  410. "from_end_user_id": message_data.from_end_user_id,
  411. "from_account_id": message_data.from_account_id,
  412. "agent_based": message_data.agent_based,
  413. "workflow_run_id": message_data.workflow_run_id,
  414. "from_source": message_data.from_source,
  415. "message_id": message_id,
  416. }
  417. message_tokens = message_data.message_tokens
  418. message_trace_info = MessageTraceInfo(
  419. message_id=message_id,
  420. message_data=message_data.to_dict(),
  421. conversation_model=conversation_mode,
  422. message_tokens=message_tokens,
  423. answer_tokens=message_data.answer_tokens,
  424. total_tokens=message_tokens + message_data.answer_tokens,
  425. error=message_data.error or "",
  426. inputs=inputs,
  427. outputs=message_data.answer,
  428. file_list=file_list,
  429. start_time=created_at,
  430. end_time=created_at + timedelta(seconds=message_data.provider_response_latency),
  431. metadata=metadata,
  432. message_file_data=message_file_data,
  433. conversation_mode=conversation_mode,
  434. )
  435. return message_trace_info
  436. def moderation_trace(self, message_id, timer, **kwargs):
  437. moderation_result = kwargs.get("moderation_result")
  438. if not moderation_result:
  439. return {}
  440. inputs = kwargs.get("inputs")
  441. message_data = get_message_data(message_id)
  442. if not message_data:
  443. return {}
  444. metadata = {
  445. "message_id": message_id,
  446. "action": moderation_result.action,
  447. "preset_response": moderation_result.preset_response,
  448. "query": moderation_result.query,
  449. }
  450. # get workflow_app_log_id
  451. workflow_app_log_id = None
  452. if message_data.workflow_run_id:
  453. workflow_app_log_data = (
  454. db.session.query(WorkflowAppLog).filter_by(workflow_run_id=message_data.workflow_run_id).first()
  455. )
  456. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  457. moderation_trace_info = ModerationTraceInfo(
  458. message_id=workflow_app_log_id or message_id,
  459. inputs=inputs,
  460. message_data=message_data.to_dict(),
  461. flagged=moderation_result.flagged,
  462. action=moderation_result.action,
  463. preset_response=moderation_result.preset_response,
  464. query=moderation_result.query,
  465. start_time=timer.get("start"),
  466. end_time=timer.get("end"),
  467. metadata=metadata,
  468. )
  469. return moderation_trace_info
  470. def suggested_question_trace(self, message_id, timer, **kwargs):
  471. suggested_question = kwargs.get("suggested_question", [])
  472. message_data = get_message_data(message_id)
  473. if not message_data:
  474. return {}
  475. metadata = {
  476. "message_id": message_id,
  477. "ls_provider": message_data.model_provider,
  478. "ls_model_name": message_data.model_id,
  479. "status": message_data.status,
  480. "from_end_user_id": message_data.from_end_user_id,
  481. "from_account_id": message_data.from_account_id,
  482. "agent_based": message_data.agent_based,
  483. "workflow_run_id": message_data.workflow_run_id,
  484. "from_source": message_data.from_source,
  485. }
  486. # get workflow_app_log_id
  487. workflow_app_log_id = None
  488. if message_data.workflow_run_id:
  489. workflow_app_log_data = (
  490. db.session.query(WorkflowAppLog).filter_by(workflow_run_id=message_data.workflow_run_id).first()
  491. )
  492. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  493. suggested_question_trace_info = SuggestedQuestionTraceInfo(
  494. message_id=workflow_app_log_id or message_id,
  495. message_data=message_data.to_dict(),
  496. inputs=message_data.message,
  497. outputs=message_data.answer,
  498. start_time=timer.get("start"),
  499. end_time=timer.get("end"),
  500. metadata=metadata,
  501. total_tokens=message_data.message_tokens + message_data.answer_tokens,
  502. status=message_data.status,
  503. error=message_data.error,
  504. from_account_id=message_data.from_account_id,
  505. agent_based=message_data.agent_based,
  506. from_source=message_data.from_source,
  507. model_provider=message_data.model_provider,
  508. model_id=message_data.model_id,
  509. suggested_question=suggested_question,
  510. level=message_data.status,
  511. status_message=message_data.error,
  512. )
  513. return suggested_question_trace_info
  514. def dataset_retrieval_trace(self, message_id, timer, **kwargs):
  515. documents = kwargs.get("documents")
  516. message_data = get_message_data(message_id)
  517. if not message_data:
  518. return {}
  519. metadata = {
  520. "message_id": message_id,
  521. "ls_provider": message_data.model_provider,
  522. "ls_model_name": message_data.model_id,
  523. "status": message_data.status,
  524. "from_end_user_id": message_data.from_end_user_id,
  525. "from_account_id": message_data.from_account_id,
  526. "agent_based": message_data.agent_based,
  527. "workflow_run_id": message_data.workflow_run_id,
  528. "from_source": message_data.from_source,
  529. }
  530. dataset_retrieval_trace_info = DatasetRetrievalTraceInfo(
  531. message_id=message_id,
  532. inputs=message_data.query or message_data.inputs,
  533. documents=[doc.model_dump() for doc in documents] if documents else [],
  534. start_time=timer.get("start"),
  535. end_time=timer.get("end"),
  536. metadata=metadata,
  537. message_data=message_data.to_dict(),
  538. )
  539. return dataset_retrieval_trace_info
  540. def tool_trace(self, message_id, timer, **kwargs):
  541. tool_name = kwargs.get("tool_name", "")
  542. tool_inputs = kwargs.get("tool_inputs", {})
  543. tool_outputs = kwargs.get("tool_outputs", {})
  544. message_data = get_message_data(message_id)
  545. if not message_data:
  546. return {}
  547. tool_config = {}
  548. time_cost = 0
  549. error = None
  550. tool_parameters = {}
  551. created_time = message_data.created_at
  552. end_time = message_data.updated_at
  553. agent_thoughts = message_data.agent_thoughts
  554. for agent_thought in agent_thoughts:
  555. if tool_name in agent_thought.tools:
  556. created_time = agent_thought.created_at
  557. tool_meta_data = agent_thought.tool_meta.get(tool_name, {})
  558. tool_config = tool_meta_data.get("tool_config", {})
  559. time_cost = tool_meta_data.get("time_cost", 0)
  560. end_time = created_time + timedelta(seconds=time_cost)
  561. error = tool_meta_data.get("error", "")
  562. tool_parameters = tool_meta_data.get("tool_parameters", {})
  563. metadata = {
  564. "message_id": message_id,
  565. "tool_name": tool_name,
  566. "tool_inputs": tool_inputs,
  567. "tool_outputs": tool_outputs,
  568. "tool_config": tool_config,
  569. "time_cost": time_cost,
  570. "error": error,
  571. "tool_parameters": tool_parameters,
  572. }
  573. file_url = ""
  574. message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
  575. if message_file_data:
  576. message_file_id = message_file_data.id if message_file_data else None
  577. type = message_file_data.type
  578. created_by_role = message_file_data.created_by_role
  579. created_user_id = message_file_data.created_by
  580. file_url = f"{self.file_base_url}/{message_file_data.url}"
  581. metadata.update(
  582. {
  583. "message_file_id": message_file_id,
  584. "created_by_role": created_by_role,
  585. "created_user_id": created_user_id,
  586. "type": type,
  587. }
  588. )
  589. tool_trace_info = ToolTraceInfo(
  590. message_id=message_id,
  591. message_data=message_data.to_dict(),
  592. tool_name=tool_name,
  593. start_time=timer.get("start") if timer else created_time,
  594. end_time=timer.get("end") if timer else end_time,
  595. tool_inputs=tool_inputs,
  596. tool_outputs=tool_outputs,
  597. metadata=metadata,
  598. message_file_data=message_file_data,
  599. error=error,
  600. inputs=message_data.message,
  601. outputs=message_data.answer,
  602. tool_config=tool_config,
  603. time_cost=time_cost,
  604. tool_parameters=tool_parameters,
  605. file_url=file_url,
  606. )
  607. return tool_trace_info
  608. def generate_name_trace(self, conversation_id, timer, **kwargs):
  609. generate_conversation_name = kwargs.get("generate_conversation_name")
  610. inputs = kwargs.get("inputs")
  611. tenant_id = kwargs.get("tenant_id")
  612. if not tenant_id:
  613. return {}
  614. start_time = timer.get("start")
  615. end_time = timer.get("end")
  616. metadata = {
  617. "conversation_id": conversation_id,
  618. "tenant_id": tenant_id,
  619. }
  620. generate_name_trace_info = GenerateNameTraceInfo(
  621. conversation_id=conversation_id,
  622. inputs=inputs,
  623. outputs=generate_conversation_name,
  624. start_time=start_time,
  625. end_time=end_time,
  626. metadata=metadata,
  627. tenant_id=tenant_id,
  628. )
  629. return generate_name_trace_info
  630. trace_manager_timer: Optional[threading.Timer] = None
  631. trace_manager_queue: queue.Queue = queue.Queue()
  632. trace_manager_interval = int(os.getenv("TRACE_QUEUE_MANAGER_INTERVAL", 5))
  633. trace_manager_batch_size = int(os.getenv("TRACE_QUEUE_MANAGER_BATCH_SIZE", 100))
  634. class TraceQueueManager:
  635. def __init__(self, app_id=None, user_id=None):
  636. global trace_manager_timer
  637. self.app_id = app_id
  638. self.user_id = user_id
  639. self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
  640. self.flask_app = current_app._get_current_object() # type: ignore
  641. if trace_manager_timer is None:
  642. self.start_timer()
  643. def add_trace_task(self, trace_task: TraceTask):
  644. global trace_manager_timer, trace_manager_queue
  645. try:
  646. if self.trace_instance:
  647. trace_task.app_id = self.app_id
  648. trace_manager_queue.put(trace_task)
  649. except Exception as e:
  650. logging.exception(f"Error adding trace task, trace_type {trace_task.trace_type}")
  651. finally:
  652. self.start_timer()
  653. def collect_tasks(self):
  654. global trace_manager_queue
  655. tasks: list[TraceTask] = []
  656. while len(tasks) < trace_manager_batch_size and not trace_manager_queue.empty():
  657. task = trace_manager_queue.get_nowait()
  658. tasks.append(task)
  659. trace_manager_queue.task_done()
  660. return tasks
  661. def run(self):
  662. try:
  663. tasks = self.collect_tasks()
  664. if tasks:
  665. self.send_to_celery(tasks)
  666. except Exception as e:
  667. logging.exception("Error processing trace tasks")
  668. def start_timer(self):
  669. global trace_manager_timer
  670. if trace_manager_timer is None or not trace_manager_timer.is_alive():
  671. trace_manager_timer = threading.Timer(trace_manager_interval, self.run)
  672. trace_manager_timer.name = f"trace_manager_timer_{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}"
  673. trace_manager_timer.daemon = False
  674. trace_manager_timer.start()
  675. def send_to_celery(self, tasks: list[TraceTask]):
  676. with self.flask_app.app_context():
  677. for task in tasks:
  678. if task.app_id is None:
  679. continue
  680. file_id = uuid4().hex
  681. trace_info = task.execute()
  682. task_data = TaskData(
  683. app_id=task.app_id,
  684. trace_info_type=type(trace_info).__name__,
  685. trace_info=trace_info.model_dump() if trace_info else None,
  686. )
  687. file_path = f"{OPS_FILE_PATH}{task.app_id}/{file_id}.json"
  688. storage.save(file_path, task_data.model_dump_json().encode("utf-8"))
  689. file_info = {
  690. "file_id": file_id,
  691. "app_id": task.app_id,
  692. }
  693. process_trace_tasks.delay(file_info)