opik_trace.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. import json
  2. import logging
  3. import os
  4. import uuid
  5. from datetime import datetime, timedelta
  6. from typing import Optional, cast
  7. from opik import Opik, Trace
  8. from opik.id_helpers import uuid4_to_uuid7
  9. from core.ops.base_trace_instance import BaseTraceInstance
  10. from core.ops.entities.config_entity import OpikConfig
  11. from core.ops.entities.trace_entity import (
  12. BaseTraceInfo,
  13. DatasetRetrievalTraceInfo,
  14. GenerateNameTraceInfo,
  15. MessageTraceInfo,
  16. ModerationTraceInfo,
  17. SuggestedQuestionTraceInfo,
  18. ToolTraceInfo,
  19. TraceTaskName,
  20. WorkflowTraceInfo,
  21. )
  22. from extensions.ext_database import db
  23. from models.model import EndUser, MessageFile
  24. from models.workflow import WorkflowNodeExecution
  25. logger = logging.getLogger(__name__)
  26. def wrap_dict(key_name, data):
  27. """Make sure that the input data is a dict"""
  28. if not isinstance(data, dict):
  29. return {key_name: data}
  30. return data
  31. def wrap_metadata(metadata, **kwargs):
  32. """Add common metatada to all Traces and Spans"""
  33. metadata["created_from"] = "dify"
  34. metadata.update(kwargs)
  35. return metadata
  36. def prepare_opik_uuid(user_datetime: Optional[datetime], user_uuid: Optional[str]):
  37. """Opik needs UUIDv7 while Dify uses UUIDv4 for identifier of most
  38. messages and objects. The type-hints of BaseTraceInfo indicates that
  39. objects start_time and message_id could be null which means we cannot map
  40. it to a UUIDv7. Given that we have no way to identify that object
  41. uniquely, generate a new random one UUIDv7 in that case.
  42. """
  43. if user_datetime is None:
  44. user_datetime = datetime.now()
  45. if user_uuid is None:
  46. user_uuid = str(uuid.uuid4())
  47. return uuid4_to_uuid7(user_datetime, user_uuid)
