use-workflow-run.ts 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. import { useCallback } from 'react'
  2. import {
  3. useReactFlow,
  4. useStoreApi,
  5. } from 'reactflow'
  6. import produce from 'immer'
  7. import { useWorkflowStore } from '../store'
  8. import { useNodesSyncDraft } from '../hooks'
  9. import {
  10. NodeRunningStatus,
  11. WorkflowRunningStatus,
  12. } from '../types'
  13. import { useWorkflowUpdate } from './use-workflow-interactions'
  14. import { useStore as useAppStore } from '@/app/components/app/store'
  15. import type { IOtherOptions } from '@/service/base'
  16. import { ssePost } from '@/service/base'
  17. import {
  18. fetchPublishedWorkflow,
  19. stopWorkflowRun,
  20. } from '@/service/workflow'
  21. import { useFeaturesStore } from '@/app/components/base/features/hooks'
  /**
   * Hook bundling the canvas actions around a workflow draft run:
   * backing up / restoring the draft, starting a run over SSE, stopping a run,
   * and restoring the canvas from the published workflow.
   *
   * @returns The five memoized handlers `handleBackupDraft`,
   * `handleLoadBackupDraft`, `handleRun`, `handleStopRun` and
   * `handleRestoreFromPublishedWorkflow`.
   */
  export const useWorkflowRun = () => {
    const store = useStoreApi()
    const workflowStore = useWorkflowStore()
    const reactflow = useReactFlow()
    const featuresStore = useFeaturesStore()
    const { doSyncWorkflowDraft } = useNodesSyncDraft()
    const { handleUpdateWorkflowCanvas } = useWorkflowUpdate()

    /**
     * Stores a snapshot (nodes, edges, viewport, features) of the current draft
     * in the workflow store and then syncs the draft. Does nothing when a
     * backup already exists, so an earlier snapshot is never overwritten.
     */
    const handleBackupDraft = useCallback(() => {
      const {
        getNodes,
        edges,
      } = store.getState()
      const { getViewport } = reactflow
      const {
        backupDraft,
        setBackupDraft,
      } = workflowStore.getState()
      // NOTE(review): the features store is assumed to be present here; confirm
      // this hook is always rendered inside the features provider.
      const { features } = featuresStore!.getState()

      if (backupDraft)
        return

      setBackupDraft({
        nodes: getNodes(),
        edges,
        viewport: getViewport(),
        features,
      })
      doSyncWorkflowDraft()
    }, [reactflow, workflowStore, store, featuresStore, doSyncWorkflowDraft])

    /**
     * Restores the canvas and features from the stored backup (if any) and
     * clears the backup afterwards.
     */
    const handleLoadBackupDraft = useCallback(() => {
      const {
        backupDraft,
        setBackupDraft,
      } = workflowStore.getState()

      if (!backupDraft)
        return

      const {
        nodes,
        edges,
        viewport,
        features,
      } = backupDraft

      handleUpdateWorkflowCanvas({
        nodes,
        edges,
        viewport,
      })
      featuresStore!.setState({ features })
      setBackupDraft(undefined)
    }, [handleUpdateWorkflowCanvas, workflowStore, featuresStore])

    /**
     * Starts a draft run: clears per-node run state, syncs the draft, resets the
     * running data in the workflow store and opens the SSE stream. Each stream
     * event updates the running data / canvas first and then forwards the event
     * to the matching handler in `callback`, when one was supplied.
     *
     * @param params - Request body posted to the run endpoint.
     * @param callback - Optional SSE handlers. The workflow / node / iteration /
     * error handlers are invoked after the built-in handling; any other entries
     * are spread last into the options passed to `ssePost`, so they replace the
     * built-in `onTextChunk` / `onTextReplace` when provided.
     */
    const handleRun = useCallback(async (
      params: any,
      callback?: IOtherOptions,
    ) => {
      type RunningData = NonNullable<ReturnType<typeof workflowStore.getState>['workflowRunningData']>

      // Horizontal space subtracted from the container width when centring a node.
      // NOTE(review): presumably the width of a side panel — confirm.
      const VIEWPORT_WIDTH_OFFSET = 400

      const {
        getNodes,
        setNodes,
      } = store.getState()
      // Clear selection and any run status left over from a previous run.
      const resetNodes = produce(getNodes(), (draft) => {
        draft.forEach((node) => {
          node.data.selected = false
          node.data._runningStatus = undefined
        })
      })
      setNodes(resetNodes)
      await doSyncWorkflowDraft()

      const {
        onWorkflowStarted,
        onWorkflowFinished,
        onNodeStarted,
        onNodeFinished,
        onIterationStart,
        onIterationNext,
        onIterationFinish,
        onError,
        ...restCallback
      } = callback || {}
      workflowStore.setState({ historyWorkflowData: undefined })

      const appDetail = useAppStore.getState().appDetail
      // The container may be missing; centring is then skipped instead of throwing.
      const workflowContainer = document.getElementById('workflow-container')
      const containerSize = workflowContainer
        ? { width: workflowContainer.clientWidth, height: workflowContainer.clientHeight }
        : undefined

      // NOTE(review): `url` stays empty for any other app mode (or when no app
      // is loaded) and is still handed to `ssePost` — confirm that is intended.
      let url = ''
      if (appDetail?.mode === 'advanced-chat')
        url = `/apps/${appDetail.id}/advanced-chat/workflows/draft/run`
      else if (appDetail?.mode === 'workflow')
        url = `/apps/${appDetail.id}/workflows/draft/run`

      // Run-scoped state shared by the SSE handlers below.
      let prevNodeId = ''
      let isInIteration = false
      let iterationLength = 0

      const {
        setWorkflowRunningData,
      } = workflowStore.getState()
      setWorkflowRunningData({
        result: {
          status: WorkflowRunningStatus.Running,
        },
        tracing: [],
        resultText: '',
      })

      // Applies `recipe` to the current running data; no-op when it has been cleared.
      const updateRunningData = (recipe: (draft: RunningData) => void) => {
        const {
          workflowRunningData,
          setWorkflowRunningData,
        } = workflowStore.getState()
        if (!workflowRunningData)
          return
        setWorkflowRunningData(produce(workflowRunningData, recipe))
      }

      // Returns the last round of the last tracing entry, or undefined when none is open.
      const currentIterationRound = (draft: RunningData) => {
        const { tracing } = draft
        if (!tracing || !tracing.length)
          return undefined
        const details = tracing[tracing.length - 1].details
        if (!details || !details.length)
          return undefined
        return details[details.length - 1]
      }

      // Merges `patch` into the data of the node with `nodeId`; no-op when the node is not on the canvas.
      const patchNodeData = (nodeId: string, patch: Record<string, unknown>) => {
        const {
          getNodes,
          setNodes,
        } = store.getState()
        const nodes = getNodes()
        const index = nodes.findIndex(node => node.id === nodeId)
        if (index < 0)
          return
        setNodes(produce(nodes, (draft) => {
          Object.assign(draft[index].data, patch)
        }))
      }

      // Centres the viewport on a top-level node and flags it as running (plus any `extraData`).
      const markNodeRunning = (nodeId: string, extraData: Record<string, unknown> = {}) => {
        const {
          getNodes,
          transform,
        } = store.getState()
        const target = getNodes().find(node => node.id === nodeId)
        if (!target)
          return

        // Only nodes without a parent move the viewport.
        if (!target.parentId && containerSize) {
          const { setViewport } = reactflow
          const zoom = transform[2]
          setViewport({
            x: (containerSize.width - VIEWPORT_WIDTH_OFFSET - (target.width ?? 0) * zoom) / 2 - target.position.x * zoom,
            y: (containerSize.height - (target.height ?? 0) * zoom) / 2 - target.position.y * zoom,
            zoom,
          })
        }

        patchNodeData(nodeId, {
          _runningStatus: NodeRunningStatus.Running,
          ...extraData,
        })
      }

      // Flags the edge from the previously finished node to `nodeId` as traversed.
      const markIncomingEdgeRunned = (nodeId: string) => {
        const {
          edges,
          setEdges,
        } = store.getState()
        setEdges(produce(edges, (draft) => {
          const incoming = draft.find(edge => edge.target === nodeId && edge.source === prevNodeId)
          if (incoming)
            incoming.data = { ...incoming.data, _runned: true } as any
        }))
      }

      ssePost(
        url,
        {
          body: params,
        },
        {
          onWorkflowStarted: (payload) => {
            const { task_id, data } = payload

            updateRunningData((draft) => {
              draft.task_id = task_id
              draft.result = {
                ...draft.result,
                ...data,
                status: WorkflowRunningStatus.Running,
              }
            })

            // A new run starts with no edge marked as traversed.
            const {
              edges,
              setEdges,
            } = store.getState()
            setEdges(produce(edges, (draft) => {
              draft.forEach((edge) => {
                edge.data = {
                  ...edge.data,
                  _runned: false,
                }
              })
            }))

            onWorkflowStarted?.(payload)
          },
          onWorkflowFinished: (payload) => {
            const { data } = payload
            // A single string output is additionally surfaced as the result text.
            const outputKeys = data.outputs ? Object.keys(data.outputs) : []
            const soleOutput = outputKeys.length === 1 ? data.outputs[outputKeys[0]] : undefined
            const isStringOutput = typeof soleOutput === 'string'

            updateRunningData((draft) => {
              draft.result = {
                ...draft.result,
                ...data,
              } as any
              if (isStringOutput) {
                draft.resultTabActive = true
                draft.resultText = soleOutput
              }
            })
            prevNodeId = ''

            onWorkflowFinished?.(payload)
          },
          onError: (payload) => {
            updateRunningData((draft) => {
              draft.result = {
                ...draft.result,
                status: WorkflowRunningStatus.Failed,
              }
            })

            onError?.(payload)
          },
          onNodeStarted: (payload) => {
            const { data } = payload

            if (isInIteration) {
              // Nodes inside an iteration are only traced within the current round.
              updateRunningData((draft) => {
                const round = currentIterationRound(draft)
                if (!round)
                  return
                round.push({
                  ...data,
                  status: NodeRunningStatus.Running,
                } as any)
              })
            }
            else {
              updateRunningData((draft) => {
                draft.tracing!.push({
                  ...data,
                  status: NodeRunningStatus.Running,
                } as any)
              })
              markNodeRunning(data.node_id)
              markIncomingEdgeRunned(data.node_id)
            }

            onNodeStarted?.(payload)
          },
          onNodeFinished: (payload) => {
            const { data } = payload

            if (isInIteration) {
              updateRunningData((draft) => {
                const round = currentIterationRound(draft)
                if (!round || !round.length)
                  return
                // NOTE(review): the status is recorded as Succeeded regardless of
                // `data.status` — confirm this is intended.
                round[round.length - 1] = {
                  ...round[round.length - 1],
                  ...data,
                  status: NodeRunningStatus.Succeeded,
                } as any
              })
            }
            else {
              updateRunningData((draft) => {
                const tracing = draft.tracing
                if (!tracing)
                  return
                const traceIndex = tracing.findIndex(trace => trace.node_id === data.node_id)
                if (traceIndex < 0)
                  return
                // Replace the entry with the final payload, keeping only its `extras`.
                const { extras } = tracing[traceIndex]
                tracing[traceIndex] = {
                  ...(extras ? { extras } : {}),
                  ...data,
                } as any
              })
              patchNodeData(data.node_id, { _runningStatus: data.status })
              prevNodeId = data.node_id
            }

            onNodeFinished?.(payload)
          },
          onIterationStart: (payload) => {
            const { data } = payload

            updateRunningData((draft) => {
              draft.tracing!.push({
                ...data,
                status: NodeRunningStatus.Running,
                details: [],
              } as any)
            })
            isInIteration = true
            iterationLength = data.metadata.iterator_length

            markNodeRunning(data.node_id, { _iterationLength: data.metadata.iterator_length })
            markIncomingEdgeRunned(data.node_id)

            onIterationStart?.(payload)
          },
          onIterationNext: (payload) => {
            const { data } = payload

            updateRunningData((draft) => {
              const tracing = draft.tracing
              if (!tracing || !tracing.length)
                return
              const details = tracing[tracing.length - 1].details
              // Never open more rounds than the iterator length announced at start.
              if (!details || details.length >= iterationLength)
                return
              details.push([])
            })
            patchNodeData(data.node_id, { _iterationIndex: data.index > 0 ? data.index : 1 })

            onIterationNext?.(payload)
          },
          onIterationFinish: (payload) => {
            const { data } = payload

            updateRunningData((draft) => {
              const tracing = draft.tracing
              if (!tracing || !tracing.length)
                return
              tracing[tracing.length - 1] = {
                ...tracing[tracing.length - 1],
                ...data,
                status: NodeRunningStatus.Succeeded,
              } as any
            })
            isInIteration = false
            patchNodeData(data.node_id, { _runningStatus: data.status })
            prevNodeId = data.node_id

            onIterationFinish?.(payload)
          },
          onTextChunk: (payload) => {
            const { data: { text } } = payload
            updateRunningData((draft) => {
              draft.resultTabActive = true
              draft.resultText += text
            })
          },
          onTextReplace: (payload) => {
            const { data: { text } } = payload
            updateRunningData((draft) => {
              draft.resultText = text
            })
          },
          // Spread last: caller-supplied handlers not destructured above win over the built-ins.
          ...restCallback,
        },
      )
    }, [store, reactflow, workflowStore, doSyncWorkflowDraft])

    /**
     * Requests the stop of a running task of the current app.
     * Does nothing when no app is loaded, rather than calling an invalid URL.
     *
     * @param taskId - Id of the task to stop.
     */
    const handleStopRun = useCallback((taskId: string) => {
      const appId = useAppStore.getState().appDetail?.id
      if (!appId)
        return

      stopWorkflowRun(`/apps/${appId}/workflow-runs/tasks/${taskId}/stop`)
    }, [])

    /**
     * Fetches the published workflow of the current app and, when one exists,
     * loads its graph and features onto the canvas and records its creation time
     * as the published timestamp. Does nothing when no app is loaded.
     */
    const handleRestoreFromPublishedWorkflow = useCallback(async () => {
      const appId = useAppStore.getState().appDetail?.id
      if (!appId)
        return

      const publishedWorkflow = await fetchPublishedWorkflow(`/apps/${appId}/workflows/publish`)
      if (!publishedWorkflow)
        return

      const {
        nodes,
        edges,
        viewport,
      } = publishedWorkflow.graph

      handleUpdateWorkflowCanvas({
        nodes,
        edges,
        // NOTE(review): the published graph is assumed to carry a viewport — confirm.
        viewport: viewport!,
      })
      featuresStore?.setState({ features: publishedWorkflow.features })
      workflowStore.getState().setPublishedAt(publishedWorkflow.created_at)
    }, [featuresStore, handleUpdateWorkflowCanvas, workflowStore])

    return {
      handleBackupDraft,
      handleLoadBackupDraft,
      handleRun,
      handleStopRun,
      handleRestoreFromPublishedWorkflow,
    }
  }
  457. }