Browse Source

fix: fail-branch stream output error (#13401)

Co-authored-by: Novice Lee <novicelee@NoviPro.local>
Novice 1 month ago
parent
commit
fe0d932f50

+ 4 - 2
api/core/workflow/graph_engine/graph_engine.py

@@ -738,8 +738,10 @@ class GraphEngine:
                                     )
                                     )
                                 should_continue_retry = False
                                 should_continue_retry = False
                             elif run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
                             elif run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
-                                if node_instance.should_continue_on_error and self.graph.edge_mapping.get(
-                                    node_instance.node_id
+                                if (
+                                    node_instance.should_continue_on_error
+                                    and self.graph.edge_mapping.get(node_instance.node_id)
+                                    and node_instance.node_data.error_strategy is ErrorStrategy.FAIL_BRANCH
                                 ):
                                 ):
                                     run_result.edge_source_handle = FailBranchSourceHandle.SUCCESS
                                     run_result.edge_source_handle = FailBranchSourceHandle.SUCCESS
                                 if run_result.metadata and run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS):
                                 if run_result.metadata and run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS):

+ 7 - 5
api/core/workflow/nodes/answer/answer_stream_processor.py

@@ -82,7 +82,7 @@ class AnswerStreamProcessor(StreamProcessor):
         :param event: node run succeeded event
         :param event: node run succeeded event
         :return:
         :return:
         """
         """
-        for answer_node_id, position in self.route_position.items():
+        for answer_node_id in self.route_position:
             # all depends on answer node id not in rest node ids
             # all depends on answer node id not in rest node ids
             if event.route_node_state.node_id != answer_node_id and (
             if event.route_node_state.node_id != answer_node_id and (
                 answer_node_id not in self.rest_node_ids
                 answer_node_id not in self.rest_node_ids
@@ -155,11 +155,13 @@ class AnswerStreamProcessor(StreamProcessor):
         for answer_node_id, route_position in self.route_position.items():
         for answer_node_id, route_position in self.route_position.items():
             if answer_node_id not in self.rest_node_ids:
             if answer_node_id not in self.rest_node_ids:
                 continue
                 continue
-
+            # exclude current node id
+            answer_dependencies = self.generate_routes.answer_dependencies
+            if event.node_id in answer_dependencies[answer_node_id]:
+                answer_dependencies[answer_node_id].remove(event.node_id)
+            answer_dependencies_ids = answer_dependencies.get(answer_node_id, [])
             # all depends on answer node id not in rest node ids
             # all depends on answer node id not in rest node ids
-            if all(
-                dep_id not in self.rest_node_ids for dep_id in self.generate_routes.answer_dependencies[answer_node_id]
-            ):
+            if all(dep_id not in self.rest_node_ids for dep_id in answer_dependencies_ids):
                 if route_position >= len(self.generate_routes.answer_generate_route[answer_node_id]):
                 if route_position >= len(self.generate_routes.answer_generate_route[answer_node_id]):
                     continue
                     continue