123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470 |
- import json
- import logging
- import os
- import uuid
- from datetime import datetime, timedelta
- from typing import Optional, cast
- from opik import Opik, Trace
- from opik.id_helpers import uuid4_to_uuid7
- from core.ops.base_trace_instance import BaseTraceInstance
- from core.ops.entities.config_entity import OpikConfig
- from core.ops.entities.trace_entity import (
- BaseTraceInfo,
- DatasetRetrievalTraceInfo,
- GenerateNameTraceInfo,
- MessageTraceInfo,
- ModerationTraceInfo,
- SuggestedQuestionTraceInfo,
- ToolTraceInfo,
- TraceTaskName,
- WorkflowTraceInfo,
- )
- from extensions.ext_database import db
- from models.model import EndUser, MessageFile
- from models.workflow import WorkflowNodeExecution
- logger = logging.getLogger(__name__)
def wrap_dict(key_name, data):
    """Return ``data`` unchanged when it is a dict, otherwise wrap it.

    Non-dict values (including ``None``) are returned as a single-entry dict
    ``{key_name: data}``; a dict input is returned as the very same object.
    """
    return data if isinstance(data, dict) else {key_name: data}
def wrap_metadata(metadata, **kwargs):
    """Build the metadata attached to every Trace and Span.

    A shallow copy of ``metadata`` is made, ``"created_from": "dify"`` is set
    on it, and then ``kwargs`` are merged in (so a keyword argument wins over
    an existing key of the same name, including ``created_from``).

    The input mapping is left untouched: callers pass in metadata that belongs
    to a trace-info object, and tagging it in place would leak these extra
    keys back into that object.

    Args:
        metadata: Mapping with the base metadata entries.
        **kwargs: Additional entries to add to the result.

    Returns:
        A new dict containing the merged metadata.
    """
    wrapped = dict(metadata)
    wrapped["created_from"] = "dify"
    wrapped.update(kwargs)
    return wrapped
def prepare_opik_uuid(user_datetime: Optional[datetime], user_uuid: Optional[str]):
    """Convert a Dify identifier into the UUIDv7 form that Opik requires.

    Dify identifies most messages and objects with UUIDv4 values, whereas Opik
    needs UUIDv7. According to the type hints of BaseTraceInfo, both the start
    time and the message id may be missing; in that case there is nothing that
    identifies the object uniquely, so the missing part is replaced by the
    current time and/or a freshly generated random UUIDv4 before conversion.
    """
    effective_datetime = datetime.now() if user_datetime is None else user_datetime
    effective_uuid = str(uuid.uuid4()) if user_uuid is None else user_uuid
    return uuid4_to_uuid7(effective_datetime, effective_uuid)