class OpikDataTrace(BaseTraceInstance):
    """Trace exporter that forwards Dify trace events to Comet Opik."""

    def __init__(
        self,
        opik_config: OpikConfig,
    ):
        """Create the Opik SDK client from the given configuration.

        :param opik_config: Opik connection settings (project, workspace,
            host URL, API key).
        """
        super().__init__(opik_config)
        # Client used for all trace/span uploads in this instance.
        self.opik_client = Opik(
            project_name=opik_config.project,
            workspace=opik_config.workspace,
            host=opik_config.url,
            api_key=opik_config.api_key,
        )
        self.project = opik_config.project
        # Base URL for links to message files; defaults to the local dev
        # server when FILES_URL is not set.
        self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
  62. def trace(self, trace_info: BaseTraceInfo):
  63. if isinstance(trace_info, WorkflowTraceInfo):
  64. self.workflow_trace(trace_info)
  65. if isinstance(trace_info, MessageTraceInfo):
  66. self.message_trace(trace_info)
  67. if isinstance(trace_info, ModerationTraceInfo):
  68. self.moderation_trace(trace_info)
  69. if isinstance(trace_info, SuggestedQuestionTraceInfo):
  70. self.suggested_question_trace(trace_info)
  71. if isinstance(trace_info, DatasetRetrievalTraceInfo):
  72. self.dataset_retrieval_trace(trace_info)
  73. if isinstance(trace_info, ToolTraceInfo):
  74. self.tool_trace(trace_info)
  75. if isinstance(trace_info, GenerateNameTraceInfo):
  76. self.generate_name_trace(trace_info)
    def workflow_trace(self, trace_info: WorkflowTraceInfo):
        """Export a workflow run and its node executions to Opik.

        Creates one trace for the run plus one span per workflow node
        execution. When the run is tied to a chat message, the message id
        becomes the trace identity and an extra "workflow" root span is
        added under that message trace.
        """
        dify_trace_id = trace_info.workflow_run_id
        # Opik requires UUIDv7 ids; derive them from Dify's v4 ids + timestamps.
        opik_trace_id = prepare_opik_uuid(trace_info.start_time, dify_trace_id)
        workflow_metadata = wrap_metadata(
            trace_info.metadata, message_id=trace_info.message_id, workflow_app_log_id=trace_info.workflow_app_log_id
        )
        root_span_id = None
        if trace_info.message_id:
            # Workflow triggered from a chat message: attach everything to a
            # message-level trace keyed by the message id.
            dify_trace_id = trace_info.message_id
            opik_trace_id = prepare_opik_uuid(trace_info.start_time, dify_trace_id)
            trace_data = {
                "id": opik_trace_id,
                "name": TraceTaskName.MESSAGE_TRACE.value,
                "start_time": trace_info.start_time,
                "end_time": trace_info.end_time,
                "metadata": workflow_metadata,
                "input": wrap_dict("input", trace_info.workflow_run_inputs),
                "output": wrap_dict("output", trace_info.workflow_run_outputs),
                "tags": ["message", "workflow"],
                "project_name": self.project,
            }
            self.add_trace(trace_data)
            # Root span representing the workflow run under the message trace.
            # NOTE(review): root_span_id is never read after this assignment.
            root_span_id = prepare_opik_uuid(trace_info.start_time, trace_info.workflow_run_id)
            span_data = {
                "id": root_span_id,
                "parent_span_id": None,
                "trace_id": opik_trace_id,
                "name": TraceTaskName.WORKFLOW_TRACE.value,
                "input": wrap_dict("input", trace_info.workflow_run_inputs),
                "output": wrap_dict("output", trace_info.workflow_run_outputs),
                "start_time": trace_info.start_time,
                "end_time": trace_info.end_time,
                "metadata": workflow_metadata,
                "tags": ["workflow"],
                "project_name": self.project,
            }
            self.add_span(span_data)
        else:
            # Standalone workflow run: the run itself is the trace.
            trace_data = {
                "id": opik_trace_id,
                "name": TraceTaskName.MESSAGE_TRACE.value,
                "start_time": trace_info.start_time,
                "end_time": trace_info.end_time,
                "metadata": workflow_metadata,
                "input": wrap_dict("input", trace_info.workflow_run_inputs),
                "output": wrap_dict("output", trace_info.workflow_run_outputs),
                "tags": ["workflow"],
                "project_name": self.project,
            }
            self.add_trace(trace_data)
        # through workflow_run_id get all_nodes_execution
        workflow_nodes_execution_id_records = (
            db.session.query(WorkflowNodeExecution.id)
            .filter(WorkflowNodeExecution.workflow_run_id == trace_info.workflow_run_id)
            .all()
        )
        for node_execution_id_record in workflow_nodes_execution_id_records:
            # Fetch only the columns needed to build the span payload.
            node_execution = (
                db.session.query(
                    WorkflowNodeExecution.id,
                    WorkflowNodeExecution.tenant_id,
                    WorkflowNodeExecution.app_id,
                    WorkflowNodeExecution.title,
                    WorkflowNodeExecution.node_type,
                    WorkflowNodeExecution.status,
                    WorkflowNodeExecution.inputs,
                    WorkflowNodeExecution.outputs,
                    WorkflowNodeExecution.created_at,
                    WorkflowNodeExecution.elapsed_time,
                    WorkflowNodeExecution.process_data,
                    WorkflowNodeExecution.execution_metadata,
                )
                .filter(WorkflowNodeExecution.id == node_execution_id_record.id)
                .first()
            )
            if not node_execution:
                continue
            node_execution_id = node_execution.id
            tenant_id = node_execution.tenant_id
            app_id = node_execution.app_id
            node_name = node_execution.title
            node_type = node_execution.node_type
            status = node_execution.status
            if node_type == "llm":
                # For LLM nodes the prompts stored in process_data are the
                # meaningful input, not the raw node inputs.
                inputs = (
                    json.loads(node_execution.process_data).get("prompts", {}) if node_execution.process_data else {}
                )
            else:
                inputs = json.loads(node_execution.inputs) if node_execution.inputs else {}
            outputs = json.loads(node_execution.outputs) if node_execution.outputs else {}
            created_at = node_execution.created_at or datetime.now()
            elapsed_time = node_execution.elapsed_time
            finished_at = created_at + timedelta(seconds=elapsed_time)
            execution_metadata = (
                json.loads(node_execution.execution_metadata) if node_execution.execution_metadata else {}
            )
            # Copy so updating span metadata does not mutate execution_metadata.
            metadata = execution_metadata.copy()
            metadata.update(
                {
                    "workflow_run_id": trace_info.workflow_run_id,
                    "node_execution_id": node_execution_id,
                    "tenant_id": tenant_id,
                    "app_id": app_id,
                    "app_name": node_name,
                    "node_type": node_type,
                    "status": status,
                }
            )
            process_data = json.loads(node_execution.process_data) if node_execution.process_data else {}
            provider = None
            model = None
            total_tokens = 0
            completion_tokens = 0
            prompt_tokens = 0
            if process_data and process_data.get("model_mode") == "chat":
                # Chat-model node: report as an LLM span with usage data.
                run_type = "llm"
                provider = process_data.get("model_provider", None)
                model = process_data.get("model_name", "")
                metadata.update(
                    {
                        "ls_provider": provider,
                        "ls_model_name": model,
                    }
                )
                try:
                    if outputs.get("usage"):
                        total_tokens = outputs["usage"].get("total_tokens", 0)
                        prompt_tokens = outputs["usage"].get("prompt_tokens", 0)
                        completion_tokens = outputs["usage"].get("completion_tokens", 0)
                except Exception:
                    # Best-effort: missing/odd usage data must not break the export.
                    logger.error("Failed to extract usage", exc_info=True)
            else:
                run_type = "tool"
            parent_span_id = trace_info.workflow_app_log_id or trace_info.workflow_run_id
            if not total_tokens:
                # Fall back to the token count recorded by the workflow engine.
                total_tokens = execution_metadata.get("total_tokens", 0)
            span_data = {
                "trace_id": opik_trace_id,
                "id": prepare_opik_uuid(created_at, node_execution_id),
                "parent_span_id": prepare_opik_uuid(trace_info.start_time, parent_span_id),
                "name": node_type,
                "type": run_type,
                "start_time": created_at,
                "end_time": finished_at,
                "metadata": wrap_metadata(metadata),
                "input": wrap_dict("input", inputs),
                "output": wrap_dict("output", outputs),
                "tags": ["node_execution"],
                "project_name": self.project,
                "usage": {
                    "total_tokens": total_tokens,
                    "completion_tokens": completion_tokens,
                    "prompt_tokens": prompt_tokens,
                },
                "model": model,
                "provider": provider,
            }
            self.add_span(span_data)
    def message_trace(self, trace_info: MessageTraceInfo):
        """Export a chat message exchange to Opik as a trace plus one LLM span.

        Returns without exporting when the trace has no message data.
        """
        # get message file data
        file_list = cast(list[str], trace_info.file_list) or []
        message_file_data: Optional[MessageFile] = trace_info.message_file_data
        if message_file_data is not None:
            # Build an absolute URL for the attached file.
            file_url = f"{self.file_base_url}/{message_file_data.url}" if message_file_data else ""
            file_list.append(file_url)
        message_data = trace_info.message_data
        if message_data is None:
            return
        metadata = trace_info.metadata
        message_id = trace_info.message_id
        user_id = message_data.from_account_id
        metadata["user_id"] = user_id
        metadata["file_list"] = file_list
        if message_data.from_end_user_id:
            # Record the end-user's session id when the message came from an
            # end user rather than a console account.
            end_user_data: Optional[EndUser] = (
                db.session.query(EndUser).filter(EndUser.id == message_data.from_end_user_id).first()
            )
            if end_user_data is not None:
                end_user_id = end_user_data.session_id
                metadata["end_user_id"] = end_user_id
        trace_data = {
            "id": prepare_opik_uuid(trace_info.start_time, message_id),
            "name": TraceTaskName.MESSAGE_TRACE.value,
            "start_time": trace_info.start_time,
            "end_time": trace_info.end_time,
            "metadata": wrap_metadata(metadata),
            "input": trace_info.inputs,
            "output": message_data.answer,
            "tags": ["message", str(trace_info.conversation_mode)],
            "project_name": self.project,
        }
        trace = self.add_trace(trace_data)
        # Single child span holding the LLM call and its token usage.
        span_data = {
            "trace_id": trace.id,
            "name": "llm",
            "type": "llm",
            "start_time": trace_info.start_time,
            "end_time": trace_info.end_time,
            "metadata": wrap_metadata(metadata),
            "input": {"input": trace_info.inputs},
            "output": {"output": message_data.answer},
            "tags": ["llm", str(trace_info.conversation_mode)],
            "usage": {
                "completion_tokens": trace_info.answer_tokens,
                "prompt_tokens": trace_info.message_tokens,
                "total_tokens": trace_info.total_tokens,
            },
            "project_name": self.project,
        }
        self.add_span(span_data)
  287. def moderation_trace(self, trace_info: ModerationTraceInfo):
  288. if trace_info.message_data is None:
  289. return
  290. start_time = trace_info.start_time or trace_info.message_data.created_at
  291. span_data = {
  292. "trace_id": prepare_opik_uuid(start_time, trace_info.message_id),
  293. "name": TraceTaskName.MODERATION_TRACE.value,
  294. "type": "tool",
  295. "start_time": start_time,
  296. "end_time": trace_info.end_time or trace_info.message_data.updated_at,
  297. "metadata": wrap_metadata(trace_info.metadata),
  298. "input": wrap_dict("input", trace_info.inputs),
  299. "output": {
  300. "action": trace_info.action,
  301. "flagged": trace_info.flagged,
  302. "preset_response": trace_info.preset_response,
  303. "inputs": trace_info.inputs,
  304. },
  305. "tags": ["moderation"],
  306. }
  307. self.add_span(span_data)
  308. def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo):
  309. message_data = trace_info.message_data
  310. if message_data is None:
  311. return
  312. start_time = trace_info.start_time or message_data.created_at
  313. span_data = {
  314. "trace_id": prepare_opik_uuid(start_time, trace_info.message_id),
  315. "name": TraceTaskName.SUGGESTED_QUESTION_TRACE.value,
  316. "type": "tool",
  317. "start_time": start_time,
  318. "end_time": trace_info.end_time or message_data.updated_at,
  319. "metadata": wrap_metadata(trace_info.metadata),
  320. "input": wrap_dict("input", trace_info.inputs),
  321. "output": wrap_dict("output", trace_info.suggested_question),
  322. "tags": ["suggested_question"],
  323. }
  324. self.add_span(span_data)
  325. def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo):
  326. if trace_info.message_data is None:
  327. return
  328. start_time = trace_info.start_time or trace_info.message_data.created_at
  329. span_data = {
  330. "trace_id": prepare_opik_uuid(start_time, trace_info.message_id),
  331. "name": TraceTaskName.DATASET_RETRIEVAL_TRACE.value,
  332. "type": "tool",
  333. "start_time": start_time,
  334. "end_time": trace_info.end_time or trace_info.message_data.updated_at,
  335. "metadata": wrap_metadata(trace_info.metadata),
  336. "input": wrap_dict("input", trace_info.inputs),
  337. "output": {"documents": trace_info.documents},
  338. "tags": ["dataset_retrieval"],
  339. }
  340. self.add_span(span_data)
  341. def tool_trace(self, trace_info: ToolTraceInfo):
  342. span_data = {
  343. "trace_id": prepare_opik_uuid(trace_info.start_time, trace_info.message_id),
  344. "name": trace_info.tool_name,
  345. "type": "tool",
  346. "start_time": trace_info.start_time,
  347. "end_time": trace_info.end_time,
  348. "metadata": wrap_metadata(trace_info.metadata),
  349. "input": wrap_dict("input", trace_info.tool_inputs),
  350. "output": wrap_dict("output", trace_info.tool_outputs),
  351. "tags": ["tool", trace_info.tool_name],
  352. }
  353. self.add_span(span_data)
  354. def generate_name_trace(self, trace_info: GenerateNameTraceInfo):
  355. trace_data = {
  356. "id": prepare_opik_uuid(trace_info.start_time, trace_info.message_id),
  357. "name": TraceTaskName.GENERATE_NAME_TRACE.value,
  358. "start_time": trace_info.start_time,
  359. "end_time": trace_info.end_time,
  360. "metadata": wrap_metadata(trace_info.metadata),
  361. "input": trace_info.inputs,
  362. "output": trace_info.outputs,
  363. "tags": ["generate_name"],
  364. "project_name": self.project,
  365. }
  366. trace = self.add_trace(trace_data)
  367. span_data = {
  368. "trace_id": trace.id,
  369. "name": TraceTaskName.GENERATE_NAME_TRACE.value,
  370. "start_time": trace_info.start_time,
  371. "end_time": trace_info.end_time,
  372. "metadata": wrap_metadata(trace_info.metadata),
  373. "input": wrap_dict("input", trace_info.inputs),
  374. "output": wrap_dict("output", trace_info.outputs),
  375. "tags": ["generate_name"],
  376. }
  377. self.add_span(span_data)
  378. def add_trace(self, opik_trace_data: dict) -> Trace:
  379. try:
  380. trace = self.opik_client.trace(**opik_trace_data)
  381. logger.debug("Opik Trace created successfully")
  382. return trace
  383. except Exception as e:
  384. raise ValueError(f"Opik Failed to create trace: {str(e)}")
  385. def add_span(self, opik_span_data: dict):
  386. try:
  387. self.opik_client.span(**opik_span_data)
  388. logger.debug("Opik Span created successfully")
  389. except Exception as e:
  390. raise ValueError(f"Opik Failed to create span: {str(e)}")
  391. def api_check(self):
  392. try:
  393. self.opik_client.auth_check()
  394. return True
  395. except Exception as e:
  396. logger.info(f"Opik API check failed: {str(e)}", exc_info=True)
  397. raise ValueError(f"Opik API check failed: {str(e)}")
  398. def get_project_url(self):
  399. try:
  400. return self.opik_client.get_project_url(project_name=self.project)
  401. except Exception as e:
  402. logger.info(f"Opik get run url failed: {str(e)}", exc_info=True)
  403. raise ValueError(f"Opik get run url failed: {str(e)}")