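"""Langfuse tracing backend.

Converts Dify trace entities (workflow runs, messages, moderation, suggested
questions, dataset retrieval, tool calls and conversation-name generation)
into Langfuse traces, spans and generations via the Langfuse client.
"""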
import json
import logging
import os
from datetime import datetime, timedelta
from typing import Optional

from langfuse import Langfuse  # type: ignore

from core.ops.base_trace_instance import BaseTraceInstance
from core.ops.entities.config_entity import LangfuseConfig
from core.ops.entities.trace_entity import (
    BaseTraceInfo,
    DatasetRetrievalTraceInfo,
    GenerateNameTraceInfo,
    MessageTraceInfo,
    ModerationTraceInfo,
    SuggestedQuestionTraceInfo,
    ToolTraceInfo,
    TraceTaskName,
    WorkflowTraceInfo,
)
from core.ops.langfuse_trace.entities.langfuse_trace_entity import (
    GenerationUsage,
    LangfuseGeneration,
    LangfuseSpan,
    LangfuseTrace,
    LevelEnum,
    UnitEnum,
)
from core.ops.utils import filter_none_values
from extensions.ext_database import db
from models.model import EndUser
from models.workflow import WorkflowNodeExecution

logger = logging.getLogger(__name__)
 
class LangFuseDataTrace(BaseTraceInstance):
    """Trace instance that exports Dify trace data to Langfuse."""

    def __init__(
        self,
        langfuse_config: LangfuseConfig,
    ):
        super().__init__(langfuse_config)
        self.langfuse_client = Langfuse(
            public_key=langfuse_config.public_key,
            secret_key=langfuse_config.secret_key,
            host=langfuse_config.host,
        )
        self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
 
    def trace(self, trace_info: BaseTraceInfo):
        if isinstance(trace_info, WorkflowTraceInfo):
            self.workflow_trace(trace_info)
        if isinstance(trace_info, MessageTraceInfo):
            self.message_trace(trace_info)
        if isinstance(trace_info, ModerationTraceInfo):
            self.moderation_trace(trace_info)
        if isinstance(trace_info, SuggestedQuestionTraceInfo):
            self.suggested_question_trace(trace_info)
        if isinstance(trace_info, DatasetRetrievalTraceInfo):
            self.dataset_retrieval_trace(trace_info)
        if isinstance(trace_info, ToolTraceInfo):
            self.tool_trace(trace_info)
        if isinstance(trace_info, GenerateNameTraceInfo):
            self.generate_name_trace(trace_info)
 
    def workflow_trace(self, trace_info: WorkflowTraceInfo):
        """Export a workflow run and its node executions as a Langfuse trace with nested spans."""
        trace_id = trace_info.workflow_run_id
        user_id = trace_info.metadata.get("user_id")
        metadata = trace_info.metadata
        metadata["workflow_app_log_id"] = trace_info.workflow_app_log_id

        if trace_info.message_id:
            trace_id = trace_info.message_id
            name = TraceTaskName.MESSAGE_TRACE.value
            trace_data = LangfuseTrace(
                id=trace_id,
                user_id=user_id,
                name=name,
                input=dict(trace_info.workflow_run_inputs),
                output=dict(trace_info.workflow_run_outputs),
                metadata=metadata,
                session_id=trace_info.conversation_id,
                tags=["message", "workflow"],
            )
            self.add_trace(langfuse_trace_data=trace_data)

            workflow_span_data = LangfuseSpan(
                id=trace_info.workflow_run_id,
                name=TraceTaskName.WORKFLOW_TRACE.value,
                input=dict(trace_info.workflow_run_inputs),
                output=dict(trace_info.workflow_run_outputs),
                trace_id=trace_id,
                start_time=trace_info.start_time,
                end_time=trace_info.end_time,
                metadata=metadata,
                level=LevelEnum.DEFAULT if trace_info.error == "" else LevelEnum.ERROR,
                status_message=trace_info.error or "",
            )
            self.add_span(langfuse_span_data=workflow_span_data)
        else:
            trace_data = LangfuseTrace(
                id=trace_id,
                user_id=user_id,
                name=TraceTaskName.WORKFLOW_TRACE.value,
                input=dict(trace_info.workflow_run_inputs),
                output=dict(trace_info.workflow_run_outputs),
                metadata=metadata,
                session_id=trace_info.conversation_id,
                tags=["workflow"],
            )
            self.add_trace(langfuse_trace_data=trace_data)

        # fetch all node executions belonging to this workflow run
        workflow_nodes_execution_id_records = (
            db.session.query(WorkflowNodeExecution.id)
            .filter(WorkflowNodeExecution.workflow_run_id == trace_info.workflow_run_id)
            .all()
        )

        for node_execution_id_record in workflow_nodes_execution_id_records:
            node_execution = (
                db.session.query(
                    WorkflowNodeExecution.id,
                    WorkflowNodeExecution.tenant_id,
                    WorkflowNodeExecution.app_id,
                    WorkflowNodeExecution.title,
                    WorkflowNodeExecution.node_type,
                    WorkflowNodeExecution.status,
                    WorkflowNodeExecution.inputs,
                    WorkflowNodeExecution.outputs,
                    WorkflowNodeExecution.created_at,
                    WorkflowNodeExecution.elapsed_time,
                    WorkflowNodeExecution.process_data,
                    WorkflowNodeExecution.execution_metadata,
                )
                .filter(WorkflowNodeExecution.id == node_execution_id_record.id)
                .first()
            )

            if not node_execution:
                continue

            node_execution_id = node_execution.id
            tenant_id = node_execution.tenant_id
            app_id = node_execution.app_id
            node_name = node_execution.title
            node_type = node_execution.node_type
            status = node_execution.status
            if node_type == "llm":
                inputs = (
                    json.loads(node_execution.process_data).get("prompts", {}) if node_execution.process_data else {}
                )
            else:
                inputs = json.loads(node_execution.inputs) if node_execution.inputs else {}
            outputs = json.loads(node_execution.outputs) if node_execution.outputs else {}
            created_at = node_execution.created_at or datetime.now()
            elapsed_time = node_execution.elapsed_time
            finished_at = created_at + timedelta(seconds=elapsed_time)

            metadata = json.loads(node_execution.execution_metadata) if node_execution.execution_metadata else {}
            metadata.update(
                {
                    "workflow_run_id": trace_info.workflow_run_id,
                    "node_execution_id": node_execution_id,
                    "tenant_id": tenant_id,
                    "app_id": app_id,
                    "node_name": node_name,
                    "node_type": node_type,
                    "status": status,
                }
            )

            process_data = json.loads(node_execution.process_data) if node_execution.process_data else {}
            model_provider = process_data.get("model_provider", None)
            model_name = process_data.get("model_name", None)
            if model_provider is not None and model_name is not None:
                metadata.update(
                    {
                        "model_provider": model_provider,
                        "model_name": model_name,
                    }
                )

            # add a span per node execution; when the run belongs to a message,
            # nest it under the workflow span
            if trace_info.message_id:
                span_data = LangfuseSpan(
                    id=node_execution_id,
                    name=node_type,
                    input=inputs,
                    output=outputs,
                    trace_id=trace_id,
                    start_time=created_at,
                    end_time=finished_at,
                    metadata=metadata,
                    level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR),
                    status_message=trace_info.error or "",
                    parent_observation_id=trace_info.workflow_run_id,
                )
            else:
                span_data = LangfuseSpan(
                    id=node_execution_id,
                    name=node_type,
                    input=inputs,
                    output=outputs,
                    trace_id=trace_id,
                    start_time=created_at,
                    end_time=finished_at,
                    metadata=metadata,
                    level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR),
                    status_message=trace_info.error or "",
                )

            self.add_span(langfuse_span_data=span_data)

            if process_data and process_data.get("model_mode") == "chat":
                total_token = metadata.get("total_tokens", 0)
                # add a generation for the LLM call of this node
                generation_usage = GenerationUsage(
                    total=total_token,
                )

                node_generation_data = LangfuseGeneration(
                    name="llm",
                    trace_id=trace_id,
                    model=process_data.get("model_name"),
                    parent_observation_id=node_execution_id,
                    start_time=created_at,
                    end_time=finished_at,
                    input=inputs,
                    output=outputs,
                    metadata=metadata,
                    level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR),
                    status_message=trace_info.error or "",
                    usage=generation_usage,
                )

                self.add_generation(langfuse_generation_data=node_generation_data)
 
    def message_trace(self, trace_info: MessageTraceInfo, **kwargs):
        # collect message data, files and metadata
        file_list = trace_info.file_list
        metadata = trace_info.metadata
        message_data = trace_info.message_data
        if message_data is None:
            return
        message_id = message_data.id

        user_id = message_data.from_account_id
        if message_data.from_end_user_id:
            end_user_data: Optional[EndUser] = (
                db.session.query(EndUser).filter(EndUser.id == message_data.from_end_user_id).first()
            )
            if end_user_data is not None:
                user_id = end_user_data.session_id
                metadata["user_id"] = user_id

        trace_data = LangfuseTrace(
            id=message_id,
            user_id=user_id,
            name=TraceTaskName.MESSAGE_TRACE.value,
            input={
                "message": trace_info.inputs,
                "files": file_list,
                "message_tokens": trace_info.message_tokens,
                "answer_tokens": trace_info.answer_tokens,
                "total_tokens": trace_info.total_tokens,
                "error": trace_info.error,
                "provider_response_latency": message_data.provider_response_latency,
                "created_at": trace_info.start_time,
            },
            output=trace_info.outputs,
            metadata=metadata,
            session_id=message_data.conversation_id,
            tags=["message", str(trace_info.conversation_mode)],
            version=None,
            release=None,
            public=None,
        )
        self.add_trace(langfuse_trace_data=trace_data)

        # add the LLM generation with token usage and cost
        generation_usage = GenerationUsage(
            input=trace_info.message_tokens,
            output=trace_info.answer_tokens,
            total=trace_info.total_tokens,
            unit=UnitEnum.TOKENS,
            totalCost=message_data.total_price,
        )

        langfuse_generation_data = LangfuseGeneration(
            name="llm",
            trace_id=message_id,
            start_time=trace_info.start_time,
            end_time=trace_info.end_time,
            model=message_data.model_id,
            input=trace_info.inputs,
            output=message_data.answer,
            metadata=metadata,
            level=(LevelEnum.DEFAULT if message_data.status != "error" else LevelEnum.ERROR),
            status_message=message_data.error or "",
            usage=generation_usage,
        )

        self.add_generation(langfuse_generation_data)
 
    def moderation_trace(self, trace_info: ModerationTraceInfo):
        if trace_info.message_data is None:
            return
        span_data = LangfuseSpan(
            name=TraceTaskName.MODERATION_TRACE.value,
            input=trace_info.inputs,
            output={
                "action": trace_info.action,
                "flagged": trace_info.flagged,
                "preset_response": trace_info.preset_response,
                "inputs": trace_info.inputs,
            },
            trace_id=trace_info.message_id,
            start_time=trace_info.start_time or trace_info.message_data.created_at,
            end_time=trace_info.end_time or trace_info.message_data.created_at,
            metadata=trace_info.metadata,
        )

        self.add_span(langfuse_span_data=span_data)
 
    def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo):
        message_data = trace_info.message_data
        if message_data is None:
            return
        generation_usage = GenerationUsage(
            total=len(str(trace_info.suggested_question)),
            input=len(trace_info.inputs) if trace_info.inputs else 0,
            output=len(trace_info.suggested_question),
            unit=UnitEnum.CHARACTERS,
        )

        generation_data = LangfuseGeneration(
            name=TraceTaskName.SUGGESTED_QUESTION_TRACE.value,
            input=trace_info.inputs,
            output=str(trace_info.suggested_question),
            trace_id=trace_info.message_id,
            start_time=trace_info.start_time,
            end_time=trace_info.end_time,
            metadata=trace_info.metadata,
            level=(LevelEnum.DEFAULT if message_data.status != "error" else LevelEnum.ERROR),
            status_message=message_data.error or "",
            usage=generation_usage,
        )

        self.add_generation(langfuse_generation_data=generation_data)
 
    def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo):
        if trace_info.message_data is None:
            return
        dataset_retrieval_span_data = LangfuseSpan(
            name=TraceTaskName.DATASET_RETRIEVAL_TRACE.value,
            input=trace_info.inputs,
            output={"documents": trace_info.documents},
            trace_id=trace_info.message_id,
            start_time=trace_info.start_time or trace_info.message_data.created_at,
            end_time=trace_info.end_time or trace_info.message_data.updated_at,
            metadata=trace_info.metadata,
        )

        self.add_span(langfuse_span_data=dataset_retrieval_span_data)
 
    def tool_trace(self, trace_info: ToolTraceInfo):
        tool_span_data = LangfuseSpan(
            name=trace_info.tool_name,
            input=trace_info.tool_inputs,
            output=trace_info.tool_outputs,
            trace_id=trace_info.message_id,
            start_time=trace_info.start_time,
            end_time=trace_info.end_time,
            metadata=trace_info.metadata,
            level=(LevelEnum.DEFAULT if trace_info.error == "" or trace_info.error is None else LevelEnum.ERROR),
            status_message=trace_info.error,
        )

        self.add_span(langfuse_span_data=tool_span_data)
 
    def generate_name_trace(self, trace_info: GenerateNameTraceInfo):
        name_generation_trace_data = LangfuseTrace(
            name=TraceTaskName.GENERATE_NAME_TRACE.value,
            input=trace_info.inputs,
            output=trace_info.outputs,
            user_id=trace_info.tenant_id,
            metadata=trace_info.metadata,
            session_id=trace_info.conversation_id,
        )

        self.add_trace(langfuse_trace_data=name_generation_trace_data)

        name_generation_span_data = LangfuseSpan(
            name=TraceTaskName.GENERATE_NAME_TRACE.value,
            input=trace_info.inputs,
            output=trace_info.outputs,
            trace_id=trace_info.conversation_id,
            start_time=trace_info.start_time,
            end_time=trace_info.end_time,
            metadata=trace_info.metadata,
        )

        self.add_span(langfuse_span_data=name_generation_span_data)
 
    def add_trace(self, langfuse_trace_data: Optional[LangfuseTrace] = None):
        format_trace_data = filter_none_values(langfuse_trace_data.model_dump()) if langfuse_trace_data else {}
        try:
            self.langfuse_client.trace(**format_trace_data)
            logger.debug("LangFuse Trace created successfully")
        except Exception as e:
            raise ValueError(f"LangFuse Failed to create trace: {str(e)}")

    def add_span(self, langfuse_span_data: Optional[LangfuseSpan] = None):
        format_span_data = filter_none_values(langfuse_span_data.model_dump()) if langfuse_span_data else {}
        try:
            self.langfuse_client.span(**format_span_data)
            logger.debug("LangFuse Span created successfully")
        except Exception as e:
            raise ValueError(f"LangFuse Failed to create span: {str(e)}")

    def update_span(self, span, langfuse_span_data: Optional[LangfuseSpan] = None):
        format_span_data = filter_none_values(langfuse_span_data.model_dump()) if langfuse_span_data else {}
        span.end(**format_span_data)

    def add_generation(self, langfuse_generation_data: Optional[LangfuseGeneration] = None):
        format_generation_data = (
            filter_none_values(langfuse_generation_data.model_dump()) if langfuse_generation_data else {}
        )
        try:
            self.langfuse_client.generation(**format_generation_data)
            logger.debug("LangFuse Generation created successfully")
        except Exception as e:
            raise ValueError(f"LangFuse Failed to create generation: {str(e)}")

    def update_generation(self, generation, langfuse_generation_data: Optional[LangfuseGeneration] = None):
        format_generation_data = (
            filter_none_values(langfuse_generation_data.model_dump()) if langfuse_generation_data else {}
        )
        generation.end(**format_generation_data)
 
    def api_check(self):
        try:
            return self.langfuse_client.auth_check()
        except Exception as e:
            logger.debug(f"LangFuse API check failed: {str(e)}")
            raise ValueError(f"LangFuse API check failed: {str(e)}")

    def get_project_key(self):
        try:
            projects = self.langfuse_client.client.projects.get()
            return projects.data[0].id
        except Exception as e:
            logger.debug(f"LangFuse get project key failed: {str(e)}")
            raise ValueError(f"LangFuse get project key failed: {str(e)}")
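
# Usage sketch (illustrative only): assumes LangfuseConfig accepts these fields
# as constructor arguments; the key values below are placeholders.
#
#     config = LangfuseConfig(
#         public_key="pk-lf-...",
#         secret_key="sk-lf-...",
#         host="https://cloud.langfuse.com",
#     )
#     tracer = LangFuseDataTrace(config)
#     tracer.api_check()        # verifies credentials via auth_check()
#     tracer.trace(trace_info)  # dispatches on the concrete *TraceInfo subclass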
 
 