langfuse_trace.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. import json
  2. import logging
  3. import os
  4. from datetime import datetime, timedelta
  5. from typing import Optional
  6. from langfuse import Langfuse
  7. from core.ops.base_trace_instance import BaseTraceInstance
  8. from core.ops.entities.config_entity import LangfuseConfig
  9. from core.ops.entities.trace_entity import (
  10. BaseTraceInfo,
  11. DatasetRetrievalTraceInfo,
  12. GenerateNameTraceInfo,
  13. MessageTraceInfo,
  14. ModerationTraceInfo,
  15. SuggestedQuestionTraceInfo,
  16. ToolTraceInfo,
  17. WorkflowTraceInfo,
  18. )
  19. from core.ops.langfuse_trace.entities.langfuse_trace_entity import (
  20. GenerationUsage,
  21. LangfuseGeneration,
  22. LangfuseSpan,
  23. LangfuseTrace,
  24. LevelEnum,
  25. UnitEnum,
  26. )
  27. from core.ops.utils import filter_none_values
  28. from extensions.ext_database import db
  29. from models.model import EndUser
  30. from models.workflow import WorkflowNodeExecution
  31. logger = logging.getLogger(__name__)
  32. class LangFuseDataTrace(BaseTraceInstance):
  33. def __init__(
  34. self,
  35. langfuse_config: LangfuseConfig,
  36. ):
  37. super().__init__(langfuse_config)
  38. self.langfuse_client = Langfuse(
  39. public_key=langfuse_config.public_key,
  40. secret_key=langfuse_config.secret_key,
  41. host=langfuse_config.host,
  42. )
  43. self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
  44. def trace(self, trace_info: BaseTraceInfo):
  45. if isinstance(trace_info, WorkflowTraceInfo):
  46. self.workflow_trace(trace_info)
  47. if isinstance(trace_info, MessageTraceInfo):
  48. self.message_trace(trace_info)
  49. if isinstance(trace_info, ModerationTraceInfo):
  50. self.moderation_trace(trace_info)
  51. if isinstance(trace_info, SuggestedQuestionTraceInfo):
  52. self.suggested_question_trace(trace_info)
  53. if isinstance(trace_info, DatasetRetrievalTraceInfo):
  54. self.dataset_retrieval_trace(trace_info)
  55. if isinstance(trace_info, ToolTraceInfo):
  56. self.tool_trace(trace_info)
  57. if isinstance(trace_info, GenerateNameTraceInfo):
  58. self.generate_name_trace(trace_info)
  59. def workflow_trace(self, trace_info: WorkflowTraceInfo):
  60. trace_id = trace_info.workflow_app_log_id if trace_info.workflow_app_log_id else trace_info.workflow_run_id
  61. if trace_info.message_id:
  62. trace_id = trace_info.message_id
  63. name = f"message_{trace_info.message_id}"
  64. trace_data = LangfuseTrace(
  65. id=trace_info.message_id,
  66. user_id=trace_info.tenant_id,
  67. name=name,
  68. input=trace_info.workflow_run_inputs,
  69. output=trace_info.workflow_run_outputs,
  70. metadata=trace_info.metadata,
  71. session_id=trace_info.conversation_id,
  72. tags=["message", "workflow"],
  73. )
  74. self.add_trace(langfuse_trace_data=trace_data)
  75. workflow_span_data = LangfuseSpan(
  76. id=trace_info.workflow_app_log_id if trace_info.workflow_app_log_id else trace_info.workflow_run_id,
  77. name=f"workflow_{trace_info.workflow_app_log_id}" if trace_info.workflow_app_log_id else f"workflow_{trace_info.workflow_run_id}",
  78. input=trace_info.workflow_run_inputs,
  79. output=trace_info.workflow_run_outputs,
  80. trace_id=trace_id,
  81. start_time=trace_info.start_time,
  82. end_time=trace_info.end_time,
  83. metadata=trace_info.metadata,
  84. level=LevelEnum.DEFAULT if trace_info.error == "" else LevelEnum.ERROR,
  85. status_message=trace_info.error if trace_info.error else "",
  86. )
  87. self.add_span(langfuse_span_data=workflow_span_data)
  88. else:
  89. trace_data = LangfuseTrace(
  90. id=trace_id,
  91. user_id=trace_info.tenant_id,
  92. name=f"workflow_{trace_info.workflow_app_log_id}" if trace_info.workflow_app_log_id else f"workflow_{trace_info.workflow_run_id}",
  93. input=trace_info.workflow_run_inputs,
  94. output=trace_info.workflow_run_outputs,
  95. metadata=trace_info.metadata,
  96. session_id=trace_info.conversation_id,
  97. tags=["workflow"],
  98. )
  99. self.add_trace(langfuse_trace_data=trace_data)
  100. # through workflow_run_id get all_nodes_execution
  101. workflow_nodes_executions = (
  102. db.session.query(
  103. WorkflowNodeExecution.id,
  104. WorkflowNodeExecution.tenant_id,
  105. WorkflowNodeExecution.app_id,
  106. WorkflowNodeExecution.title,
  107. WorkflowNodeExecution.node_type,
  108. WorkflowNodeExecution.status,
  109. WorkflowNodeExecution.inputs,
  110. WorkflowNodeExecution.outputs,
  111. WorkflowNodeExecution.created_at,
  112. WorkflowNodeExecution.elapsed_time,
  113. WorkflowNodeExecution.process_data,
  114. WorkflowNodeExecution.execution_metadata,
  115. )
  116. .filter(WorkflowNodeExecution.workflow_run_id == trace_info.workflow_run_id)
  117. .all()
  118. )
  119. for node_execution in workflow_nodes_executions:
  120. node_execution_id = node_execution.id
  121. tenant_id = node_execution.tenant_id
  122. app_id = node_execution.app_id
  123. node_name = node_execution.title
  124. node_type = node_execution.node_type
  125. status = node_execution.status
  126. if node_type == "llm":
  127. inputs = json.loads(node_execution.process_data).get(
  128. "prompts", {}
  129. ) if node_execution.process_data else {}
  130. else:
  131. inputs = json.loads(node_execution.inputs) if node_execution.inputs else {}
  132. outputs = (
  133. json.loads(node_execution.outputs) if node_execution.outputs else {}
  134. )
  135. created_at = node_execution.created_at if node_execution.created_at else datetime.now()
  136. elapsed_time = node_execution.elapsed_time
  137. finished_at = created_at + timedelta(seconds=elapsed_time)
  138. metadata = json.loads(node_execution.execution_metadata) if node_execution.execution_metadata else {}
  139. metadata.update(
  140. {
  141. "workflow_run_id": trace_info.workflow_run_id,
  142. "node_execution_id": node_execution_id,
  143. "tenant_id": tenant_id,
  144. "app_id": app_id,
  145. "node_name": node_name,
  146. "node_type": node_type,
  147. "status": status,
  148. }
  149. )
  150. # add span
  151. if trace_info.message_id:
  152. span_data = LangfuseSpan(
  153. id=node_execution_id,
  154. name=f"{node_name}_{node_execution_id}",
  155. input=inputs,
  156. output=outputs,
  157. trace_id=trace_id,
  158. start_time=created_at,
  159. end_time=finished_at,
  160. metadata=metadata,
  161. level=LevelEnum.DEFAULT if status == 'succeeded' else LevelEnum.ERROR,
  162. status_message=trace_info.error if trace_info.error else "",
  163. parent_observation_id=trace_info.workflow_app_log_id if trace_info.workflow_app_log_id else trace_info.workflow_run_id,
  164. )
  165. else:
  166. span_data = LangfuseSpan(
  167. id=node_execution_id,
  168. name=f"{node_name}_{node_execution_id}",
  169. input=inputs,
  170. output=outputs,
  171. trace_id=trace_id,
  172. start_time=created_at,
  173. end_time=finished_at,
  174. metadata=metadata,
  175. level=LevelEnum.DEFAULT if status == 'succeeded' else LevelEnum.ERROR,
  176. status_message=trace_info.error if trace_info.error else "",
  177. )
  178. self.add_span(langfuse_span_data=span_data)
  179. process_data = json.loads(node_execution.process_data) if node_execution.process_data else {}
  180. if process_data and process_data.get("model_mode") == "chat":
  181. total_token = metadata.get("total_tokens", 0)
  182. # add generation
  183. generation_usage = GenerationUsage(
  184. totalTokens=total_token,
  185. )
  186. node_generation_data = LangfuseGeneration(
  187. name=f"generation_{node_execution_id}",
  188. trace_id=trace_id,
  189. parent_observation_id=node_execution_id,
  190. start_time=created_at,
  191. end_time=finished_at,
  192. input=inputs,
  193. output=outputs,
  194. metadata=metadata,
  195. level=LevelEnum.DEFAULT if status == 'succeeded' else LevelEnum.ERROR,
  196. status_message=trace_info.error if trace_info.error else "",
  197. usage=generation_usage,
  198. )
  199. self.add_generation(langfuse_generation_data=node_generation_data)
  200. def message_trace(
  201. self, trace_info: MessageTraceInfo, **kwargs
  202. ):
  203. # get message file data
  204. file_list = trace_info.file_list
  205. metadata = trace_info.metadata
  206. message_data = trace_info.message_data
  207. message_id = message_data.id
  208. user_id = message_data.from_account_id
  209. if message_data.from_end_user_id:
  210. end_user_data: EndUser = db.session.query(EndUser).filter(
  211. EndUser.id == message_data.from_end_user_id
  212. ).first()
  213. if end_user_data is not None:
  214. user_id = end_user_data.session_id
  215. metadata["user_id"] = user_id
  216. trace_data = LangfuseTrace(
  217. id=message_id,
  218. user_id=user_id,
  219. name=f"message_{message_id}",
  220. input={
  221. "message": trace_info.inputs,
  222. "files": file_list,
  223. "message_tokens": trace_info.message_tokens,
  224. "answer_tokens": trace_info.answer_tokens,
  225. "total_tokens": trace_info.total_tokens,
  226. "error": trace_info.error,
  227. "provider_response_latency": message_data.provider_response_latency,
  228. "created_at": trace_info.start_time,
  229. },
  230. output=trace_info.outputs,
  231. metadata=metadata,
  232. session_id=message_data.conversation_id,
  233. tags=["message", str(trace_info.conversation_mode)],
  234. version=None,
  235. release=None,
  236. public=None,
  237. )
  238. self.add_trace(langfuse_trace_data=trace_data)
  239. # start add span
  240. generation_usage = GenerationUsage(
  241. totalTokens=trace_info.total_tokens,
  242. input=trace_info.message_tokens,
  243. output=trace_info.answer_tokens,
  244. total=trace_info.total_tokens,
  245. unit=UnitEnum.TOKENS,
  246. totalCost=message_data.total_price,
  247. )
  248. langfuse_generation_data = LangfuseGeneration(
  249. name=f"generation_{message_id}",
  250. trace_id=message_id,
  251. start_time=trace_info.start_time,
  252. end_time=trace_info.end_time,
  253. model=message_data.model_id,
  254. input=trace_info.inputs,
  255. output=message_data.answer,
  256. metadata=metadata,
  257. level=LevelEnum.DEFAULT if message_data.status != 'error' else LevelEnum.ERROR,
  258. status_message=message_data.error if message_data.error else "",
  259. usage=generation_usage,
  260. )
  261. self.add_generation(langfuse_generation_data)
  262. def moderation_trace(self, trace_info: ModerationTraceInfo):
  263. span_data = LangfuseSpan(
  264. name="moderation",
  265. input=trace_info.inputs,
  266. output={
  267. "action": trace_info.action,
  268. "flagged": trace_info.flagged,
  269. "preset_response": trace_info.preset_response,
  270. "inputs": trace_info.inputs,
  271. },
  272. trace_id=trace_info.message_id,
  273. start_time=trace_info.start_time or trace_info.message_data.created_at,
  274. end_time=trace_info.end_time or trace_info.message_data.created_at,
  275. metadata=trace_info.metadata,
  276. )
  277. self.add_span(langfuse_span_data=span_data)
  278. def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo):
  279. message_data = trace_info.message_data
  280. generation_usage = GenerationUsage(
  281. totalTokens=len(str(trace_info.suggested_question)),
  282. input=len(trace_info.inputs),
  283. output=len(trace_info.suggested_question),
  284. total=len(trace_info.suggested_question),
  285. unit=UnitEnum.CHARACTERS,
  286. )
  287. generation_data = LangfuseGeneration(
  288. name="suggested_question",
  289. input=trace_info.inputs,
  290. output=str(trace_info.suggested_question),
  291. trace_id=trace_info.message_id,
  292. start_time=trace_info.start_time,
  293. end_time=trace_info.end_time,
  294. metadata=trace_info.metadata,
  295. level=LevelEnum.DEFAULT if message_data.status != 'error' else LevelEnum.ERROR,
  296. status_message=message_data.error if message_data.error else "",
  297. usage=generation_usage,
  298. )
  299. self.add_generation(langfuse_generation_data=generation_data)
  300. def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo):
  301. dataset_retrieval_span_data = LangfuseSpan(
  302. name="dataset_retrieval",
  303. input=trace_info.inputs,
  304. output={"documents": trace_info.documents},
  305. trace_id=trace_info.message_id,
  306. start_time=trace_info.start_time or trace_info.message_data.created_at,
  307. end_time=trace_info.end_time or trace_info.message_data.updated_at,
  308. metadata=trace_info.metadata,
  309. )
  310. self.add_span(langfuse_span_data=dataset_retrieval_span_data)
  311. def tool_trace(self, trace_info: ToolTraceInfo):
  312. tool_span_data = LangfuseSpan(
  313. name=trace_info.tool_name,
  314. input=trace_info.tool_inputs,
  315. output=trace_info.tool_outputs,
  316. trace_id=trace_info.message_id,
  317. start_time=trace_info.start_time,
  318. end_time=trace_info.end_time,
  319. metadata=trace_info.metadata,
  320. level=LevelEnum.DEFAULT if trace_info.error == "" or trace_info.error is None else LevelEnum.ERROR,
  321. status_message=trace_info.error,
  322. )
  323. self.add_span(langfuse_span_data=tool_span_data)
  324. def generate_name_trace(self, trace_info: GenerateNameTraceInfo):
  325. name_generation_trace_data = LangfuseTrace(
  326. name="generate_name",
  327. input=trace_info.inputs,
  328. output=trace_info.outputs,
  329. user_id=trace_info.tenant_id,
  330. metadata=trace_info.metadata,
  331. session_id=trace_info.conversation_id,
  332. )
  333. self.add_trace(langfuse_trace_data=name_generation_trace_data)
  334. name_generation_span_data = LangfuseSpan(
  335. name="generate_name",
  336. input=trace_info.inputs,
  337. output=trace_info.outputs,
  338. trace_id=trace_info.conversation_id,
  339. start_time=trace_info.start_time,
  340. end_time=trace_info.end_time,
  341. metadata=trace_info.metadata,
  342. )
  343. self.add_span(langfuse_span_data=name_generation_span_data)
  344. def add_trace(self, langfuse_trace_data: Optional[LangfuseTrace] = None):
  345. format_trace_data = (
  346. filter_none_values(langfuse_trace_data.model_dump()) if langfuse_trace_data else {}
  347. )
  348. try:
  349. self.langfuse_client.trace(**format_trace_data)
  350. logger.debug("LangFuse Trace created successfully")
  351. except Exception as e:
  352. raise ValueError(f"LangFuse Failed to create trace: {str(e)}")
  353. def add_span(self, langfuse_span_data: Optional[LangfuseSpan] = None):
  354. format_span_data = (
  355. filter_none_values(langfuse_span_data.model_dump()) if langfuse_span_data else {}
  356. )
  357. try:
  358. self.langfuse_client.span(**format_span_data)
  359. logger.debug("LangFuse Span created successfully")
  360. except Exception as e:
  361. raise ValueError(f"LangFuse Failed to create span: {str(e)}")
  362. def update_span(self, span, langfuse_span_data: Optional[LangfuseSpan] = None):
  363. format_span_data = (
  364. filter_none_values(langfuse_span_data.model_dump()) if langfuse_span_data else {}
  365. )
  366. span.end(**format_span_data)
  367. def add_generation(
  368. self, langfuse_generation_data: Optional[LangfuseGeneration] = None
  369. ):
  370. format_generation_data = (
  371. filter_none_values(langfuse_generation_data.model_dump())
  372. if langfuse_generation_data
  373. else {}
  374. )
  375. try:
  376. self.langfuse_client.generation(**format_generation_data)
  377. logger.debug("LangFuse Generation created successfully")
  378. except Exception as e:
  379. raise ValueError(f"LangFuse Failed to create generation: {str(e)}")
  380. def update_generation(
  381. self, generation, langfuse_generation_data: Optional[LangfuseGeneration] = None
  382. ):
  383. format_generation_data = (
  384. filter_none_values(langfuse_generation_data.model_dump())
  385. if langfuse_generation_data
  386. else {}
  387. )
  388. generation.end(**format_generation_data)
  389. def api_check(self):
  390. try:
  391. return self.langfuse_client.auth_check()
  392. except Exception as e:
  393. logger.debug(f"LangFuse API check failed: {str(e)}")
  394. raise ValueError(f"LangFuse API check failed: {str(e)}")