use-workflow-run.ts 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790
  1. import { useCallback } from 'react'
  2. import {
  3. useReactFlow,
  4. useStoreApi,
  5. } from 'reactflow'
  6. import produce from 'immer'
  7. import { v4 as uuidV4 } from 'uuid'
  8. import { usePathname } from 'next/navigation'
  9. import { useWorkflowStore } from '../store'
  10. import { useNodesSyncDraft } from '../hooks'
  11. import {
  12. BlockEnum,
  13. NodeRunningStatus,
  14. WorkflowRunningStatus,
  15. } from '../types'
  16. import { DEFAULT_ITER_TIMES } from '../constants'
  17. import { useWorkflowUpdate } from './use-workflow-interactions'
  18. import { useStore as useAppStore } from '@/app/components/app/store'
  19. import type { IOtherOptions } from '@/service/base'
  20. import { ssePost } from '@/service/base'
  21. import { stopWorkflowRun } from '@/service/workflow'
  22. import { useFeaturesStore } from '@/app/components/base/features/hooks'
  23. import { AudioPlayerManager } from '@/app/components/base/audio-btn/audio.player.manager'
  24. import {
  25. getFilesInLogs,
  26. } from '@/app/components/base/file-uploader/utils'
  27. import { ErrorHandleTypeEnum } from '@/app/components/workflow/nodes/_base/components/error-handle/types'
  28. import type { NodeTracing, VersionHistory } from '@/types/workflow'
  29. export const useWorkflowRun = () => {
  30. const store = useStoreApi()
  31. const workflowStore = useWorkflowStore()
  32. const reactflow = useReactFlow()
  33. const featuresStore = useFeaturesStore()
  34. const { doSyncWorkflowDraft } = useNodesSyncDraft()
  35. const { handleUpdateWorkflowCanvas } = useWorkflowUpdate()
  36. const pathname = usePathname()
  37. const handleBackupDraft = useCallback(() => {
  38. const {
  39. getNodes,
  40. edges,
  41. } = store.getState()
  42. const { getViewport } = reactflow
  43. const {
  44. backupDraft,
  45. setBackupDraft,
  46. environmentVariables,
  47. } = workflowStore.getState()
  48. const { features } = featuresStore!.getState()
  49. if (!backupDraft) {
  50. setBackupDraft({
  51. nodes: getNodes(),
  52. edges,
  53. viewport: getViewport(),
  54. features,
  55. environmentVariables,
  56. })
  57. doSyncWorkflowDraft()
  58. }
  59. }, [reactflow, workflowStore, store, featuresStore, doSyncWorkflowDraft])
  60. const handleLoadBackupDraft = useCallback(() => {
  61. const {
  62. backupDraft,
  63. setBackupDraft,
  64. setEnvironmentVariables,
  65. } = workflowStore.getState()
  66. if (backupDraft) {
  67. const {
  68. nodes,
  69. edges,
  70. viewport,
  71. features,
  72. environmentVariables,
  73. } = backupDraft
  74. handleUpdateWorkflowCanvas({
  75. nodes,
  76. edges,
  77. viewport,
  78. })
  79. setEnvironmentVariables(environmentVariables)
  80. featuresStore!.setState({ features })
  81. setBackupDraft(undefined)
  82. }
  83. }, [handleUpdateWorkflowCanvas, workflowStore, featuresStore])
  84. const handleRun = useCallback(async (
  85. params: any,
  86. callback?: IOtherOptions,
  87. ) => {
  88. const {
  89. getNodes,
  90. setNodes,
  91. } = store.getState()
  92. const newNodes = produce(getNodes(), (draft) => {
  93. draft.forEach((node) => {
  94. node.data.selected = false
  95. node.data._runningStatus = undefined
  96. })
  97. })
  98. setNodes(newNodes)
  99. await doSyncWorkflowDraft()
  100. const {
  101. onWorkflowStarted,
  102. onWorkflowFinished,
  103. onNodeStarted,
  104. onNodeFinished,
  105. onIterationStart,
  106. onIterationNext,
  107. onIterationFinish,
  108. onNodeRetry,
  109. onError,
  110. ...restCallback
  111. } = callback || {}
  112. workflowStore.setState({ historyWorkflowData: undefined })
  113. const appDetail = useAppStore.getState().appDetail
  114. const workflowContainer = document.getElementById('workflow-container')
  115. const {
  116. clientWidth,
  117. clientHeight,
  118. } = workflowContainer!
  119. let url = ''
  120. if (appDetail?.mode === 'advanced-chat')
  121. url = `/apps/${appDetail.id}/advanced-chat/workflows/draft/run`
  122. if (appDetail?.mode === 'workflow')
  123. url = `/apps/${appDetail.id}/workflows/draft/run`
  124. let prevNodeId = ''
  125. const {
  126. setWorkflowRunningData,
  127. } = workflowStore.getState()
  128. setWorkflowRunningData({
  129. result: {
  130. status: WorkflowRunningStatus.Running,
  131. },
  132. tracing: [],
  133. resultText: '',
  134. })
  135. let ttsUrl = ''
  136. let ttsIsPublic = false
  137. if (params.token) {
  138. ttsUrl = '/text-to-audio'
  139. ttsIsPublic = true
  140. }
  141. else if (params.appId) {
  142. if (pathname.search('explore/installed') > -1)
  143. ttsUrl = `/installed-apps/${params.appId}/text-to-audio`
  144. else
  145. ttsUrl = `/apps/${params.appId}/text-to-audio`
  146. }
  147. const player = AudioPlayerManager.getInstance().getAudioPlayer(ttsUrl, ttsIsPublic, uuidV4(), 'none', 'none', (_: any): any => {})
  148. ssePost(
  149. url,
  150. {
  151. body: params,
  152. },
  153. {
  154. onWorkflowStarted: (params) => {
  155. const { task_id, data } = params
  156. const {
  157. workflowRunningData,
  158. setWorkflowRunningData,
  159. setIterParallelLogMap,
  160. } = workflowStore.getState()
  161. const {
  162. getNodes,
  163. setNodes,
  164. edges,
  165. setEdges,
  166. } = store.getState()
  167. setIterParallelLogMap(new Map())
  168. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  169. draft.task_id = task_id
  170. draft.result = {
  171. ...draft?.result,
  172. ...data,
  173. status: WorkflowRunningStatus.Running,
  174. }
  175. }))
  176. const nodes = getNodes()
  177. const newNodes = produce(nodes, (draft) => {
  178. draft.forEach((node) => {
  179. node.data._waitingRun = true
  180. node.data._runningBranchId = undefined
  181. })
  182. })
  183. setNodes(newNodes)
  184. const newEdges = produce(edges, (draft) => {
  185. draft.forEach((edge) => {
  186. edge.data = {
  187. ...edge.data,
  188. _sourceRunningStatus: undefined,
  189. _targetRunningStatus: undefined,
  190. _waitingRun: true,
  191. }
  192. })
  193. })
  194. setEdges(newEdges)
  195. if (onWorkflowStarted)
  196. onWorkflowStarted(params)
  197. },
  198. onWorkflowFinished: (params) => {
  199. const { data } = params
  200. const {
  201. workflowRunningData,
  202. setWorkflowRunningData,
  203. } = workflowStore.getState()
  204. const isStringOutput = data.outputs && Object.keys(data.outputs).length === 1 && typeof data.outputs[Object.keys(data.outputs)[0]] === 'string'
  205. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  206. draft.result = {
  207. ...draft.result,
  208. ...data,
  209. files: getFilesInLogs(data.outputs),
  210. } as any
  211. if (isStringOutput) {
  212. draft.resultTabActive = true
  213. draft.resultText = data.outputs[Object.keys(data.outputs)[0]]
  214. }
  215. }))
  216. prevNodeId = ''
  217. if (onWorkflowFinished)
  218. onWorkflowFinished(params)
  219. },
  220. onError: (params) => {
  221. const {
  222. workflowRunningData,
  223. setWorkflowRunningData,
  224. } = workflowStore.getState()
  225. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  226. draft.result = {
  227. ...draft.result,
  228. status: WorkflowRunningStatus.Failed,
  229. }
  230. }))
  231. if (onError)
  232. onError(params)
  233. },
  234. onNodeStarted: (params) => {
  235. const { data } = params
  236. const {
  237. workflowRunningData,
  238. setWorkflowRunningData,
  239. iterParallelLogMap,
  240. setIterParallelLogMap,
  241. } = workflowStore.getState()
  242. const {
  243. getNodes,
  244. setNodes,
  245. edges,
  246. setEdges,
  247. transform,
  248. } = store.getState()
  249. const nodes = getNodes()
  250. const node = nodes.find(node => node.id === data.node_id)
  251. if (node?.parentId) {
  252. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  253. const tracing = draft.tracing!
  254. const iterations = tracing.find(trace => trace.node_id === node?.parentId)
  255. const currIteration = iterations?.details![node.data.iteration_index] || iterations?.details![iterations.details!.length - 1]
  256. if (!data.parallel_run_id) {
  257. currIteration?.push({
  258. ...data,
  259. status: NodeRunningStatus.Running,
  260. } as any)
  261. }
  262. else {
  263. const nodeId = iterations?.node_id as string
  264. if (!iterParallelLogMap.has(nodeId as string))
  265. iterParallelLogMap.set(iterations?.node_id as string, new Map())
  266. const currentIterLogMap = iterParallelLogMap.get(nodeId)!
  267. if (!currentIterLogMap.has(data.parallel_run_id))
  268. currentIterLogMap.set(data.parallel_run_id, [{ ...data, status: NodeRunningStatus.Running } as any])
  269. else
  270. currentIterLogMap.get(data.parallel_run_id)!.push({ ...data, status: NodeRunningStatus.Running } as any)
  271. setIterParallelLogMap(iterParallelLogMap)
  272. if (iterations)
  273. iterations.details = Array.from(currentIterLogMap.values())
  274. }
  275. }))
  276. }
  277. else {
  278. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  279. draft.tracing!.push({
  280. ...data,
  281. status: NodeRunningStatus.Running,
  282. } as any)
  283. }))
  284. const {
  285. setViewport,
  286. } = reactflow
  287. const currentNodeIndex = nodes.findIndex(node => node.id === data.node_id)
  288. const currentNode = nodes[currentNodeIndex]
  289. const position = currentNode.position
  290. const zoom = transform[2]
  291. if (!currentNode.parentId) {
  292. setViewport({
  293. x: (clientWidth - 400 - currentNode.width! * zoom) / 2 - position.x * zoom,
  294. y: (clientHeight - currentNode.height! * zoom) / 2 - position.y * zoom,
  295. zoom: transform[2],
  296. })
  297. }
  298. const newNodes = produce(nodes, (draft) => {
  299. draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running
  300. draft[currentNodeIndex].data._waitingRun = false
  301. })
  302. setNodes(newNodes)
  303. const newEdges = produce(edges, (draft) => {
  304. const incomeEdges = draft.filter((edge) => {
  305. return edge.target === data.node_id
  306. })
  307. incomeEdges.forEach((edge) => {
  308. const incomeNode = nodes.find(node => node.id === edge.source)!
  309. if (
  310. (!incomeNode.data._runningBranchId && edge.sourceHandle === 'source')
  311. || (incomeNode.data._runningBranchId && edge.sourceHandle === incomeNode.data._runningBranchId)
  312. ) {
  313. edge.data = {
  314. ...edge.data,
  315. _sourceRunningStatus: incomeNode.data._runningStatus,
  316. _targetRunningStatus: NodeRunningStatus.Running,
  317. _waitingRun: false,
  318. }
  319. }
  320. })
  321. })
  322. setEdges(newEdges)
  323. }
  324. if (onNodeStarted)
  325. onNodeStarted(params)
  326. },
  327. onNodeFinished: (params) => {
  328. const { data } = params
  329. const {
  330. workflowRunningData,
  331. setWorkflowRunningData,
  332. iterParallelLogMap,
  333. setIterParallelLogMap,
  334. } = workflowStore.getState()
  335. const {
  336. getNodes,
  337. setNodes,
  338. edges,
  339. setEdges,
  340. } = store.getState()
  341. const nodes = getNodes()
  342. const nodeParentId = nodes.find(node => node.id === data.node_id)!.parentId
  343. if (nodeParentId) {
  344. if (!data.execution_metadata.parallel_mode_run_id) {
  345. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  346. const tracing = draft.tracing!
  347. const iterations = tracing.find(trace => trace.node_id === nodeParentId) // the iteration node
  348. if (iterations && iterations.details) {
  349. const iterationIndex = data.execution_metadata?.iteration_index || 0
  350. if (!iterations.details[iterationIndex])
  351. iterations.details[iterationIndex] = []
  352. const currIteration = iterations.details[iterationIndex]
  353. const nodeIndex = currIteration.findIndex(node =>
  354. node.node_id === data.node_id && (
  355. node.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id || node.parallel_id === data.execution_metadata?.parallel_id),
  356. )
  357. if (nodeIndex !== -1) {
  358. currIteration[nodeIndex] = {
  359. ...currIteration[nodeIndex],
  360. ...(currIteration[nodeIndex].retryDetail
  361. ? { retryDetail: currIteration[nodeIndex].retryDetail }
  362. : {}),
  363. ...data,
  364. } as any
  365. }
  366. else {
  367. currIteration.push({
  368. ...data,
  369. } as any)
  370. }
  371. }
  372. }))
  373. }
  374. else {
  375. // open parallel mode
  376. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  377. const tracing = draft.tracing!
  378. const iterations = tracing.find(trace => trace.node_id === nodeParentId) // the iteration node
  379. if (iterations && iterations.details) {
  380. const iterRunID = data.execution_metadata?.parallel_mode_run_id
  381. const currIteration = iterParallelLogMap.get(iterations.node_id)?.get(iterRunID)
  382. const nodeIndex = currIteration?.findIndex(node =>
  383. node.node_id === data.node_id && (
  384. node?.parallel_run_id === data.execution_metadata?.parallel_mode_run_id),
  385. )
  386. if (currIteration) {
  387. if (nodeIndex !== undefined && nodeIndex !== -1) {
  388. currIteration[nodeIndex] = {
  389. ...currIteration[nodeIndex],
  390. ...data,
  391. } as any
  392. }
  393. else {
  394. currIteration.push({
  395. ...data,
  396. } as any)
  397. }
  398. }
  399. setIterParallelLogMap(iterParallelLogMap)
  400. const iterLogMap = iterParallelLogMap.get(iterations.node_id)
  401. if (iterLogMap)
  402. iterations.details = Array.from(iterLogMap.values())
  403. }
  404. }))
  405. }
  406. }
  407. else {
  408. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  409. const currentIndex = draft.tracing!.findIndex((trace) => {
  410. if (!trace.execution_metadata?.parallel_id)
  411. return trace.node_id === data.node_id
  412. return trace.node_id === data.node_id && trace.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id
  413. })
  414. if (currentIndex > -1 && draft.tracing) {
  415. draft.tracing[currentIndex] = {
  416. ...data,
  417. ...(draft.tracing[currentIndex].extras
  418. ? { extras: draft.tracing[currentIndex].extras }
  419. : {}),
  420. ...(draft.tracing[currentIndex].retryDetail
  421. ? { retryDetail: draft.tracing[currentIndex].retryDetail }
  422. : {}),
  423. } as any
  424. }
  425. }))
  426. const newNodes = produce(nodes, (draft) => {
  427. const currentNode = draft.find(node => node.id === data.node_id)!
  428. currentNode.data._runningStatus = data.status as any
  429. if (data.status === NodeRunningStatus.Exception) {
  430. if (data.execution_metadata.error_strategy === ErrorHandleTypeEnum.failBranch)
  431. currentNode.data._runningBranchId = ErrorHandleTypeEnum.failBranch
  432. }
  433. else {
  434. if (data.node_type === BlockEnum.IfElse)
  435. currentNode.data._runningBranchId = data?.outputs?.selected_case_id
  436. if (data.node_type === BlockEnum.QuestionClassifier)
  437. currentNode.data._runningBranchId = data?.outputs?.class_id
  438. }
  439. })
  440. setNodes(newNodes)
  441. const newEdges = produce(edges, (draft) => {
  442. const incomeEdges = draft.filter((edge) => {
  443. return edge.target === data.node_id
  444. })
  445. incomeEdges.forEach((edge) => {
  446. edge.data = {
  447. ...edge.data,
  448. _targetRunningStatus: data.status as any,
  449. }
  450. })
  451. })
  452. setEdges(newEdges)
  453. prevNodeId = data.node_id
  454. }
  455. if (onNodeFinished)
  456. onNodeFinished(params)
  457. },
  458. onIterationStart: (params) => {
  459. const { data } = params
  460. const {
  461. workflowRunningData,
  462. setWorkflowRunningData,
  463. setIterTimes,
  464. } = workflowStore.getState()
  465. const {
  466. getNodes,
  467. setNodes,
  468. edges,
  469. setEdges,
  470. transform,
  471. } = store.getState()
  472. const nodes = getNodes()
  473. setIterTimes(DEFAULT_ITER_TIMES)
  474. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  475. draft.tracing!.push({
  476. ...data,
  477. status: NodeRunningStatus.Running,
  478. details: [],
  479. iterDurationMap: {},
  480. } as any)
  481. }))
  482. const {
  483. setViewport,
  484. } = reactflow
  485. const currentNodeIndex = nodes.findIndex(node => node.id === data.node_id)
  486. const currentNode = nodes[currentNodeIndex]
  487. const position = currentNode.position
  488. const zoom = transform[2]
  489. if (!currentNode.parentId) {
  490. setViewport({
  491. x: (clientWidth - 400 - currentNode.width! * zoom) / 2 - position.x * zoom,
  492. y: (clientHeight - currentNode.height! * zoom) / 2 - position.y * zoom,
  493. zoom: transform[2],
  494. })
  495. }
  496. const newNodes = produce(nodes, (draft) => {
  497. draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running
  498. draft[currentNodeIndex].data._iterationLength = data.metadata.iterator_length
  499. draft[currentNodeIndex].data._waitingRun = false
  500. })
  501. setNodes(newNodes)
  502. const newEdges = produce(edges, (draft) => {
  503. const incomeEdges = draft.filter(edge => edge.target === data.node_id)
  504. incomeEdges.forEach((edge) => {
  505. edge.data = {
  506. ...edge.data,
  507. _sourceRunningStatus: nodes.find(node => node.id === edge.source)!.data._runningStatus,
  508. _targetRunningStatus: NodeRunningStatus.Running,
  509. _waitingRun: false,
  510. }
  511. })
  512. })
  513. setEdges(newEdges)
  514. if (onIterationStart)
  515. onIterationStart(params)
  516. },
  517. onIterationNext: (params) => {
  518. const {
  519. workflowRunningData,
  520. setWorkflowRunningData,
  521. iterTimes,
  522. setIterTimes,
  523. } = workflowStore.getState()
  524. const { data } = params
  525. const {
  526. getNodes,
  527. setNodes,
  528. } = store.getState()
  529. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  530. const iteration = draft.tracing!.find(trace => trace.node_id === data.node_id)
  531. if (iteration) {
  532. if (iteration.iterDurationMap && data.duration)
  533. iteration.iterDurationMap[data.parallel_mode_run_id ?? `${data.index - 1}`] = data.duration
  534. if (iteration.details!.length >= iteration.metadata.iterator_length!)
  535. return
  536. }
  537. if (!data.parallel_mode_run_id)
  538. iteration?.details!.push([])
  539. }))
  540. const nodes = getNodes()
  541. const newNodes = produce(nodes, (draft) => {
  542. const currentNode = draft.find(node => node.id === data.node_id)!
  543. currentNode.data._iterationIndex = iterTimes
  544. setIterTimes(iterTimes + 1)
  545. })
  546. setNodes(newNodes)
  547. if (onIterationNext)
  548. onIterationNext(params)
  549. },
  550. onIterationFinish: (params) => {
  551. const { data } = params
  552. const {
  553. workflowRunningData,
  554. setWorkflowRunningData,
  555. setIterTimes,
  556. } = workflowStore.getState()
  557. const {
  558. getNodes,
  559. setNodes,
  560. } = store.getState()
  561. const nodes = getNodes()
  562. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  563. const tracing = draft.tracing!
  564. const currIterationNode = tracing.find(trace => trace.node_id === data.node_id)
  565. if (currIterationNode) {
  566. Object.assign(currIterationNode, {
  567. ...data,
  568. status: NodeRunningStatus.Succeeded,
  569. })
  570. }
  571. }))
  572. setIterTimes(DEFAULT_ITER_TIMES)
  573. const newNodes = produce(nodes, (draft) => {
  574. const currentNode = draft.find(node => node.id === data.node_id)!
  575. currentNode.data._runningStatus = data.status
  576. })
  577. setNodes(newNodes)
  578. prevNodeId = data.node_id
  579. if (onIterationFinish)
  580. onIterationFinish(params)
  581. },
  582. onNodeRetry: (params) => {
  583. const { data } = params
  584. const {
  585. workflowRunningData,
  586. setWorkflowRunningData,
  587. iterParallelLogMap,
  588. setIterParallelLogMap,
  589. } = workflowStore.getState()
  590. const {
  591. getNodes,
  592. setNodes,
  593. } = store.getState()
  594. const nodes = getNodes()
  595. const currentNode = nodes.find(node => node.id === data.node_id)!
  596. const nodeParent = nodes.find(node => node.id === currentNode.parentId)
  597. if (nodeParent) {
  598. if (!data.execution_metadata.parallel_mode_run_id) {
  599. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  600. const tracing = draft.tracing!
  601. const iteration = tracing.find(trace => trace.node_id === nodeParent.id)
  602. if (iteration && iteration.details?.length) {
  603. const currentNodeRetry = iteration.details[nodeParent.data._iterationIndex - 1]?.find(item => item.node_id === data.node_id)
  604. if (currentNodeRetry) {
  605. if (currentNodeRetry?.retryDetail)
  606. currentNodeRetry?.retryDetail.push(data as NodeTracing)
  607. else
  608. currentNodeRetry.retryDetail = [data as NodeTracing]
  609. }
  610. }
  611. }))
  612. }
  613. else {
  614. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  615. const tracing = draft.tracing!
  616. const iteration = tracing.find(trace => trace.node_id === nodeParent.id)
  617. if (iteration && iteration.details?.length) {
  618. const iterRunID = data.execution_metadata?.parallel_mode_run_id
  619. const currIteration = iterParallelLogMap.get(iteration.node_id)?.get(iterRunID)
  620. const currentNodeRetry = currIteration?.find(item => item.node_id === data.node_id)
  621. if (currentNodeRetry) {
  622. if (currentNodeRetry?.retryDetail)
  623. currentNodeRetry?.retryDetail.push(data as NodeTracing)
  624. else
  625. currentNodeRetry.retryDetail = [data as NodeTracing]
  626. }
  627. setIterParallelLogMap(iterParallelLogMap)
  628. const iterLogMap = iterParallelLogMap.get(iteration.node_id)
  629. if (iterLogMap)
  630. iteration.details = Array.from(iterLogMap.values())
  631. }
  632. }))
  633. }
  634. }
  635. else {
  636. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  637. const tracing = draft.tracing!
  638. const currentRetryNodeIndex = tracing.findIndex(trace => trace.node_id === data.node_id)
  639. if (currentRetryNodeIndex > -1) {
  640. const currentRetryNode = tracing[currentRetryNodeIndex]
  641. if (currentRetryNode.retryDetail)
  642. draft.tracing![currentRetryNodeIndex].retryDetail!.push(data as NodeTracing)
  643. else
  644. draft.tracing![currentRetryNodeIndex].retryDetail = [data as NodeTracing]
  645. }
  646. }))
  647. }
  648. const newNodes = produce(nodes, (draft) => {
  649. const currentNode = draft.find(node => node.id === data.node_id)!
  650. currentNode.data._retryIndex = data.retry_index
  651. })
  652. setNodes(newNodes)
  653. if (onNodeRetry)
  654. onNodeRetry(params)
  655. },
  656. onParallelBranchStarted: (params) => {
  657. // console.log(params, 'parallel start')
  658. },
  659. onParallelBranchFinished: (params) => {
  660. // console.log(params, 'finished')
  661. },
  662. onTextChunk: (params) => {
  663. const { data: { text } } = params
  664. const {
  665. workflowRunningData,
  666. setWorkflowRunningData,
  667. } = workflowStore.getState()
  668. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  669. draft.resultTabActive = true
  670. draft.resultText += text
  671. }))
  672. },
  673. onTextReplace: (params) => {
  674. const { data: { text } } = params
  675. const {
  676. workflowRunningData,
  677. setWorkflowRunningData,
  678. } = workflowStore.getState()
  679. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  680. draft.resultText = text
  681. }))
  682. },
  683. onTTSChunk: (messageId: string, audio: string, audioType?: string) => {
  684. if (!audio || audio === '')
  685. return
  686. player.playAudioWithAudio(audio, true)
  687. AudioPlayerManager.getInstance().resetMsgId(messageId)
  688. },
  689. onTTSEnd: (messageId: string, audio: string, audioType?: string) => {
  690. player.playAudioWithAudio(audio, false)
  691. },
  692. ...restCallback,
  693. },
  694. )
  695. }, [store, reactflow, workflowStore, doSyncWorkflowDraft])
  696. const handleStopRun = useCallback((taskId: string) => {
  697. const appId = useAppStore.getState().appDetail?.id
  698. stopWorkflowRun(`/apps/${appId}/workflow-runs/tasks/${taskId}/stop`)
  699. }, [])
  700. const handleRestoreFromPublishedWorkflow = useCallback((publishedWorkflow: VersionHistory) => {
  701. const nodes = publishedWorkflow.graph.nodes.map(node => ({ ...node, selected: false, data: { ...node.data, selected: false } }))
  702. const edges = publishedWorkflow.graph.edges
  703. const viewport = publishedWorkflow.graph.viewport!
  704. handleUpdateWorkflowCanvas({
  705. nodes,
  706. edges,
  707. viewport,
  708. })
  709. const mappedFeatures = {
  710. opening: {
  711. enabled: !!publishedWorkflow.features.opening_statement || !!publishedWorkflow.features.suggested_questions.length,
  712. opening_statement: publishedWorkflow.features.opening_statement,
  713. suggested_questions: publishedWorkflow.features.suggested_questions,
  714. },
  715. suggested: publishedWorkflow.features.suggested_questions_after_answer,
  716. text2speech: publishedWorkflow.features.text_to_speech,
  717. speech2text: publishedWorkflow.features.speech_to_text,
  718. citation: publishedWorkflow.features.retriever_resource,
  719. moderation: publishedWorkflow.features.sensitive_word_avoidance,
  720. file: publishedWorkflow.features.file_upload,
  721. }
  722. featuresStore?.setState({ features: mappedFeatures })
  723. workflowStore.getState().setPublishedAt(publishedWorkflow.created_at)
  724. workflowStore.getState().setEnvironmentVariables(publishedWorkflow.environment_variables || [])
  725. }, [featuresStore, handleUpdateWorkflowCanvas, workflowStore])
  726. return {
  727. handleBackupDraft,
  728. handleLoadBackupDraft,
  729. handleRun,
  730. handleStopRun,
  731. handleRestoreFromPublishedWorkflow,
  732. }
  733. }