| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 | import threadingfrom typing import Optionalimport contextsfrom extensions.ext_database import dbfrom libs.infinite_scroll_pagination import InfiniteScrollPaginationfrom models.enums import WorkflowRunTriggeredFromfrom models.model import Appfrom models.workflow import (    WorkflowNodeExecution,    WorkflowNodeExecutionTriggeredFrom,    WorkflowRun,)class WorkflowRunService:    def get_paginate_advanced_chat_workflow_runs(self, app_model: App, args: dict) -> InfiniteScrollPagination:        """        Get advanced chat app workflow run list        Only return triggered_from == advanced_chat        :param app_model: app model        :param args: request args        """        class WorkflowWithMessage:            message_id: str            conversation_id: str            def __init__(self, workflow_run: WorkflowRun):                self._workflow_run = workflow_run            def __getattr__(self, item):                return getattr(self._workflow_run, item)        pagination = self.get_paginate_workflow_runs(app_model, args)        with_message_workflow_runs = []        for workflow_run in pagination.data:            message = workflow_run.message            with_message_workflow_run = WorkflowWithMessage(workflow_run=workflow_run)            if message:                with_message_workflow_run.message_id = message.id                with_message_workflow_run.conversation_id = message.conversation_id            with_message_workflow_runs.append(with_message_workflow_run)        pagination.data = with_message_workflow_runs        return pagination    def get_paginate_workflow_runs(self, app_model: App, args: dict) -> InfiniteScrollPagination:        """        Get debug workflow run list        Only return triggered_from == debugging        :param app_model: app model        :param args: request args        """        limit = int(args.get("limit", 20))        base_query = db.session.query(WorkflowRun).filter(            WorkflowRun.tenant_id == app_model.tenant_id,            WorkflowRun.app_id == app_model.id,            WorkflowRun.triggered_from == WorkflowRunTriggeredFrom.DEBUGGING.value,        )        if args.get("last_id"):            last_workflow_run = base_query.filter(                WorkflowRun.id == args.get("last_id"),            ).first()            if not last_workflow_run:                raise ValueError("Last workflow run not exists")            workflow_runs = (                base_query.filter(                    WorkflowRun.created_at < last_workflow_run.created_at, WorkflowRun.id != last_workflow_run.id                )                .order_by(WorkflowRun.created_at.desc())                .limit(limit)                .all()            )        else:            workflow_runs = base_query.order_by(WorkflowRun.created_at.desc()).limit(limit).all()        has_more = False        if len(workflow_runs) == limit:            current_page_first_workflow_run = workflow_runs[-1]            rest_count = base_query.filter(                WorkflowRun.created_at < current_page_first_workflow_run.created_at,                WorkflowRun.id != current_page_first_workflow_run.id,            ).count()            if rest_count > 0:                has_more = True        return InfiniteScrollPagination(data=workflow_runs, limit=limit, has_more=has_more)    def get_workflow_run(self, app_model: App, run_id: str) -> Optional[WorkflowRun]:        """        Get workflow run detail        :param app_model: app model        :param run_id: workflow run id        """        workflow_run = (            db.session.query(WorkflowRun)            .filter(                WorkflowRun.tenant_id == app_model.tenant_id,                WorkflowRun.app_id == app_model.id,                WorkflowRun.id == run_id,            )            .first()        )        return workflow_run    def get_workflow_run_node_executions(self, app_model: App, run_id: str) -> list[WorkflowNodeExecution]:        """        Get workflow run node execution list        """        workflow_run = self.get_workflow_run(app_model, run_id)        contexts.plugin_tool_providers.set({})        contexts.plugin_tool_providers_lock.set(threading.Lock())        if not workflow_run:            return []        node_executions = (            db.session.query(WorkflowNodeExecution)            .filter(                WorkflowNodeExecution.tenant_id == app_model.tenant_id,                WorkflowNodeExecution.app_id == app_model.id,                WorkflowNodeExecution.workflow_id == workflow_run.workflow_id,                WorkflowNodeExecution.triggered_from == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value,                WorkflowNodeExecution.workflow_run_id == run_id,            )            .order_by(WorkflowNodeExecution.index.desc())            .all()        )        return node_executions
 |