import json
import logging
import os
from datetime import datetime, timedelta
from typing import Optional

from langfuse import Langfuse

from core.ops.base_trace_instance import BaseTraceInstance
from core.ops.entities.config_entity import LangfuseConfig
from core.ops.entities.trace_entity import (
    BaseTraceInfo,
    DatasetRetrievalTraceInfo,
    GenerateNameTraceInfo,
    MessageTraceInfo,
    ModerationTraceInfo,
    SuggestedQuestionTraceInfo,
    ToolTraceInfo,
    TraceTaskName,
    WorkflowTraceInfo,
)
from core.ops.langfuse_trace.entities.langfuse_trace_entity import (
    GenerationUsage,
    LangfuseGeneration,
    LangfuseSpan,
    LangfuseTrace,
    LevelEnum,
    UnitEnum,
)
from core.ops.utils import filter_none_values
from extensions.ext_database import db
from models.model import EndUser
from models.workflow import WorkflowNodeExecution

logger = logging.getLogger(__name__)


class LangFuseDataTrace(BaseTraceInstance):
    """Trace exporter that maps application trace events onto Langfuse
    traces, spans and generations via the low-level Langfuse client.
    """

    def __init__(
        self,
        langfuse_config: LangfuseConfig,
    ):
        super().__init__(langfuse_config)
        self.langfuse_client = Langfuse(
            public_key=langfuse_config.public_key,
            secret_key=langfuse_config.secret_key,
            host=langfuse_config.host,
        )
        self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")

    def trace(self, trace_info: BaseTraceInfo):
        """Dispatch *trace_info* to the handler matching its concrete type."""
        if isinstance(trace_info, WorkflowTraceInfo):
            self.workflow_trace(trace_info)
        if isinstance(trace_info, MessageTraceInfo):
            self.message_trace(trace_info)
        if isinstance(trace_info, ModerationTraceInfo):
            self.moderation_trace(trace_info)
        if isinstance(trace_info, SuggestedQuestionTraceInfo):
            self.suggested_question_trace(trace_info)
        if isinstance(trace_info, DatasetRetrievalTraceInfo):
            self.dataset_retrieval_trace(trace_info)
        if isinstance(trace_info, ToolTraceInfo):
            self.tool_trace(trace_info)
        if isinstance(trace_info, GenerateNameTraceInfo):
            self.generate_name_trace(trace_info)

    def workflow_trace(self, trace_info: WorkflowTraceInfo):
        """Export a workflow run (and each of its node executions) to Langfuse.

        When the run belongs to a message, the message becomes the trace and
        the workflow is attached as a child span; otherwise the workflow run
        itself is the trace.
        """
        trace_id = trace_info.workflow_app_log_id or trace_info.workflow_run_id
        user_id = trace_info.metadata.get("user_id")
        if trace_info.message_id:
            trace_id = trace_info.message_id
            name = TraceTaskName.MESSAGE_TRACE.value
            trace_data = LangfuseTrace(
                id=trace_id,
                user_id=user_id,
                name=name,
                input=trace_info.workflow_run_inputs,
                output=trace_info.workflow_run_outputs,
                metadata=trace_info.metadata,
                session_id=trace_info.conversation_id,
                tags=["message", "workflow"],
                created_at=trace_info.start_time,
                updated_at=trace_info.end_time,
            )
            self.add_trace(langfuse_trace_data=trace_data)
            workflow_span_data = LangfuseSpan(
                id=(trace_info.workflow_app_log_id or trace_info.workflow_run_id),
                name=TraceTaskName.WORKFLOW_TRACE.value,
                input=trace_info.workflow_run_inputs,
                output=trace_info.workflow_run_outputs,
                trace_id=trace_id,
                start_time=trace_info.start_time,
                end_time=trace_info.end_time,
                metadata=trace_info.metadata,
                level=LevelEnum.DEFAULT if trace_info.error == "" else LevelEnum.ERROR,
                status_message=trace_info.error or "",
            )
            self.add_span(langfuse_span_data=workflow_span_data)
        else:
            trace_data = LangfuseTrace(
                id=trace_id,
                user_id=user_id,
                name=TraceTaskName.WORKFLOW_TRACE.value,
                input=trace_info.workflow_run_inputs,
                output=trace_info.workflow_run_outputs,
                metadata=trace_info.metadata,
                session_id=trace_info.conversation_id,
                tags=["workflow"],
            )
            self.add_trace(langfuse_trace_data=trace_data)

        # through workflow_run_id get all_nodes_execution
        workflow_nodes_execution_id_records = (
            db.session.query(WorkflowNodeExecution.id)
            .filter(WorkflowNodeExecution.workflow_run_id == trace_info.workflow_run_id)
            .all()
        )

        for node_execution_id_record in workflow_nodes_execution_id_records:
            node_execution = (
                db.session.query(
                    WorkflowNodeExecution.id,
                    WorkflowNodeExecution.tenant_id,
                    WorkflowNodeExecution.app_id,
                    WorkflowNodeExecution.title,
                    WorkflowNodeExecution.node_type,
                    WorkflowNodeExecution.status,
                    WorkflowNodeExecution.inputs,
                    WorkflowNodeExecution.outputs,
                    WorkflowNodeExecution.created_at,
                    WorkflowNodeExecution.elapsed_time,
                    WorkflowNodeExecution.process_data,
                    WorkflowNodeExecution.execution_metadata,
                )
                .filter(WorkflowNodeExecution.id == node_execution_id_record.id)
                .first()
            )
            if not node_execution:
                continue
            node_execution_id = node_execution.id
            tenant_id = node_execution.tenant_id
            app_id = node_execution.app_id
            node_name = node_execution.title
            node_type = node_execution.node_type
            status = node_execution.status
            # For LLM nodes the interesting input is the rendered prompt kept
            # in process_data, not the raw node inputs.
            if node_type == "llm":
                inputs = (
                    json.loads(node_execution.process_data).get("prompts", {}) if node_execution.process_data else {}
                )
            else:
                inputs = json.loads(node_execution.inputs) if node_execution.inputs else {}
            outputs = json.loads(node_execution.outputs) if node_execution.outputs else {}
            created_at = node_execution.created_at or datetime.now()
            elapsed_time = node_execution.elapsed_time
            finished_at = created_at + timedelta(seconds=elapsed_time)
            metadata = json.loads(node_execution.execution_metadata) if node_execution.execution_metadata else {}
            metadata.update(
                {
                    "workflow_run_id": trace_info.workflow_run_id,
                    "node_execution_id": node_execution_id,
                    "tenant_id": tenant_id,
                    "app_id": app_id,
                    "node_name": node_name,
                    "node_type": node_type,
                    "status": status,
                }
            )
            process_data = json.loads(node_execution.process_data) if node_execution.process_data else {}
            model_provider = process_data.get("model_provider", None)
            model_name = process_data.get("model_name", None)
            if model_provider is not None and model_name is not None:
                metadata.update(
                    {
                        "model_provider": model_provider,
                        "model_name": model_name,
                    }
                )

            # add span
            if trace_info.message_id:
                span_data = LangfuseSpan(
                    id=node_execution_id,
                    name=node_type,
                    input=inputs,
                    output=outputs,
                    trace_id=trace_id,
                    start_time=created_at,
                    end_time=finished_at,
                    metadata=metadata,
                    level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR),
                    status_message=trace_info.error or "",
                    parent_observation_id=(trace_info.workflow_app_log_id or trace_info.workflow_run_id),
                )
            else:
                span_data = LangfuseSpan(
                    id=node_execution_id,
                    name=node_type,
                    input=inputs,
                    output=outputs,
                    trace_id=trace_id,
                    start_time=created_at,
                    end_time=finished_at,
                    metadata=metadata,
                    level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR),
                    status_message=trace_info.error or "",
                )

            self.add_span(langfuse_span_data=span_data)

            if process_data and process_data.get("model_mode") == "chat":
                total_token = metadata.get("total_tokens", 0)
                # add generation
                generation_usage = GenerationUsage(
                    total=total_token,
                )

                node_generation_data = LangfuseGeneration(
                    name="llm",
                    trace_id=trace_id,
                    model=process_data.get("model_name"),
                    parent_observation_id=node_execution_id,
                    start_time=created_at,
                    end_time=finished_at,
                    input=inputs,
                    output=outputs,
                    metadata=metadata,
                    level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR),
                    status_message=trace_info.error or "",
                    usage=generation_usage,
                )

                self.add_generation(langfuse_generation_data=node_generation_data)

    def message_trace(self, trace_info: MessageTraceInfo, **kwargs):
        """Export a chat message as a Langfuse trace with one LLM generation."""
        # get message file data
        file_list = trace_info.file_list
        metadata = trace_info.metadata
        message_data = trace_info.message_data
        message_id = message_data.id

        user_id = message_data.from_account_id
        if message_data.from_end_user_id:
            end_user_data: EndUser = (
                db.session.query(EndUser).filter(EndUser.id == message_data.from_end_user_id).first()
            )
            if end_user_data is not None:
                user_id = end_user_data.session_id
                metadata["user_id"] = user_id

        trace_data = LangfuseTrace(
            id=message_id,
            user_id=user_id,
            name=TraceTaskName.MESSAGE_TRACE.value,
            input={
                "message": trace_info.inputs,
                "files": file_list,
                "message_tokens": trace_info.message_tokens,
                "answer_tokens": trace_info.answer_tokens,
                "total_tokens": trace_info.total_tokens,
                "error": trace_info.error,
                "provider_response_latency": message_data.provider_response_latency,
                "created_at": trace_info.start_time,
            },
            output=trace_info.outputs,
            metadata=metadata,
            session_id=message_data.conversation_id,
            tags=["message", str(trace_info.conversation_mode)],
            version=None,
            release=None,
            public=None,
        )
        self.add_trace(langfuse_trace_data=trace_data)

        # start add span
        generation_usage = GenerationUsage(
            input=trace_info.message_tokens,
            output=trace_info.answer_tokens,
            total=trace_info.total_tokens,
            unit=UnitEnum.TOKENS,
            totalCost=message_data.total_price,
        )

        langfuse_generation_data = LangfuseGeneration(
            name="llm",
            trace_id=message_id,
            start_time=trace_info.start_time,
            end_time=trace_info.end_time,
            model=message_data.model_id,
            input=trace_info.inputs,
            output=message_data.answer,
            metadata=metadata,
            level=(LevelEnum.DEFAULT if message_data.status != "error" else LevelEnum.ERROR),
            status_message=message_data.error or "",
            usage=generation_usage,
        )

        self.add_generation(langfuse_generation_data)

    def moderation_trace(self, trace_info: ModerationTraceInfo):
        """Attach a moderation-check span to the message trace."""
        span_data = LangfuseSpan(
            name=TraceTaskName.MODERATION_TRACE.value,
            input=trace_info.inputs,
            output={
                "action": trace_info.action,
                "flagged": trace_info.flagged,
                "preset_response": trace_info.preset_response,
                "inputs": trace_info.inputs,
            },
            trace_id=trace_info.message_id,
            start_time=trace_info.start_time or trace_info.message_data.created_at,
            end_time=trace_info.end_time or trace_info.message_data.created_at,
            metadata=trace_info.metadata,
        )

        self.add_span(langfuse_span_data=span_data)

    def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo):
        """Attach a suggested-question generation (character-based usage) to the message trace."""
        message_data = trace_info.message_data
        generation_usage = GenerationUsage(
            total=len(str(trace_info.suggested_question)),
            input=len(trace_info.inputs),
            output=len(trace_info.suggested_question),
            unit=UnitEnum.CHARACTERS,
        )

        generation_data = LangfuseGeneration(
            name=TraceTaskName.SUGGESTED_QUESTION_TRACE.value,
            input=trace_info.inputs,
            output=str(trace_info.suggested_question),
            trace_id=trace_info.message_id,
            start_time=trace_info.start_time,
            end_time=trace_info.end_time,
            metadata=trace_info.metadata,
            level=(LevelEnum.DEFAULT if message_data.status != "error" else LevelEnum.ERROR),
            status_message=message_data.error or "",
            usage=generation_usage,
        )

        self.add_generation(langfuse_generation_data=generation_data)

    def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo):
        """Attach a dataset-retrieval span (retrieved documents) to the message trace."""
        dataset_retrieval_span_data = LangfuseSpan(
            name=TraceTaskName.DATASET_RETRIEVAL_TRACE.value,
            input=trace_info.inputs,
            output={"documents": trace_info.documents},
            trace_id=trace_info.message_id,
            start_time=trace_info.start_time or trace_info.message_data.created_at,
            end_time=trace_info.end_time or trace_info.message_data.updated_at,
            metadata=trace_info.metadata,
        )

        self.add_span(langfuse_span_data=dataset_retrieval_span_data)

    def tool_trace(self, trace_info: ToolTraceInfo):
        """Attach a tool-invocation span to the message trace."""
        tool_span_data = LangfuseSpan(
            name=trace_info.tool_name,
            input=trace_info.tool_inputs,
            output=trace_info.tool_outputs,
            trace_id=trace_info.message_id,
            start_time=trace_info.start_time,
            end_time=trace_info.end_time,
            metadata=trace_info.metadata,
            level=(LevelEnum.DEFAULT if trace_info.error == "" or trace_info.error is None else LevelEnum.ERROR),
            status_message=trace_info.error,
        )

        self.add_span(langfuse_span_data=tool_span_data)

    def generate_name_trace(self, trace_info: GenerateNameTraceInfo):
        """Export a conversation-name generation as its own trace plus span."""
        name_generation_trace_data = LangfuseTrace(
            name=TraceTaskName.GENERATE_NAME_TRACE.value,
            input=trace_info.inputs,
            output=trace_info.outputs,
            user_id=trace_info.tenant_id,
            metadata=trace_info.metadata,
            session_id=trace_info.conversation_id,
        )

        self.add_trace(langfuse_trace_data=name_generation_trace_data)

        name_generation_span_data = LangfuseSpan(
            name=TraceTaskName.GENERATE_NAME_TRACE.value,
            input=trace_info.inputs,
            output=trace_info.outputs,
            trace_id=trace_info.conversation_id,
            start_time=trace_info.start_time,
            end_time=trace_info.end_time,
            metadata=trace_info.metadata,
        )

        self.add_span(langfuse_span_data=name_generation_span_data)

    def add_trace(self, langfuse_trace_data: Optional[LangfuseTrace] = None):
        """Send a trace to Langfuse; None-valued fields are stripped first.

        Raises ValueError (chained to the original error) on failure.
        """
        format_trace_data = filter_none_values(langfuse_trace_data.model_dump()) if langfuse_trace_data else {}
        try:
            self.langfuse_client.trace(**format_trace_data)
            logger.debug("LangFuse Trace created successfully")
        except Exception as e:
            raise ValueError(f"LangFuse Failed to create trace: {str(e)}") from e

    def add_span(self, langfuse_span_data: Optional[LangfuseSpan] = None):
        """Send a span to Langfuse; None-valued fields are stripped first.

        Raises ValueError (chained to the original error) on failure.
        """
        format_span_data = filter_none_values(langfuse_span_data.model_dump()) if langfuse_span_data else {}
        try:
            self.langfuse_client.span(**format_span_data)
            logger.debug("LangFuse Span created successfully")
        except Exception as e:
            raise ValueError(f"LangFuse Failed to create span: {str(e)}") from e

    def update_span(self, span, langfuse_span_data: Optional[LangfuseSpan] = None):
        """End an existing Langfuse span, applying the given data as updates."""
        format_span_data = filter_none_values(langfuse_span_data.model_dump()) if langfuse_span_data else {}

        span.end(**format_span_data)

    def add_generation(self, langfuse_generation_data: Optional[LangfuseGeneration] = None):
        """Send a generation to Langfuse; None-valued fields are stripped first.

        Raises ValueError (chained to the original error) on failure.
        """
        format_generation_data = (
            filter_none_values(langfuse_generation_data.model_dump()) if langfuse_generation_data else {}
        )
        try:
            self.langfuse_client.generation(**format_generation_data)
            logger.debug("LangFuse Generation created successfully")
        except Exception as e:
            raise ValueError(f"LangFuse Failed to create generation: {str(e)}") from e

    def update_generation(self, generation, langfuse_generation_data: Optional[LangfuseGeneration] = None):
        """End an existing Langfuse generation, applying the given data as updates."""
        format_generation_data = (
            filter_none_values(langfuse_generation_data.model_dump()) if langfuse_generation_data else {}
        )

        generation.end(**format_generation_data)

    def api_check(self):
        """Verify the configured Langfuse credentials; raise ValueError on failure."""
        try:
            return self.langfuse_client.auth_check()
        except Exception as e:
            logger.debug("LangFuse API check failed: %s", e)
            raise ValueError(f"LangFuse API check failed: {str(e)}") from e

    def get_project_key(self):
        """Return the id of the first Langfuse project visible to these credentials."""
        try:
            projects = self.langfuse_client.client.projects.get()
            return projects.data[0].id
        except Exception as e:
            logger.debug("LangFuse get project key failed: %s", e)
            raise ValueError(f"LangFuse get project key failed: {str(e)}") from e