import { API_PREFIX, IS_CE_EDITION, PUBLIC_API_PREFIX } from '@/config'
import { refreshAccessTokenOrRelogin } from './refresh-token'
import Toast from '@/app/components/base/toast'
import type { AnnotationReply, MessageEnd, MessageReplace, ThoughtItem } from '@/app/components/base/chat/chat/type'
import type { VisionFile } from '@/types/app'
import type {
AgentLogResponse,
IterationFinishedResponse,
IterationNextResponse,
IterationStartedResponse,
LoopFinishedResponse,
LoopNextResponse,
LoopStartedResponse,
NodeFinishedResponse,
NodeStartedResponse,
ParallelBranchFinishedResponse,
ParallelBranchStartedResponse,
TextChunkResponse,
TextReplaceResponse,
WorkflowFinishedResponse,
WorkflowStartedResponse,
} from '@/types/workflow'
import { removeAccessToken } from '@/app/components/share/utils'
import type { FetchOptionType, ResponseError } from './fetch'
import { ContentType, base, baseOptions, getAccessToken } from './fetch'
import { asyncRunSafe } from '@/utils'
const TIME_OUT = 100000
export type IOnDataMoreInfo = {
conversationId?: string
taskId?: string
messageId: string
errorMessage?: string
errorCode?: string
}
export type IOnData = (message: string, isFirstMessage: boolean, moreInfo: IOnDataMoreInfo) => void
export type IOnThought = (though: ThoughtItem) => void
export type IOnFile = (file: VisionFile) => void
export type IOnMessageEnd = (messageEnd: MessageEnd) => void
export type IOnMessageReplace = (messageReplace: MessageReplace) => void
export type IOnAnnotationReply = (messageReplace: AnnotationReply) => void
export type IOnCompleted = (hasError?: boolean, errorMessage?: string) => void
export type IOnError = (msg: string, code?: string) => void
export type IOnWorkflowStarted = (workflowStarted: WorkflowStartedResponse) => void
export type IOnWorkflowFinished = (workflowFinished: WorkflowFinishedResponse) => void
export type IOnNodeStarted = (nodeStarted: NodeStartedResponse) => void
export type IOnNodeFinished = (nodeFinished: NodeFinishedResponse) => void
export type IOnIterationStarted = (workflowStarted: IterationStartedResponse) => void
export type IOnIterationNext = (workflowStarted: IterationNextResponse) => void
export type IOnNodeRetry = (nodeFinished: NodeFinishedResponse) => void
export type IOnIterationFinished = (workflowFinished: IterationFinishedResponse) => void
export type IOnParallelBranchStarted = (parallelBranchStarted: ParallelBranchStartedResponse) => void
export type IOnParallelBranchFinished = (parallelBranchFinished: ParallelBranchFinishedResponse) => void
export type IOnTextChunk = (textChunk: TextChunkResponse) => void
export type IOnTTSChunk = (messageId: string, audioStr: string, audioType?: string) => void
export type IOnTTSEnd = (messageId: string, audioStr: string, audioType?: string) => void
export type IOnTextReplace = (textReplace: TextReplaceResponse) => void
export type IOnLoopStarted = (workflowStarted: LoopStartedResponse) => void
export type IOnLoopNext = (workflowStarted: LoopNextResponse) => void
export type IOnLoopFinished = (workflowFinished: LoopFinishedResponse) => void
export type IOnAgentLog = (agentLog: AgentLogResponse) => void
export type IOtherOptions = {
isPublicAPI?: boolean
isMarketplaceAPI?: boolean
bodyStringify?: boolean
needAllResponseContent?: boolean
deleteContentType?: boolean
silent?: boolean
onData?: IOnData // for stream
onThought?: IOnThought
onFile?: IOnFile
onMessageEnd?: IOnMessageEnd
onMessageReplace?: IOnMessageReplace
onError?: IOnError
onCompleted?: IOnCompleted // for stream
getAbortController?: (abortController: AbortController) => void
onWorkflowStarted?: IOnWorkflowStarted
onWorkflowFinished?: IOnWorkflowFinished
onNodeStarted?: IOnNodeStarted
onNodeFinished?: IOnNodeFinished
onIterationStart?: IOnIterationStarted
onIterationNext?: IOnIterationNext
onIterationFinish?: IOnIterationFinished
onNodeRetry?: IOnNodeRetry
onParallelBranchStarted?: IOnParallelBranchStarted
onParallelBranchFinished?: IOnParallelBranchFinished
onTextChunk?: IOnTextChunk
onTTSChunk?: IOnTTSChunk
onTTSEnd?: IOnTTSEnd
onTextReplace?: IOnTextReplace
onLoopStart?: IOnLoopStarted
onLoopNext?: IOnLoopNext
onLoopFinish?: IOnLoopFinished
onAgentLog?: IOnAgentLog
}
function unicodeToChar(text: string) {
if (!text)
return ''
return text.replace(/\\u[0-9a-f]{4}/g, (_match, p1) => {
return String.fromCharCode(Number.parseInt(p1, 16))
})
}
function requiredWebSSOLogin() {
globalThis.location.href = `/webapp-signin?redirect_url=${globalThis.location.pathname}`
}
export function format(text: string) {
let res = text.trim()
if (res.startsWith('\n'))
res = res.replace('\n', '')
return res.replaceAll('\n', '
').replaceAll('```', '')
}
const handleStream = (
response: Response,
onData: IOnData,
onCompleted?: IOnCompleted,
onThought?: IOnThought,
onMessageEnd?: IOnMessageEnd,
onMessageReplace?: IOnMessageReplace,
onFile?: IOnFile,
onWorkflowStarted?: IOnWorkflowStarted,
onWorkflowFinished?: IOnWorkflowFinished,
onNodeStarted?: IOnNodeStarted,
onNodeFinished?: IOnNodeFinished,
onIterationStart?: IOnIterationStarted,
onIterationNext?: IOnIterationNext,
onIterationFinish?: IOnIterationFinished,
onLoopStart?: IOnLoopStarted,
onLoopNext?: IOnLoopNext,
onLoopFinish?: IOnLoopFinished,
onNodeRetry?: IOnNodeRetry,
onParallelBranchStarted?: IOnParallelBranchStarted,
onParallelBranchFinished?: IOnParallelBranchFinished,
onTextChunk?: IOnTextChunk,
onTTSChunk?: IOnTTSChunk,
onTTSEnd?: IOnTTSEnd,
onTextReplace?: IOnTextReplace,
onAgentLog?: IOnAgentLog,
) => {
if (!response.ok)
throw new Error('Network response was not ok')
const reader = response.body?.getReader()
const decoder = new TextDecoder('utf-8')
let buffer = ''
let bufferObj: Record
let isFirstMessage = true
function read() {
let hasError = false
reader?.read().then((result: any) => {
if (result.done) {
onCompleted && onCompleted()
return
}
buffer += decoder.decode(result.value, { stream: true })
const lines = buffer.split('\n')
try {
lines.forEach((message) => {
if (message.startsWith('data: ')) { // check if it starts with data:
try {
bufferObj = JSON.parse(message.substring(6)) as Record// remove data: and parse as json
}
catch (e) {
// mute handle message cut off
onData('', isFirstMessage, {
conversationId: bufferObj?.conversation_id,
messageId: bufferObj?.message_id,
})
return
}
if (bufferObj.status === 400 || !bufferObj.event) {
onData('', false, {
conversationId: undefined,
messageId: '',
errorMessage: bufferObj?.message,
errorCode: bufferObj?.code,
})
hasError = true
onCompleted?.(true, bufferObj?.message)
return
}
if (bufferObj.event === 'message' || bufferObj.event === 'agent_message') {
// can not use format here. Because message is splitted.
onData(unicodeToChar(bufferObj.answer), isFirstMessage, {
conversationId: bufferObj.conversation_id,
taskId: bufferObj.task_id,
messageId: bufferObj.id,
})
isFirstMessage = false
}
else if (bufferObj.event === 'agent_thought') {
onThought?.(bufferObj as ThoughtItem)
}
else if (bufferObj.event === 'message_file') {
onFile?.(bufferObj as VisionFile)
}
else if (bufferObj.event === 'message_end') {
onMessageEnd?.(bufferObj as MessageEnd)
}
else if (bufferObj.event === 'message_replace') {
onMessageReplace?.(bufferObj as MessageReplace)
}
else if (bufferObj.event === 'workflow_started') {
onWorkflowStarted?.(bufferObj as WorkflowStartedResponse)
}
else if (bufferObj.event === 'workflow_finished') {
onWorkflowFinished?.(bufferObj as WorkflowFinishedResponse)
}
else if (bufferObj.event === 'node_started') {
onNodeStarted?.(bufferObj as NodeStartedResponse)
}
else if (bufferObj.event === 'node_finished') {
onNodeFinished?.(bufferObj as NodeFinishedResponse)
}
else if (bufferObj.event === 'iteration_started') {
onIterationStart?.(bufferObj as IterationStartedResponse)
}
else if (bufferObj.event === 'iteration_next') {
onIterationNext?.(bufferObj as IterationNextResponse)
}
else if (bufferObj.event === 'iteration_completed') {
onIterationFinish?.(bufferObj as IterationFinishedResponse)
}
else if (bufferObj.event === 'loop_started') {
onLoopStart?.(bufferObj as LoopStartedResponse)
}
else if (bufferObj.event === 'loop_next') {
onLoopNext?.(bufferObj as LoopNextResponse)
}
else if (bufferObj.event === 'loop_completed') {
onLoopFinish?.(bufferObj as LoopFinishedResponse)
}
else if (bufferObj.event === 'node_retry') {
onNodeRetry?.(bufferObj as NodeFinishedResponse)
}
else if (bufferObj.event === 'parallel_branch_started') {
onParallelBranchStarted?.(bufferObj as ParallelBranchStartedResponse)
}
else if (bufferObj.event === 'parallel_branch_finished') {
onParallelBranchFinished?.(bufferObj as ParallelBranchFinishedResponse)
}
else if (bufferObj.event === 'text_chunk') {
onTextChunk?.(bufferObj as TextChunkResponse)
}
else if (bufferObj.event === 'text_replace') {
onTextReplace?.(bufferObj as TextReplaceResponse)
}
else if (bufferObj.event === 'agent_log') {
onAgentLog?.(bufferObj as AgentLogResponse)
}
else if (bufferObj.event === 'tts_message') {
onTTSChunk?.(bufferObj.message_id, bufferObj.audio, bufferObj.audio_type)
}
else if (bufferObj.event === 'tts_message_end') {
onTTSEnd?.(bufferObj.message_id, bufferObj.audio)
}
}
})
buffer = lines[lines.length - 1]
}
catch (e) {
onData('', false, {
conversationId: undefined,
messageId: '',
errorMessage: `${e}`,
})
hasError = true
onCompleted?.(true, e as string)
return
}
if (!hasError)
read()
})
}
read()
}
const baseFetch = base
export const upload = (options: any, isPublicAPI?: boolean, url?: string, searchParams?: string): Promise => {
const urlPrefix = isPublicAPI ? PUBLIC_API_PREFIX : API_PREFIX
const token = getAccessToken(isPublicAPI)
const defaultOptions = {
method: 'POST',
url: (url ? `${urlPrefix}${url}` : `${urlPrefix}/files/upload`) + (searchParams || ''),
headers: {
Authorization: `Bearer ${token}`,
},
data: {},
}
options = {
...defaultOptions,
...options,
headers: { ...defaultOptions.headers, ...options.headers },
}
return new Promise((resolve, reject) => {
const xhr = options.xhr
xhr.open(options.method, options.url)
for (const key in options.headers)
xhr.setRequestHeader(key, options.headers[key])
xhr.withCredentials = true
xhr.responseType = 'json'
xhr.onreadystatechange = function () {
if (xhr.readyState === 4) {
if (xhr.status === 201)
resolve(xhr.response)
else
reject(xhr)
}
}
xhr.upload.onprogress = options.onprogress
xhr.send(options.data)
})
}
export const ssePost = (
url: string,
fetchOptions: FetchOptionType,
otherOptions: IOtherOptions,
) => {
const {
isPublicAPI = false,
onData,
onCompleted,
onThought,
onFile,
onMessageEnd,
onMessageReplace,
onWorkflowStarted,
onWorkflowFinished,
onNodeStarted,
onNodeFinished,
onIterationStart,
onIterationNext,
onIterationFinish,
onNodeRetry,
onParallelBranchStarted,
onParallelBranchFinished,
onTextChunk,
onTTSChunk,
onTTSEnd,
onTextReplace,
onAgentLog,
onError,
getAbortController,
onLoopStart,
onLoopNext,
onLoopFinish,
} = otherOptions
const abortController = new AbortController()
const token = localStorage.getItem('console_token')
const options = Object.assign({}, baseOptions, {
method: 'POST',
signal: abortController.signal,
headers: new Headers({
Authorization: `Bearer ${token}`,
}),
} as RequestInit, fetchOptions)
const contentType = (options.headers as Headers).get('Content-Type')
if (!contentType)
(options.headers as Headers).set('Content-Type', ContentType.json)
getAbortController?.(abortController)
const urlPrefix = isPublicAPI ? PUBLIC_API_PREFIX : API_PREFIX
const urlWithPrefix = (url.startsWith('http://') || url.startsWith('https://'))
? url
: `${urlPrefix}${url.startsWith('/') ? url : `/${url}`}`
const { body } = options
if (body)
options.body = JSON.stringify(body)
const accessToken = getAccessToken(isPublicAPI)
;(options.headers as Headers).set('Authorization', `Bearer ${accessToken}`)
globalThis.fetch(urlWithPrefix, options as RequestInit)
.then((res) => {
if (!/^(2|3)\d{2}$/.test(String(res.status))) {
if (res.status === 401) {
refreshAccessTokenOrRelogin(TIME_OUT).then(() => {
ssePost(url, fetchOptions, otherOptions)
}).catch(() => {
res.json().then((data: any) => {
if (isPublicAPI) {
if (data.code === 'web_sso_auth_required')
requiredWebSSOLogin()
if (data.code === 'unauthorized') {
removeAccessToken()
globalThis.location.reload()
}
}
})
})
}
else {
res.json().then((data) => {
Toast.notify({ type: 'error', message: data.message || 'Server Error' })
})
onError?.('Server Error')
}
return
}
return handleStream(res, (str: string, isFirstMessage: boolean, moreInfo: IOnDataMoreInfo) => {
if (moreInfo.errorMessage) {
onError?.(moreInfo.errorMessage, moreInfo.errorCode)
// TypeError: Cannot assign to read only property ... will happen in page leave, so it should be ignored.
if (moreInfo.errorMessage !== 'AbortError: The user aborted a request.' && !moreInfo.errorMessage.includes('TypeError: Cannot assign to read only property'))
Toast.notify({ type: 'error', message: moreInfo.errorMessage })
return
}
onData?.(str, isFirstMessage, moreInfo)
},
onCompleted,
onThought,
onMessageEnd,
onMessageReplace,
onFile,
onWorkflowStarted,
onWorkflowFinished,
onNodeStarted,
onNodeFinished,
onIterationStart,
onIterationNext,
onIterationFinish,
onLoopStart,
onLoopNext,
onLoopFinish,
onNodeRetry,
onParallelBranchStarted,
onParallelBranchFinished,
onTextChunk,
onTTSChunk,
onTTSEnd,
onTextReplace,
onAgentLog,
)
}).catch((e) => {
if (e.toString() !== 'AbortError: The user aborted a request.' && !e.toString().errorMessage.includes('TypeError: Cannot assign to read only property'))
Toast.notify({ type: 'error', message: e })
onError?.(e)
})
}
// base request
export const request = async(url: string, options = {}, otherOptions?: IOtherOptions) => {
try {
const otherOptionsForBaseFetch = otherOptions || {}
const [err, resp] = await asyncRunSafe(baseFetch(url, options, otherOptionsForBaseFetch))
if (err === null)
return resp
const errResp: Response = err as any
if (errResp.status === 401) {
const [parseErr, errRespData] = await asyncRunSafe(errResp.json())
const loginUrl = `${globalThis.location.origin}/signin`
if (parseErr) {
globalThis.location.href = loginUrl
return Promise.reject(err)
}
// special code
const { code, message } = errRespData
// webapp sso
if (code === 'web_sso_auth_required') {
requiredWebSSOLogin()
return Promise.reject(err)
}
if (code === 'unauthorized_and_force_logout') {
localStorage.removeItem('console_token')
localStorage.removeItem('refresh_token')
globalThis.location.reload()
return Promise.reject(err)
}
const {
isPublicAPI = false,
silent,
} = otherOptionsForBaseFetch
if (isPublicAPI && code === 'unauthorized') {
removeAccessToken()
globalThis.location.reload()
return Promise.reject(err)
}
if (code === 'init_validate_failed' && IS_CE_EDITION && !silent) {
Toast.notify({ type: 'error', message, duration: 4000 })
return Promise.reject(err)
}
if (code === 'not_init_validated' && IS_CE_EDITION) {
globalThis.location.href = `${globalThis.location.origin}/init`
return Promise.reject(err)
}
if (code === 'not_setup' && IS_CE_EDITION) {
globalThis.location.href = `${globalThis.location.origin}/install`
return Promise.reject(err)
}
// refresh token
const [refreshErr] = await asyncRunSafe(refreshAccessTokenOrRelogin(TIME_OUT))
if (refreshErr === null)
return baseFetch(url, options, otherOptionsForBaseFetch)
if (location.pathname !== '/signin' || !IS_CE_EDITION) {
globalThis.location.href = loginUrl
return Promise.reject(err)
}
if (!silent) {
Toast.notify({ type: 'error', message })
return Promise.reject(err)
}
globalThis.location.href = loginUrl
return Promise.reject(err)
}
else {
return Promise.reject(err)
}
}
catch (error) {
console.error(error)
return Promise.reject(error)
}
}
// request methods
export const get = (url: string, options = {}, otherOptions?: IOtherOptions) => {
return request(url, Object.assign({}, options, { method: 'GET' }), otherOptions)
}
// For public API
export const getPublic = (url: string, options = {}, otherOptions?: IOtherOptions) => {
return get(url, options, { ...otherOptions, isPublicAPI: true })
}
// For Marketplace API
export const getMarketplace = (url: string, options = {}, otherOptions?: IOtherOptions) => {
return get(url, options, { ...otherOptions, isMarketplaceAPI: true })
}
export const post = (url: string, options = {}, otherOptions?: IOtherOptions) => {
return request(url, Object.assign({}, options, { method: 'POST' }), otherOptions)
}
// For Marketplace API
export const postMarketplace = (url: string, options = {}, otherOptions?: IOtherOptions) => {
return post(url, options, { ...otherOptions, isMarketplaceAPI: true })
}
export const postPublic = (url: string, options = {}, otherOptions?: IOtherOptions) => {
return post(url, options, { ...otherOptions, isPublicAPI: true })
}
export const put = (url: string, options = {}, otherOptions?: IOtherOptions) => {
return request(url, Object.assign({}, options, { method: 'PUT' }), otherOptions)
}
export const putPublic = (url: string, options = {}, otherOptions?: IOtherOptions) => {
return put(url, options, { ...otherOptions, isPublicAPI: true })
}
export const del = (url: string, options = {}, otherOptions?: IOtherOptions) => {
return request(url, Object.assign({}, options, { method: 'DELETE' }), otherOptions)
}
export const delPublic = (url: string, options = {}, otherOptions?: IOtherOptions) => {
return del(url, options, { ...otherOptions, isPublicAPI: true })
}
export const patch = (url: string, options = {}, otherOptions?: IOtherOptions) => {
return request(url, Object.assign({}, options, { method: 'PATCH' }), otherOptions)
}
export const patchPublic = (url: string, options = {}, otherOptions?: IOtherOptions) => {
return patch(url, options, { ...otherOptions, isPublicAPI: true })
}