import { API_PREFIX, IS_CE_EDITION, PUBLIC_API_PREFIX } from '@/config' import { refreshAccessTokenOrRelogin } from './refresh-token' import Toast from '@/app/components/base/toast' import type { AnnotationReply, MessageEnd, MessageReplace, ThoughtItem } from '@/app/components/base/chat/chat/type' import type { VisionFile } from '@/types/app' import type { AgentLogResponse, IterationFinishedResponse, IterationNextResponse, IterationStartedResponse, LoopFinishedResponse, LoopNextResponse, LoopStartedResponse, NodeFinishedResponse, NodeStartedResponse, ParallelBranchFinishedResponse, ParallelBranchStartedResponse, TextChunkResponse, TextReplaceResponse, WorkflowFinishedResponse, WorkflowStartedResponse, } from '@/types/workflow' import { removeAccessToken } from '@/app/components/share/utils' import type { FetchOptionType, ResponseError } from './fetch' import { ContentType, base, baseOptions, getAccessToken } from './fetch' import { asyncRunSafe } from '@/utils' const TIME_OUT = 100000 export type IOnDataMoreInfo = { conversationId?: string taskId?: string messageId: string errorMessage?: string errorCode?: string } export type IOnData = (message: string, isFirstMessage: boolean, moreInfo: IOnDataMoreInfo) => void export type IOnThought = (though: ThoughtItem) => void export type IOnFile = (file: VisionFile) => void export type IOnMessageEnd = (messageEnd: MessageEnd) => void export type IOnMessageReplace = (messageReplace: MessageReplace) => void export type IOnAnnotationReply = (messageReplace: AnnotationReply) => void export type IOnCompleted = (hasError?: boolean, errorMessage?: string) => void export type IOnError = (msg: string, code?: string) => void export type IOnWorkflowStarted = (workflowStarted: WorkflowStartedResponse) => void export type IOnWorkflowFinished = (workflowFinished: WorkflowFinishedResponse) => void export type IOnNodeStarted = (nodeStarted: NodeStartedResponse) => void export type IOnNodeFinished = (nodeFinished: NodeFinishedResponse) => void export type IOnIterationStarted = (workflowStarted: IterationStartedResponse) => void export type IOnIterationNext = (workflowStarted: IterationNextResponse) => void export type IOnNodeRetry = (nodeFinished: NodeFinishedResponse) => void export type IOnIterationFinished = (workflowFinished: IterationFinishedResponse) => void export type IOnParallelBranchStarted = (parallelBranchStarted: ParallelBranchStartedResponse) => void export type IOnParallelBranchFinished = (parallelBranchFinished: ParallelBranchFinishedResponse) => void export type IOnTextChunk = (textChunk: TextChunkResponse) => void export type IOnTTSChunk = (messageId: string, audioStr: string, audioType?: string) => void export type IOnTTSEnd = (messageId: string, audioStr: string, audioType?: string) => void export type IOnTextReplace = (textReplace: TextReplaceResponse) => void export type IOnLoopStarted = (workflowStarted: LoopStartedResponse) => void export type IOnLoopNext = (workflowStarted: LoopNextResponse) => void export type IOnLoopFinished = (workflowFinished: LoopFinishedResponse) => void export type IOnAgentLog = (agentLog: AgentLogResponse) => void export type IOtherOptions = { isPublicAPI?: boolean isMarketplaceAPI?: boolean bodyStringify?: boolean needAllResponseContent?: boolean deleteContentType?: boolean silent?: boolean onData?: IOnData // for stream onThought?: IOnThought onFile?: IOnFile onMessageEnd?: IOnMessageEnd onMessageReplace?: IOnMessageReplace onError?: IOnError onCompleted?: IOnCompleted // for stream getAbortController?: (abortController: AbortController) => void onWorkflowStarted?: IOnWorkflowStarted onWorkflowFinished?: IOnWorkflowFinished onNodeStarted?: IOnNodeStarted onNodeFinished?: IOnNodeFinished onIterationStart?: IOnIterationStarted onIterationNext?: IOnIterationNext onIterationFinish?: IOnIterationFinished onNodeRetry?: IOnNodeRetry onParallelBranchStarted?: IOnParallelBranchStarted onParallelBranchFinished?: IOnParallelBranchFinished onTextChunk?: IOnTextChunk onTTSChunk?: IOnTTSChunk onTTSEnd?: IOnTTSEnd onTextReplace?: IOnTextReplace onLoopStart?: IOnLoopStarted onLoopNext?: IOnLoopNext onLoopFinish?: IOnLoopFinished onAgentLog?: IOnAgentLog } function unicodeToChar(text: string) { if (!text) return '' return text.replace(/\\u[0-9a-f]{4}/g, (_match, p1) => { return String.fromCharCode(Number.parseInt(p1, 16)) }) } function requiredWebSSOLogin() { globalThis.location.href = `/webapp-signin?redirect_url=${globalThis.location.pathname}` } export function format(text: string) { let res = text.trim() if (res.startsWith('\n')) res = res.replace('\n', '') return res.replaceAll('\n', '
').replaceAll('```', '') } const handleStream = ( response: Response, onData: IOnData, onCompleted?: IOnCompleted, onThought?: IOnThought, onMessageEnd?: IOnMessageEnd, onMessageReplace?: IOnMessageReplace, onFile?: IOnFile, onWorkflowStarted?: IOnWorkflowStarted, onWorkflowFinished?: IOnWorkflowFinished, onNodeStarted?: IOnNodeStarted, onNodeFinished?: IOnNodeFinished, onIterationStart?: IOnIterationStarted, onIterationNext?: IOnIterationNext, onIterationFinish?: IOnIterationFinished, onLoopStart?: IOnLoopStarted, onLoopNext?: IOnLoopNext, onLoopFinish?: IOnLoopFinished, onNodeRetry?: IOnNodeRetry, onParallelBranchStarted?: IOnParallelBranchStarted, onParallelBranchFinished?: IOnParallelBranchFinished, onTextChunk?: IOnTextChunk, onTTSChunk?: IOnTTSChunk, onTTSEnd?: IOnTTSEnd, onTextReplace?: IOnTextReplace, onAgentLog?: IOnAgentLog, ) => { if (!response.ok) throw new Error('Network response was not ok') const reader = response.body?.getReader() const decoder = new TextDecoder('utf-8') let buffer = '' let bufferObj: Record let isFirstMessage = true function read() { let hasError = false reader?.read().then((result: any) => { if (result.done) { onCompleted && onCompleted() return } buffer += decoder.decode(result.value, { stream: true }) const lines = buffer.split('\n') try { lines.forEach((message) => { if (message.startsWith('data: ')) { // check if it starts with data: try { bufferObj = JSON.parse(message.substring(6)) as Record// remove data: and parse as json } catch (e) { // mute handle message cut off onData('', isFirstMessage, { conversationId: bufferObj?.conversation_id, messageId: bufferObj?.message_id, }) return } if (bufferObj.status === 400 || !bufferObj.event) { onData('', false, { conversationId: undefined, messageId: '', errorMessage: bufferObj?.message, errorCode: bufferObj?.code, }) hasError = true onCompleted?.(true, bufferObj?.message) return } if (bufferObj.event === 'message' || bufferObj.event === 'agent_message') { // can not use format here. Because message is splitted. onData(unicodeToChar(bufferObj.answer), isFirstMessage, { conversationId: bufferObj.conversation_id, taskId: bufferObj.task_id, messageId: bufferObj.id, }) isFirstMessage = false } else if (bufferObj.event === 'agent_thought') { onThought?.(bufferObj as ThoughtItem) } else if (bufferObj.event === 'message_file') { onFile?.(bufferObj as VisionFile) } else if (bufferObj.event === 'message_end') { onMessageEnd?.(bufferObj as MessageEnd) } else if (bufferObj.event === 'message_replace') { onMessageReplace?.(bufferObj as MessageReplace) } else if (bufferObj.event === 'workflow_started') { onWorkflowStarted?.(bufferObj as WorkflowStartedResponse) } else if (bufferObj.event === 'workflow_finished') { onWorkflowFinished?.(bufferObj as WorkflowFinishedResponse) } else if (bufferObj.event === 'node_started') { onNodeStarted?.(bufferObj as NodeStartedResponse) } else if (bufferObj.event === 'node_finished') { onNodeFinished?.(bufferObj as NodeFinishedResponse) } else if (bufferObj.event === 'iteration_started') { onIterationStart?.(bufferObj as IterationStartedResponse) } else if (bufferObj.event === 'iteration_next') { onIterationNext?.(bufferObj as IterationNextResponse) } else if (bufferObj.event === 'iteration_completed') { onIterationFinish?.(bufferObj as IterationFinishedResponse) } else if (bufferObj.event === 'loop_started') { onLoopStart?.(bufferObj as LoopStartedResponse) } else if (bufferObj.event === 'loop_next') { onLoopNext?.(bufferObj as LoopNextResponse) } else if (bufferObj.event === 'loop_completed') { onLoopFinish?.(bufferObj as LoopFinishedResponse) } else if (bufferObj.event === 'node_retry') { onNodeRetry?.(bufferObj as NodeFinishedResponse) } else if (bufferObj.event === 'parallel_branch_started') { onParallelBranchStarted?.(bufferObj as ParallelBranchStartedResponse) } else if (bufferObj.event === 'parallel_branch_finished') { onParallelBranchFinished?.(bufferObj as ParallelBranchFinishedResponse) } else if (bufferObj.event === 'text_chunk') { onTextChunk?.(bufferObj as TextChunkResponse) } else if (bufferObj.event === 'text_replace') { onTextReplace?.(bufferObj as TextReplaceResponse) } else if (bufferObj.event === 'agent_log') { onAgentLog?.(bufferObj as AgentLogResponse) } else if (bufferObj.event === 'tts_message') { onTTSChunk?.(bufferObj.message_id, bufferObj.audio, bufferObj.audio_type) } else if (bufferObj.event === 'tts_message_end') { onTTSEnd?.(bufferObj.message_id, bufferObj.audio) } } }) buffer = lines[lines.length - 1] } catch (e) { onData('', false, { conversationId: undefined, messageId: '', errorMessage: `${e}`, }) hasError = true onCompleted?.(true, e as string) return } if (!hasError) read() }) } read() } const baseFetch = base export const upload = (options: any, isPublicAPI?: boolean, url?: string, searchParams?: string): Promise => { const urlPrefix = isPublicAPI ? PUBLIC_API_PREFIX : API_PREFIX const token = getAccessToken(isPublicAPI) const defaultOptions = { method: 'POST', url: (url ? `${urlPrefix}${url}` : `${urlPrefix}/files/upload`) + (searchParams || ''), headers: { Authorization: `Bearer ${token}`, }, data: {}, } options = { ...defaultOptions, ...options, headers: { ...defaultOptions.headers, ...options.headers }, } return new Promise((resolve, reject) => { const xhr = options.xhr xhr.open(options.method, options.url) for (const key in options.headers) xhr.setRequestHeader(key, options.headers[key]) xhr.withCredentials = true xhr.responseType = 'json' xhr.onreadystatechange = function () { if (xhr.readyState === 4) { if (xhr.status === 201) resolve(xhr.response) else reject(xhr) } } xhr.upload.onprogress = options.onprogress xhr.send(options.data) }) } export const ssePost = ( url: string, fetchOptions: FetchOptionType, otherOptions: IOtherOptions, ) => { const { isPublicAPI = false, onData, onCompleted, onThought, onFile, onMessageEnd, onMessageReplace, onWorkflowStarted, onWorkflowFinished, onNodeStarted, onNodeFinished, onIterationStart, onIterationNext, onIterationFinish, onNodeRetry, onParallelBranchStarted, onParallelBranchFinished, onTextChunk, onTTSChunk, onTTSEnd, onTextReplace, onAgentLog, onError, getAbortController, onLoopStart, onLoopNext, onLoopFinish, } = otherOptions const abortController = new AbortController() const token = localStorage.getItem('console_token') const options = Object.assign({}, baseOptions, { method: 'POST', signal: abortController.signal, headers: new Headers({ Authorization: `Bearer ${token}`, }), } as RequestInit, fetchOptions) const contentType = (options.headers as Headers).get('Content-Type') if (!contentType) (options.headers as Headers).set('Content-Type', ContentType.json) getAbortController?.(abortController) const urlPrefix = isPublicAPI ? PUBLIC_API_PREFIX : API_PREFIX const urlWithPrefix = (url.startsWith('http://') || url.startsWith('https://')) ? url : `${urlPrefix}${url.startsWith('/') ? url : `/${url}`}` const { body } = options if (body) options.body = JSON.stringify(body) const accessToken = getAccessToken(isPublicAPI) ;(options.headers as Headers).set('Authorization', `Bearer ${accessToken}`) globalThis.fetch(urlWithPrefix, options as RequestInit) .then((res) => { if (!/^(2|3)\d{2}$/.test(String(res.status))) { if (res.status === 401) { refreshAccessTokenOrRelogin(TIME_OUT).then(() => { ssePost(url, fetchOptions, otherOptions) }).catch(() => { res.json().then((data: any) => { if (isPublicAPI) { if (data.code === 'web_sso_auth_required') requiredWebSSOLogin() if (data.code === 'unauthorized') { removeAccessToken() globalThis.location.reload() } } }) }) } else { res.json().then((data) => { Toast.notify({ type: 'error', message: data.message || 'Server Error' }) }) onError?.('Server Error') } return } return handleStream(res, (str: string, isFirstMessage: boolean, moreInfo: IOnDataMoreInfo) => { if (moreInfo.errorMessage) { onError?.(moreInfo.errorMessage, moreInfo.errorCode) // TypeError: Cannot assign to read only property ... will happen in page leave, so it should be ignored. if (moreInfo.errorMessage !== 'AbortError: The user aborted a request.' && !moreInfo.errorMessage.includes('TypeError: Cannot assign to read only property')) Toast.notify({ type: 'error', message: moreInfo.errorMessage }) return } onData?.(str, isFirstMessage, moreInfo) }, onCompleted, onThought, onMessageEnd, onMessageReplace, onFile, onWorkflowStarted, onWorkflowFinished, onNodeStarted, onNodeFinished, onIterationStart, onIterationNext, onIterationFinish, onLoopStart, onLoopNext, onLoopFinish, onNodeRetry, onParallelBranchStarted, onParallelBranchFinished, onTextChunk, onTTSChunk, onTTSEnd, onTextReplace, onAgentLog, ) }).catch((e) => { if (e.toString() !== 'AbortError: The user aborted a request.' && !e.toString().errorMessage.includes('TypeError: Cannot assign to read only property')) Toast.notify({ type: 'error', message: e }) onError?.(e) }) } // base request export const request = async(url: string, options = {}, otherOptions?: IOtherOptions) => { try { const otherOptionsForBaseFetch = otherOptions || {} const [err, resp] = await asyncRunSafe(baseFetch(url, options, otherOptionsForBaseFetch)) if (err === null) return resp const errResp: Response = err as any if (errResp.status === 401) { const [parseErr, errRespData] = await asyncRunSafe(errResp.json()) const loginUrl = `${globalThis.location.origin}/signin` if (parseErr) { globalThis.location.href = loginUrl return Promise.reject(err) } // special code const { code, message } = errRespData // webapp sso if (code === 'web_sso_auth_required') { requiredWebSSOLogin() return Promise.reject(err) } if (code === 'unauthorized_and_force_logout') { localStorage.removeItem('console_token') localStorage.removeItem('refresh_token') globalThis.location.reload() return Promise.reject(err) } const { isPublicAPI = false, silent, } = otherOptionsForBaseFetch if (isPublicAPI && code === 'unauthorized') { removeAccessToken() globalThis.location.reload() return Promise.reject(err) } if (code === 'init_validate_failed' && IS_CE_EDITION && !silent) { Toast.notify({ type: 'error', message, duration: 4000 }) return Promise.reject(err) } if (code === 'not_init_validated' && IS_CE_EDITION) { globalThis.location.href = `${globalThis.location.origin}/init` return Promise.reject(err) } if (code === 'not_setup' && IS_CE_EDITION) { globalThis.location.href = `${globalThis.location.origin}/install` return Promise.reject(err) } // refresh token const [refreshErr] = await asyncRunSafe(refreshAccessTokenOrRelogin(TIME_OUT)) if (refreshErr === null) return baseFetch(url, options, otherOptionsForBaseFetch) if (location.pathname !== '/signin' || !IS_CE_EDITION) { globalThis.location.href = loginUrl return Promise.reject(err) } if (!silent) { Toast.notify({ type: 'error', message }) return Promise.reject(err) } globalThis.location.href = loginUrl return Promise.reject(err) } else { return Promise.reject(err) } } catch (error) { console.error(error) return Promise.reject(error) } } // request methods export const get = (url: string, options = {}, otherOptions?: IOtherOptions) => { return request(url, Object.assign({}, options, { method: 'GET' }), otherOptions) } // For public API export const getPublic = (url: string, options = {}, otherOptions?: IOtherOptions) => { return get(url, options, { ...otherOptions, isPublicAPI: true }) } // For Marketplace API export const getMarketplace = (url: string, options = {}, otherOptions?: IOtherOptions) => { return get(url, options, { ...otherOptions, isMarketplaceAPI: true }) } export const post = (url: string, options = {}, otherOptions?: IOtherOptions) => { return request(url, Object.assign({}, options, { method: 'POST' }), otherOptions) } // For Marketplace API export const postMarketplace = (url: string, options = {}, otherOptions?: IOtherOptions) => { return post(url, options, { ...otherOptions, isMarketplaceAPI: true }) } export const postPublic = (url: string, options = {}, otherOptions?: IOtherOptions) => { return post(url, options, { ...otherOptions, isPublicAPI: true }) } export const put = (url: string, options = {}, otherOptions?: IOtherOptions) => { return request(url, Object.assign({}, options, { method: 'PUT' }), otherOptions) } export const putPublic = (url: string, options = {}, otherOptions?: IOtherOptions) => { return put(url, options, { ...otherOptions, isPublicAPI: true }) } export const del = (url: string, options = {}, otherOptions?: IOtherOptions) => { return request(url, Object.assign({}, options, { method: 'DELETE' }), otherOptions) } export const delPublic = (url: string, options = {}, otherOptions?: IOtherOptions) => { return del(url, options, { ...otherOptions, isPublicAPI: true }) } export const patch = (url: string, options = {}, otherOptions?: IOtherOptions) => { return request(url, Object.assign({}, options, { method: 'PATCH' }), otherOptions) } export const patchPublic = (url: string, options = {}, otherOptions?: IOtherOptions) => { return patch(url, options, { ...otherOptions, isPublicAPI: true }) }