import json
import logging
import os
import uuid
from datetime import datetime, timedelta
from typing import Optional, cast

from langsmith import Client
from langsmith.schemas import RunBase

from core.ops.base_trace_instance import BaseTraceInstance
from core.ops.entities.config_entity import LangSmithConfig
from core.ops.entities.trace_entity import (
    BaseTraceInfo,
    DatasetRetrievalTraceInfo,
    GenerateNameTraceInfo,
    MessageTraceInfo,
    ModerationTraceInfo,
    SuggestedQuestionTraceInfo,
    ToolTraceInfo,
    TraceTaskName,
    WorkflowTraceInfo,
)
from core.ops.langsmith_trace.entities.langsmith_trace_entity import (
    LangSmithRunModel,
    LangSmithRunType,
    LangSmithRunUpdateModel,
)
from core.ops.utils import filter_none_values, generate_dotted_order
from extensions.ext_database import db
from models.model import EndUser, MessageFile
from models.workflow import WorkflowNodeExecution

logger = logging.getLogger(__name__)


class LangSmithDataTrace(BaseTraceInstance):
    """Report Dify trace events to LangSmith as runs.

    Each ``*_trace`` method converts one kind of ``BaseTraceInfo`` into one or
    more ``LangSmithRunModel`` objects and sends them with :meth:`add_run`.
    """

    def __init__(
        self,
        langsmith_config: LangSmithConfig,
    ):
        """Create the LangSmith client from *langsmith_config*.

        The ``FILES_URL`` environment variable (default
        ``http://127.0.0.1:5001``) is the base used to build absolute URLs for
        message files attached to message runs.
        """
        super().__init__(langsmith_config)
        self.langsmith_key = langsmith_config.api_key
        self.project_name = langsmith_config.project
        # Not assigned anywhere else in this module; add_run prefers it over
        # project_name when it is set.
        self.project_id = None
        self.langsmith_client = Client(api_key=langsmith_config.api_key, api_url=langsmith_config.endpoint)
        self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")

    def trace(self, trace_info: BaseTraceInfo):
        """Dispatch *trace_info* to the handler matching its concrete type.

        Trace types not listed here are silently ignored.
        """
        if isinstance(trace_info, WorkflowTraceInfo):
            self.workflow_trace(trace_info)
        if isinstance(trace_info, MessageTraceInfo):
            self.message_trace(trace_info)
        if isinstance(trace_info, ModerationTraceInfo):
            self.moderation_trace(trace_info)
        if isinstance(trace_info, SuggestedQuestionTraceInfo):
            self.suggested_question_trace(trace_info)
        if isinstance(trace_info, DatasetRetrievalTraceInfo):
            self.dataset_retrieval_trace(trace_info)
        if isinstance(trace_info, ToolTraceInfo):
            self.tool_trace(trace_info)
        if isinstance(trace_info, GenerateNameTraceInfo):
            self.generate_name_trace(trace_info)

    def workflow_trace(self, trace_info: WorkflowTraceInfo):
        """Send a workflow run, plus one child run per recorded node execution.

        When the workflow belongs to a message (``trace_info.message_id`` is
        set), a parent message run is sent first. The workflow run is nested
        under it, and the message id is used as the trace id. Otherwise the
        workflow run id is the trace id.

        Note: ``trace_info.start_time`` (when None) and ``trace_info.metadata``
        are modified in place.
        """
        trace_id = trace_info.message_id or trace_info.workflow_run_id
        if trace_info.start_time is None:
            trace_info.start_time = datetime.now()
        # generate_dotted_order presumably encodes the run's position in the
        # LangSmith trace tree (parent order passed as third argument).
        message_dotted_order = (
            generate_dotted_order(trace_info.message_id, trace_info.start_time) if trace_info.message_id else None
        )
        workflow_dotted_order = generate_dotted_order(
            trace_info.workflow_run_id,
            trace_info.workflow_data.created_at,
            message_dotted_order,
        )
        metadata = trace_info.metadata
        metadata["workflow_app_log_id"] = trace_info.workflow_app_log_id

        if trace_info.message_id:
            message_run = LangSmithRunModel(
                id=trace_info.message_id,
                name=TraceTaskName.MESSAGE_TRACE.value,
                inputs=dict(trace_info.workflow_run_inputs),
                outputs=dict(trace_info.workflow_run_outputs),
                run_type=LangSmithRunType.chain,
                start_time=trace_info.start_time,
                end_time=trace_info.end_time,
                extra={
                    "metadata": metadata,
                },
                tags=["message", "workflow"],
                error=trace_info.error,
                trace_id=trace_id,
                dotted_order=message_dotted_order,
                file_list=[],
                serialized=None,
                parent_run_id=None,
                events=[],
                session_id=None,
                session_name=None,
                reference_example_id=None,
                input_attachments={},
                output_attachments={},
            )
            self.add_run(message_run)

        langsmith_run = LangSmithRunModel(
            file_list=trace_info.file_list,
            total_tokens=trace_info.total_tokens,
            id=trace_info.workflow_run_id,
            name=TraceTaskName.WORKFLOW_TRACE.value,
            inputs=dict(trace_info.workflow_run_inputs),
            run_type=LangSmithRunType.tool,
            start_time=trace_info.workflow_data.created_at,
            end_time=trace_info.workflow_data.finished_at,
            outputs=dict(trace_info.workflow_run_outputs),
            extra={
                "metadata": metadata,
            },
            error=trace_info.error,
            tags=["workflow"],
            parent_run_id=trace_info.message_id or None,
            trace_id=trace_id,
            dotted_order=workflow_dotted_order,
            serialized=None,
            events=[],
            session_id=None,
            session_name=None,
            reference_example_id=None,
            input_attachments={},
            output_attachments={},
        )

        self.add_run(langsmith_run)

        # through workflow_run_id get all_nodes_execution
        # NOTE(review): this issues one extra query per node (N+1). It may be
        # deliberate, to avoid holding every node's JSON payload in memory at
        # once; consider a single query if that is not a concern.
        workflow_nodes_execution_id_records = (
            db.session.query(WorkflowNodeExecution.id)
            .filter(WorkflowNodeExecution.workflow_run_id == trace_info.workflow_run_id)
            .all()
        )

        for node_execution_id_record in workflow_nodes_execution_id_records:
            node_execution = (
                db.session.query(
                    WorkflowNodeExecution.id,
                    WorkflowNodeExecution.tenant_id,
                    WorkflowNodeExecution.app_id,
                    WorkflowNodeExecution.title,
                    WorkflowNodeExecution.node_type,
                    WorkflowNodeExecution.status,
                    WorkflowNodeExecution.inputs,
                    WorkflowNodeExecution.outputs,
                    WorkflowNodeExecution.created_at,
                    WorkflowNodeExecution.elapsed_time,
                    WorkflowNodeExecution.process_data,
                    WorkflowNodeExecution.execution_metadata,
                )
                .filter(WorkflowNodeExecution.id == node_execution_id_record.id)
                .first()
            )

            if not node_execution:
                continue

            node_execution_id = node_execution.id
            tenant_id = node_execution.tenant_id
            app_id = node_execution.app_id
            node_name = node_execution.title
            node_type = node_execution.node_type
            status = node_execution.status

            # Decode process_data once; it supplies both the LLM prompts and
            # the model information used below.
            process_data = json.loads(node_execution.process_data) if node_execution.process_data else {}
            if node_type == "llm":
                # For LLM nodes the rendered prompts are more useful as the run
                # input than the raw node inputs.
                inputs = process_data.get("prompts", {})
            else:
                inputs = json.loads(node_execution.inputs) if node_execution.inputs else {}
            outputs = json.loads(node_execution.outputs) if node_execution.outputs else {}
            created_at = node_execution.created_at or datetime.now()
            elapsed_time = node_execution.elapsed_time
            finished_at = created_at + timedelta(seconds=elapsed_time)

            execution_metadata = (
                json.loads(node_execution.execution_metadata) if node_execution.execution_metadata else {}
            )
            node_total_tokens = execution_metadata.get("total_tokens", 0)
            metadata = execution_metadata.copy()
            metadata.update(
                {
                    "workflow_run_id": trace_info.workflow_run_id,
                    "node_execution_id": node_execution_id,
                    "tenant_id": tenant_id,
                    "app_id": app_id,
                    "app_name": node_name,
                    "node_type": node_type,
                    "status": status,
                }
            )

            if process_data and process_data.get("model_mode") == "chat":
                run_type = LangSmithRunType.llm
                # ls_* keys let LangSmith attribute the run to a provider/model.
                metadata.update(
                    {
                        "ls_provider": process_data.get("model_provider", ""),
                        "ls_model_name": process_data.get("model_name", ""),
                    }
                )
            elif node_type == "knowledge-retrieval":
                run_type = LangSmithRunType.retriever
            else:
                run_type = LangSmithRunType.tool

            node_dotted_order = generate_dotted_order(node_execution_id, created_at, workflow_dotted_order)
            langsmith_run = LangSmithRunModel(
                total_tokens=node_total_tokens,
                name=node_type,
                inputs=inputs,
                run_type=run_type,
                start_time=created_at,
                end_time=finished_at,
                outputs=outputs,
                file_list=trace_info.file_list,
                extra={
                    "metadata": metadata,
                },
                parent_run_id=trace_info.workflow_run_id,
                tags=["node_execution"],
                id=node_execution_id,
                trace_id=trace_id,
                dotted_order=node_dotted_order,
                error="",
                serialized=None,
                events=[],
                session_id=None,
                session_name=None,
                reference_example_id=None,
                input_attachments={},
                output_attachments={},
            )

            self.add_run(langsmith_run)

    def message_trace(self, trace_info: MessageTraceInfo):
        """Send a message chain run and a child ``llm`` run for one message.

        Does nothing when ``trace_info.message_data`` is None. The message
        file URL (if any) is added to a copy of ``trace_info.file_list``; the
        caller's list is not modified.
        """
        message_data = trace_info.message_data
        if message_data is None:
            return

        # get message file data
        # Copy so trace_info.file_list is not mutated, and append a URL only
        # when a message file exists (never an empty-string placeholder).
        file_list = list(cast(list[str], trace_info.file_list) or [])
        message_file_data: Optional[MessageFile] = trace_info.message_file_data
        if message_file_data:
            file_list.append(f"{self.file_base_url}/{message_file_data.url}")

        metadata = trace_info.metadata
        message_id = message_data.id

        user_id = message_data.from_account_id
        metadata["user_id"] = user_id

        if message_data.from_end_user_id:
            end_user_data: Optional[EndUser] = (
                db.session.query(EndUser).filter(EndUser.id == message_data.from_end_user_id).first()
            )
            if end_user_data is not None:
                end_user_id = end_user_data.session_id
                metadata["end_user_id"] = end_user_id

        message_run = LangSmithRunModel(
            input_tokens=trace_info.message_tokens,
            output_tokens=trace_info.answer_tokens,
            total_tokens=trace_info.total_tokens,
            id=message_id,
            name=TraceTaskName.MESSAGE_TRACE.value,
            inputs=trace_info.inputs,
            run_type=LangSmithRunType.chain,
            start_time=trace_info.start_time,
            end_time=trace_info.end_time,
            outputs=message_data.answer,
            extra={"metadata": metadata},
            tags=["message", str(trace_info.conversation_mode)],
            error=trace_info.error,
            file_list=file_list,
            serialized=None,
            events=[],
            session_id=None,
            session_name=None,
            reference_example_id=None,
            input_attachments={},
            output_attachments={},
            trace_id=None,
            dotted_order=None,
            parent_run_id=None,
        )
        self.add_run(message_run)

        # create llm run parented to message run
        llm_run = LangSmithRunModel(
            input_tokens=trace_info.message_tokens,
            output_tokens=trace_info.answer_tokens,
            total_tokens=trace_info.total_tokens,
            name="llm",
            inputs=trace_info.inputs,
            run_type=LangSmithRunType.llm,
            start_time=trace_info.start_time,
            end_time=trace_info.end_time,
            outputs=message_data.answer,
            extra={"metadata": metadata},
            parent_run_id=message_id,
            tags=["llm", str(trace_info.conversation_mode)],
            error=trace_info.error,
            file_list=file_list,
            serialized=None,
            events=[],
            session_id=None,
            session_name=None,
            reference_example_id=None,
            input_attachments={},
            output_attachments={},
            trace_id=None,
            dotted_order=None,
            id=str(uuid.uuid4()),
        )
        self.add_run(llm_run)

    def moderation_trace(self, trace_info: ModerationTraceInfo):
        """Send a moderation tool run under the message run.

        Skipped when ``trace_info.message_data`` is None. Missing start/end
        times fall back to the message's created/updated timestamps.
        """
        if trace_info.message_data is None:
            return
        langsmith_run = LangSmithRunModel(
            name=TraceTaskName.MODERATION_TRACE.value,
            inputs=trace_info.inputs,
            outputs={
                "action": trace_info.action,
                "flagged": trace_info.flagged,
                "preset_response": trace_info.preset_response,
                "inputs": trace_info.inputs,
            },
            run_type=LangSmithRunType.tool,
            extra={"metadata": trace_info.metadata},
            tags=["moderation"],
            parent_run_id=trace_info.message_id,
            start_time=trace_info.start_time or trace_info.message_data.created_at,
            end_time=trace_info.end_time or trace_info.message_data.updated_at,
            id=str(uuid.uuid4()),
            serialized=None,
            events=[],
            session_id=None,
            session_name=None,
            reference_example_id=None,
            input_attachments={},
            output_attachments={},
            trace_id=None,
            dotted_order=None,
            error="",
            file_list=[],
        )

        self.add_run(langsmith_run)

    def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo):
        """Send a suggested-question tool run under the message run.

        Skipped when ``trace_info.message_data`` is None.
        """
        message_data = trace_info.message_data
        if message_data is None:
            return
        suggested_question_run = LangSmithRunModel(
            name=TraceTaskName.SUGGESTED_QUESTION_TRACE.value,
            inputs=trace_info.inputs,
            outputs=trace_info.suggested_question,
            run_type=LangSmithRunType.tool,
            extra={"metadata": trace_info.metadata},
            tags=["suggested_question"],
            parent_run_id=trace_info.message_id,
            start_time=trace_info.start_time or message_data.created_at,
            end_time=trace_info.end_time or message_data.updated_at,
            id=str(uuid.uuid4()),
            serialized=None,
            events=[],
            session_id=None,
            session_name=None,
            reference_example_id=None,
            input_attachments={},
            output_attachments={},
            trace_id=None,
            dotted_order=None,
            error="",
            file_list=[],
        )

        self.add_run(suggested_question_run)

    def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo):
        """Send a retriever run holding the retrieved documents.

        Skipped when ``trace_info.message_data`` is None.
        """
        if trace_info.message_data is None:
            return
        dataset_retrieval_run = LangSmithRunModel(
            name=TraceTaskName.DATASET_RETRIEVAL_TRACE.value,
            inputs=trace_info.inputs,
            outputs={"documents": trace_info.documents},
            run_type=LangSmithRunType.retriever,
            extra={"metadata": trace_info.metadata},
            tags=["dataset_retrieval"],
            parent_run_id=trace_info.message_id,
            start_time=trace_info.start_time or trace_info.message_data.created_at,
            end_time=trace_info.end_time or trace_info.message_data.updated_at,
            id=str(uuid.uuid4()),
            serialized=None,
            events=[],
            session_id=None,
            session_name=None,
            reference_example_id=None,
            input_attachments={},
            output_attachments={},
            trace_id=None,
            dotted_order=None,
            error="",
            file_list=[],
        )

        self.add_run(dataset_retrieval_run)

    def tool_trace(self, trace_info: ToolTraceInfo):
        """Send a tool run named after the tool, parented to the message run."""
        # cast() does not convert anything: a missing file_url used to produce
        # [None]. Send an empty list instead.
        file_list = [cast(str, trace_info.file_url)] if trace_info.file_url else []
        tool_run = LangSmithRunModel(
            name=trace_info.tool_name,
            inputs=trace_info.tool_inputs,
            outputs=trace_info.tool_outputs,
            run_type=LangSmithRunType.tool,
            extra={
                "metadata": trace_info.metadata,
            },
            tags=["tool", trace_info.tool_name],
            parent_run_id=trace_info.message_id,
            start_time=trace_info.start_time,
            end_time=trace_info.end_time,
            file_list=file_list,
            id=str(uuid.uuid4()),
            serialized=None,
            events=[],
            session_id=None,
            session_name=None,
            reference_example_id=None,
            input_attachments={},
            output_attachments={},
            trace_id=None,
            dotted_order=None,
            error=trace_info.error or "",
        )

        self.add_run(tool_run)

    def generate_name_trace(self, trace_info: GenerateNameTraceInfo):
        """Send a root-level tool run for conversation name generation.

        Missing start/end times default to the current local time.
        """
        name_run = LangSmithRunModel(
            name=TraceTaskName.GENERATE_NAME_TRACE.value,
            inputs=trace_info.inputs,
            outputs=trace_info.outputs,
            run_type=LangSmithRunType.tool,
            extra={"metadata": trace_info.metadata},
            tags=["generate_name"],
            start_time=trace_info.start_time or datetime.now(),
            end_time=trace_info.end_time or datetime.now(),
            id=str(uuid.uuid4()),
            serialized=None,
            events=[],
            session_id=None,
            session_name=None,
            reference_example_id=None,
            input_attachments={},
            output_attachments={},
            trace_id=None,
            dotted_order=None,
            error="",
            file_list=[],
            parent_run_id=None,
        )

        self.add_run(name_run)

    def add_run(self, run_data: LangSmithRunModel):
        """Create *run_data* in LangSmith via ``Client.create_run``.

        The run is routed by ``project_id`` (as ``session_id``) when set,
        otherwise by ``project_name`` (as ``session_name``). None-valued
        fields are dropped by ``filter_none_values`` first.

        Raises:
            ValueError: if the LangSmith client call fails; the original
                exception is chained as ``__cause__``.
        """
        data = run_data.model_dump()
        if self.project_id:
            data["session_id"] = self.project_id
        elif self.project_name:
            data["session_name"] = self.project_name

        data = filter_none_values(data)
        try:
            self.langsmith_client.create_run(**data)
            logger.debug("LangSmith Run created successfully.")
        except Exception as e:
            raise ValueError(f"LangSmith Failed to create run: {str(e)}") from e

    def update_run(self, update_run_data: LangSmithRunUpdateModel):
        """Update an existing LangSmith run via ``Client.update_run``.

        Raises:
            ValueError: if the LangSmith client call fails; the original
                exception is chained as ``__cause__``.
        """
        data = update_run_data.model_dump()
        data = filter_none_values(data)
        try:
            self.langsmith_client.update_run(**data)
            logger.debug("LangSmith Run updated successfully.")
        except Exception as e:
            raise ValueError(f"LangSmith Failed to update run: {str(e)}") from e

    def api_check(self):
        """Check the configured credentials by creating and deleting a scratch project.

        Returns:
            True when both calls succeed.

        Raises:
            ValueError: if either LangSmith call fails.
        """
        try:
            random_project_name = f"test_project_{datetime.now().strftime('%Y%m%d%H%M%S')}"
            self.langsmith_client.create_project(project_name=random_project_name)
            self.langsmith_client.delete_project(project_name=random_project_name)
            return True
        except Exception as e:
            logger.debug("LangSmith API check failed: %s", e)
            raise ValueError(f"LangSmith API check failed: {str(e)}") from e

    def get_project_url(self):
        """Return the LangSmith URL of the configured project.

        A throwaway ``RunBase`` (never sent) is used only so the client can
        build a run URL. The run-specific ``/r/...`` suffix is then stripped.

        Raises:
            ValueError: if the URL cannot be built.
        """
        try:
            run_data = RunBase(
                id=uuid.uuid4(),
                name="tool",
                inputs={"input": "test"},
                outputs={"output": "test"},
                run_type=LangSmithRunType.tool,
                start_time=datetime.now(),
            )
            project_url = self.langsmith_client.get_run_url(
                run=run_data, project_id=self.project_id, project_name=self.project_name
            )
            return project_url.split("/r/")[0]
        except Exception as e:
            logger.debug("LangSmith get run url failed: %s", e)
            raise ValueError(f"LangSmith get run url failed: {str(e)}") from e