class OpikDataTrace(BaseTraceInstance):
    """Trace instance that forwards Dify trace information to Opik.

    Each ``*_trace`` method turns one kind of trace-info object into Opik
    traces and/or spans and submits them through ``self.opik_client``.
    """

    def __init__(
        self,
        opik_config: OpikConfig,
    ):
        """Create the Opik client from ``opik_config``.

        Args:
            opik_config: Provides ``project``, ``workspace``, ``url`` and
                ``api_key`` for the Opik client.
        """
        super().__init__(opik_config)
        self.opik_client = Opik(
            project_name=opik_config.project,
            workspace=opik_config.workspace,
            host=opik_config.url,
            api_key=opik_config.api_key,
        )
        self.project = opik_config.project
        # Base URL used to build links to message files in message_trace.
        self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")

    @staticmethod
    def _parse_json_column(raw):
        """Decode a JSON string, returning an empty dict for a falsy value."""
        return json.loads(raw) if raw else {}

    def trace(self, trace_info: BaseTraceInfo):
        """Dispatch ``trace_info`` to the handler matching its type.

        Trace-info objects of any other type are ignored.
        """
        if isinstance(trace_info, WorkflowTraceInfo):
            self.workflow_trace(trace_info)
        if isinstance(trace_info, MessageTraceInfo):
            self.message_trace(trace_info)
        if isinstance(trace_info, ModerationTraceInfo):
            self.moderation_trace(trace_info)
        if isinstance(trace_info, SuggestedQuestionTraceInfo):
            self.suggested_question_trace(trace_info)
        if isinstance(trace_info, DatasetRetrievalTraceInfo):
            self.dataset_retrieval_trace(trace_info)
        if isinstance(trace_info, ToolTraceInfo):
            self.tool_trace(trace_info)
        if isinstance(trace_info, GenerateNameTraceInfo):
            self.generate_name_trace(trace_info)

    def workflow_trace(self, trace_info: WorkflowTraceInfo):
        """Send a workflow run and its node executions to Opik.

        One trace is created for the run. When the run belongs to a message,
        the trace id is derived from the message id and an additional root
        span (derived from the workflow run id) is created. Afterwards one
        span is created for every WorkflowNodeExecution row of the run.

        Raises:
            ValueError: If Opik rejects a trace or span (see add_trace /
                add_span).
        """
        workflow_metadata = wrap_metadata(
            trace_info.metadata, message_id=trace_info.message_id, workflow_app_log_id=trace_info.workflow_app_log_id
        )
        workflow_input = wrap_dict("input", trace_info.workflow_run_inputs)
        workflow_output = wrap_dict("output", trace_info.workflow_run_outputs)

        # A run triggered by a message is attached to the message's trace id.
        if trace_info.message_id:
            dify_trace_id = trace_info.message_id
            trace_tags = ["message", "workflow"]
        else:
            dify_trace_id = trace_info.workflow_run_id
            trace_tags = ["workflow"]
        opik_trace_id = prepare_opik_uuid(trace_info.start_time, dify_trace_id)

        trace_data = {
            "id": opik_trace_id,
            "name": TraceTaskName.MESSAGE_TRACE.value,
            "start_time": trace_info.start_time,
            "end_time": trace_info.end_time,
            "metadata": workflow_metadata,
            "input": workflow_input,
            "output": workflow_output,
            "tags": trace_tags,
            "project_name": self.project,
        }
        self.add_trace(trace_data)

        if trace_info.message_id:
            root_span_id = prepare_opik_uuid(trace_info.start_time, trace_info.workflow_run_id)
            span_data = {
                "id": root_span_id,
                "parent_span_id": None,
                "trace_id": opik_trace_id,
                "name": TraceTaskName.WORKFLOW_TRACE.value,
                "input": workflow_input,
                "output": workflow_output,
                "start_time": trace_info.start_time,
                "end_time": trace_info.end_time,
                "metadata": workflow_metadata,
                "tags": ["workflow"],
                "project_name": self.project,
            }
            self.add_span(span_data)

        # Fetch every node execution of the run in a single query instead of
        # looking each row up by id one at a time.
        node_executions = (
            db.session.query(
                WorkflowNodeExecution.id,
                WorkflowNodeExecution.tenant_id,
                WorkflowNodeExecution.app_id,
                WorkflowNodeExecution.title,
                WorkflowNodeExecution.node_type,
                WorkflowNodeExecution.status,
                WorkflowNodeExecution.inputs,
                WorkflowNodeExecution.outputs,
                WorkflowNodeExecution.created_at,
                WorkflowNodeExecution.elapsed_time,
                WorkflowNodeExecution.process_data,
                WorkflowNodeExecution.execution_metadata,
            )
            .filter(WorkflowNodeExecution.workflow_run_id == trace_info.workflow_run_id)
            .all()
        )

        for node_execution in node_executions:
            node_execution_id = node_execution.id
            node_type = node_execution.node_type

            process_data = self._parse_json_column(node_execution.process_data)
            if node_type == "llm":
                # For LLM nodes the prompts are reported as the span input.
                inputs = process_data.get("prompts", {})
            else:
                inputs = self._parse_json_column(node_execution.inputs)
            outputs = self._parse_json_column(node_execution.outputs)
            execution_metadata = self._parse_json_column(node_execution.execution_metadata)

            created_at = node_execution.created_at or datetime.now()
            # A missing elapsed_time is treated as zero duration.
            finished_at = created_at + timedelta(seconds=node_execution.elapsed_time or 0)

            metadata = execution_metadata.copy()
            metadata.update(
                {
                    "workflow_run_id": trace_info.workflow_run_id,
                    "node_execution_id": node_execution_id,
                    "tenant_id": node_execution.tenant_id,
                    "app_id": node_execution.app_id,
                    "app_name": node_execution.title,
                    "node_type": node_type,
                    "status": node_execution.status,
                }
            )

            provider = None
            model = None
            total_tokens = 0
            completion_tokens = 0
            prompt_tokens = 0

            if process_data and process_data.get("model_mode") == "chat":
                run_type = "llm"
                provider = process_data.get("model_provider", None)
                model = process_data.get("model_name", "")
                metadata.update(
                    {
                        "ls_provider": provider,
                        "ls_model_name": model,
                    }
                )

                # Usage extraction is best effort: a malformed "usage" entry
                # must not prevent the span from being reported.
                try:
                    if outputs.get("usage"):
                        total_tokens = outputs["usage"].get("total_tokens", 0)
                        prompt_tokens = outputs["usage"].get("prompt_tokens", 0)
                        completion_tokens = outputs["usage"].get("completion_tokens", 0)
                except Exception:
                    logger.error("Failed to extract usage", exc_info=True)
            else:
                run_type = "tool"

            # NOTE(review): the parent id is derived from workflow_app_log_id
            # when it is set, whereas the root span above is derived from
            # workflow_run_id, and no root span exists without a message_id --
            # confirm this is the intended span hierarchy.
            parent_span_id = trace_info.workflow_app_log_id or trace_info.workflow_run_id

            if not total_tokens:
                total_tokens = execution_metadata.get("total_tokens", 0)

            span_data = {
                "trace_id": opik_trace_id,
                "id": prepare_opik_uuid(created_at, node_execution_id),
                "parent_span_id": prepare_opik_uuid(trace_info.start_time, parent_span_id),
                "name": node_type,
                "type": run_type,
                "start_time": created_at,
                "end_time": finished_at,
                "metadata": wrap_metadata(metadata),
                "input": wrap_dict("input", inputs),
                "output": wrap_dict("output", outputs),
                "tags": ["node_execution"],
                "project_name": self.project,
                "usage": {
                    "total_tokens": total_tokens,
                    "completion_tokens": completion_tokens,
                    "prompt_tokens": prompt_tokens,
                },
                "model": model,
                "provider": provider,
            }
            self.add_span(span_data)

    def message_trace(self, trace_info: MessageTraceInfo):
        """Send a message as one Opik trace plus one ``llm`` span.

        Nothing is sent when ``trace_info.message_data`` is None.

        Raises:
            ValueError: If Opik rejects the trace or span.
        """
        message_data = trace_info.message_data
        if message_data is None:
            return

        # Work on copies so the trace-info object's own list and dict are not
        # modified by the additions below.
        file_list = list(cast(list[str], trace_info.file_list) or [])
        message_file_data: Optional[MessageFile] = trace_info.message_file_data
        if message_file_data is not None:
            file_list.append(f"{self.file_base_url}/{message_file_data.url}")

        metadata = dict(trace_info.metadata)
        message_id = trace_info.message_id
        metadata["user_id"] = message_data.from_account_id
        metadata["file_list"] = file_list

        if message_data.from_end_user_id:
            end_user_data: Optional[EndUser] = (
                db.session.query(EndUser).filter(EndUser.id == message_data.from_end_user_id).first()
            )
            if end_user_data is not None:
                metadata["end_user_id"] = end_user_data.session_id

        # The same metadata is attached to both the trace and its span.
        message_metadata = wrap_metadata(metadata)

        trace_data = {
            "id": prepare_opik_uuid(trace_info.start_time, message_id),
            "name": TraceTaskName.MESSAGE_TRACE.value,
            "start_time": trace_info.start_time,
            "end_time": trace_info.end_time,
            "metadata": message_metadata,
            "input": trace_info.inputs,
            "output": message_data.answer,
            "tags": ["message", str(trace_info.conversation_mode)],
            "project_name": self.project,
        }
        trace = self.add_trace(trace_data)

        span_data = {
            "trace_id": trace.id,
            "name": "llm",
            "type": "llm",
            "start_time": trace_info.start_time,
            "end_time": trace_info.end_time,
            "metadata": message_metadata,
            "input": {"input": trace_info.inputs},
            "output": {"output": message_data.answer},
            "tags": ["llm", str(trace_info.conversation_mode)],
            "usage": {
                "completion_tokens": trace_info.answer_tokens,
                "prompt_tokens": trace_info.message_tokens,
                "total_tokens": trace_info.total_tokens,
            },
            "project_name": self.project,
        }
        self.add_span(span_data)

    def moderation_trace(self, trace_info: ModerationTraceInfo):
        """Send a moderation result as a ``tool`` span.

        Nothing is sent when ``trace_info.message_data`` is None. Missing
        start/end times fall back to the message's created_at / updated_at.

        Raises:
            ValueError: If Opik rejects the span.
        """
        if trace_info.message_data is None:
            return

        start_time = trace_info.start_time or trace_info.message_data.created_at

        span_data = {
            "trace_id": prepare_opik_uuid(start_time, trace_info.message_id),
            "name": TraceTaskName.MODERATION_TRACE.value,
            "type": "tool",
            "start_time": start_time,
            "end_time": trace_info.end_time or trace_info.message_data.updated_at,
            "metadata": wrap_metadata(trace_info.metadata),
            "input": wrap_dict("input", trace_info.inputs),
            "output": {
                "action": trace_info.action,
                "flagged": trace_info.flagged,
                "preset_response": trace_info.preset_response,
                "inputs": trace_info.inputs,
            },
            "tags": ["moderation"],
            # Passed explicitly, like the workflow and message spans.
            "project_name": self.project,
        }
        self.add_span(span_data)

    def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo):
        """Send generated suggested questions as a ``tool`` span.

        Nothing is sent when ``trace_info.message_data`` is None. Missing
        start/end times fall back to the message's created_at / updated_at.

        Raises:
            ValueError: If Opik rejects the span.
        """
        message_data = trace_info.message_data
        if message_data is None:
            return

        start_time = trace_info.start_time or message_data.created_at

        span_data = {
            "trace_id": prepare_opik_uuid(start_time, trace_info.message_id),
            "name": TraceTaskName.SUGGESTED_QUESTION_TRACE.value,
            "type": "tool",
            "start_time": start_time,
            "end_time": trace_info.end_time or message_data.updated_at,
            "metadata": wrap_metadata(trace_info.metadata),
            "input": wrap_dict("input", trace_info.inputs),
            "output": wrap_dict("output", trace_info.suggested_question),
            "tags": ["suggested_question"],
            # Passed explicitly, like the workflow and message spans.
            "project_name": self.project,
        }
        self.add_span(span_data)

    def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo):
        """Send a dataset retrieval as a ``tool`` span.

        Nothing is sent when ``trace_info.message_data`` is None. Missing
        start/end times fall back to the message's created_at / updated_at.

        Raises:
            ValueError: If Opik rejects the span.
        """
        if trace_info.message_data is None:
            return

        start_time = trace_info.start_time or trace_info.message_data.created_at

        span_data = {
            "trace_id": prepare_opik_uuid(start_time, trace_info.message_id),
            "name": TraceTaskName.DATASET_RETRIEVAL_TRACE.value,
            "type": "tool",
            "start_time": start_time,
            "end_time": trace_info.end_time or trace_info.message_data.updated_at,
            "metadata": wrap_metadata(trace_info.metadata),
            "input": wrap_dict("input", trace_info.inputs),
            "output": {"documents": trace_info.documents},
            "tags": ["dataset_retrieval"],
            # Passed explicitly, like the workflow and message spans.
            "project_name": self.project,
        }
        self.add_span(span_data)

    def tool_trace(self, trace_info: ToolTraceInfo):
        """Send a tool invocation as a ``tool`` span named after the tool.

        Raises:
            ValueError: If Opik rejects the span.
        """
        span_data = {
            "trace_id": prepare_opik_uuid(trace_info.start_time, trace_info.message_id),
            "name": trace_info.tool_name,
            "type": "tool",
            "start_time": trace_info.start_time,
            "end_time": trace_info.end_time,
            "metadata": wrap_metadata(trace_info.metadata),
            "input": wrap_dict("input", trace_info.tool_inputs),
            "output": wrap_dict("output", trace_info.tool_outputs),
            "tags": ["tool", trace_info.tool_name],
            # Passed explicitly, like the workflow and message spans.
            "project_name": self.project,
        }
        self.add_span(span_data)

    def generate_name_trace(self, trace_info: GenerateNameTraceInfo):
        """Send a name-generation task as one trace plus one span.

        Raises:
            ValueError: If Opik rejects the trace or span.
        """
        trace_data = {
            "id": prepare_opik_uuid(trace_info.start_time, trace_info.message_id),
            "name": TraceTaskName.GENERATE_NAME_TRACE.value,
            "start_time": trace_info.start_time,
            "end_time": trace_info.end_time,
            "metadata": wrap_metadata(trace_info.metadata),
            "input": trace_info.inputs,
            "output": trace_info.outputs,
            "tags": ["generate_name"],
            "project_name": self.project,
        }
        trace = self.add_trace(trace_data)

        span_data = {
            "trace_id": trace.id,
            "name": TraceTaskName.GENERATE_NAME_TRACE.value,
            "start_time": trace_info.start_time,
            "end_time": trace_info.end_time,
            "metadata": wrap_metadata(trace_info.metadata),
            "input": wrap_dict("input", trace_info.inputs),
            "output": wrap_dict("output", trace_info.outputs),
            "tags": ["generate_name"],
            # Passed explicitly, like the trace created just above.
            "project_name": self.project,
        }
        self.add_span(span_data)

    def add_trace(self, opik_trace_data: dict) -> Trace:
        """Create an Opik trace from keyword data and return it.

        Raises:
            ValueError: If the Opik client raises; the original exception is
                kept as ``__cause__``.
        """
        try:
            trace = self.opik_client.trace(**opik_trace_data)
            logger.debug("Opik Trace created successfully")
            return trace
        except Exception as e:
            raise ValueError(f"Opik Failed to create trace: {str(e)}") from e

    def add_span(self, opik_span_data: dict):
        """Create an Opik span from keyword data.

        Raises:
            ValueError: If the Opik client raises; the original exception is
                kept as ``__cause__``.
        """
        try:
            self.opik_client.span(**opik_span_data)
            logger.debug("Opik Span created successfully")
        except Exception as e:
            raise ValueError(f"Opik Failed to create span: {str(e)}") from e

    def api_check(self):
        """Run the Opik client's auth check.

        Returns:
            True when the check does not raise.

        Raises:
            ValueError: If the check raises; the original exception is kept
                as ``__cause__``.
        """
        try:
            self.opik_client.auth_check()
            return True
        except Exception as e:
            logger.info("Opik API check failed: %s", e, exc_info=True)
            raise ValueError(f"Opik API check failed: {str(e)}") from e

    def get_project_url(self):
        """Return the Opik URL for the configured project.

        Raises:
            ValueError: If the client call raises; the original exception is
                kept as ``__cause__``.
        """
        try:
            return self.opik_client.get_project_url(project_name=self.project)
        except Exception as e:
            logger.info("Opik get run url failed: %s", e, exc_info=True)
            raise ValueError(f"Opik get run url failed: {str(e)}") from e
|