task.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564
  1. package backwards_invocation
  2. import (
  3. "encoding/hex"
  4. "fmt"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/persistence"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  8. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  12. )
  13. // returns error only if payload is not correct
  14. func InvokeDify(
  15. declaration *plugin_entities.PluginDeclaration,
  16. invoke_from access_types.PluginAccessType,
  17. session *session_manager.Session,
  18. writer BackwardsInvocationWriter,
  19. data []byte,
  20. ) error {
  21. // unmarshal invoke data
  22. request, err := parser.UnmarshalJsonBytes2Map(data)
  23. if err != nil {
  24. return fmt.Errorf("unmarshal invoke request failed: %s", err.Error())
  25. }
  26. if request == nil {
  27. return fmt.Errorf("invoke request is empty")
  28. }
  29. // prepare invocation arguments
  30. requestHandle, err := prepareDifyInvocationArguments(
  31. session,
  32. writer,
  33. request,
  34. )
  35. if err != nil {
  36. return err
  37. }
  38. if invoke_from == access_types.PLUGIN_ACCESS_TYPE_MODEL {
  39. requestHandle.WriteError(fmt.Errorf("you can not invoke dify from %s", invoke_from))
  40. requestHandle.EndResponse()
  41. return nil
  42. }
  43. // check permission
  44. if err := checkPermission(declaration, requestHandle); err != nil {
  45. requestHandle.WriteError(err)
  46. requestHandle.EndResponse()
  47. return nil
  48. }
  49. // dispatch invocation task
  50. routine.Submit(func() {
  51. dispatchDifyInvocationTask(requestHandle)
  52. defer requestHandle.EndResponse()
  53. })
  54. return nil
  55. }
  56. var (
  57. permissionMapping = map[dify_invocation.InvokeType]map[string]any{
  58. dify_invocation.INVOKE_TYPE_TOOL: {
  59. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  60. return declaration.Resource.Permission.AllowInvokeTool()
  61. },
  62. "error": "permission denied, you need to enable tool access in plugin manifest",
  63. },
  64. dify_invocation.INVOKE_TYPE_LLM: {
  65. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  66. return declaration.Resource.Permission.AllowInvokeLLM()
  67. },
  68. "error": "permission denied, you need to enable llm access in plugin manifest",
  69. },
  70. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: {
  71. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  72. return declaration.Resource.Permission.AllowInvokeTextEmbedding()
  73. },
  74. "error": "permission denied, you need to enable text-embedding access in plugin manifest",
  75. },
  76. dify_invocation.INVOKE_TYPE_RERANK: {
  77. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  78. return declaration.Resource.Permission.AllowInvokeRerank()
  79. },
  80. "error": "permission denied, you need to enable rerank access in plugin manifest",
  81. },
  82. dify_invocation.INVOKE_TYPE_TTS: {
  83. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  84. return declaration.Resource.Permission.AllowInvokeTTS()
  85. },
  86. "error": "permission denied, you need to enable tts access in plugin manifest",
  87. },
  88. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: {
  89. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  90. return declaration.Resource.Permission.AllowInvokeSpeech2Text()
  91. },
  92. "error": "permission denied, you need to enable speech2text access in plugin manifest",
  93. },
  94. dify_invocation.INVOKE_TYPE_MODERATION: {
  95. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  96. return declaration.Resource.Permission.AllowInvokeModeration()
  97. },
  98. "error": "permission denied, you need to enable moderation access in plugin manifest",
  99. },
  100. dify_invocation.INVOKE_TYPE_NODE_PARAMETER_EXTRACTOR: {
  101. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  102. return declaration.Resource.Permission.AllowInvokeNode()
  103. },
  104. "error": "permission denied, you need to enable node access in plugin manifest",
  105. },
  106. dify_invocation.INVOKE_TYPE_NODE_QUESTION_CLASSIFIER: {
  107. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  108. return declaration.Resource.Permission.AllowInvokeNode()
  109. },
  110. "error": "permission denied, you need to enable node access in plugin manifest",
  111. },
  112. dify_invocation.INVOKE_TYPE_APP: {
  113. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  114. return declaration.Resource.Permission.AllowInvokeApp()
  115. },
  116. "error": "permission denied, you need to enable app access in plugin manifest",
  117. },
  118. dify_invocation.INVOKE_TYPE_STORAGE: {
  119. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  120. return declaration.Resource.Permission.AllowInvokeStorage()
  121. },
  122. "error": "permission denied, you need to enable storage access in plugin manifest",
  123. },
  124. dify_invocation.INVOKE_TYPE_SUMMARY: {
  125. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  126. return declaration.Resource.Permission.AllowInvokeLLM()
  127. },
  128. "error": "permission denied, you need to enable llm access in plugin manifest",
  129. },
  130. dify_invocation.INVOKE_TYPE_UPLOAD_FILE: {
  131. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  132. return true
  133. },
  134. "error": "permission denied, you need to enable storage access in plugin manifest",
  135. },
  136. }
  137. )
  138. func checkPermission(runtime *plugin_entities.PluginDeclaration, requestHandle *BackwardsInvocation) error {
  139. permission, ok := permissionMapping[requestHandle.Type()]
  140. if !ok {
  141. return fmt.Errorf("unsupported invoke type: %s", requestHandle.Type())
  142. }
  143. permissionFunc, ok := permission["func"].(func(runtime *plugin_entities.PluginDeclaration) bool)
  144. if !ok {
  145. return fmt.Errorf("permission function not found: %s", requestHandle.Type())
  146. }
  147. if !permissionFunc(runtime) {
  148. return fmt.Errorf(permission["error"].(string))
  149. }
  150. return nil
  151. }
  152. func prepareDifyInvocationArguments(
  153. session *session_manager.Session,
  154. writer BackwardsInvocationWriter,
  155. request map[string]any,
  156. ) (*BackwardsInvocation, error) {
  157. typ, ok := request["type"].(string)
  158. if !ok {
  159. return nil, fmt.Errorf("invoke request missing type: %s", request)
  160. }
  161. // get request id
  162. backwardsRequestId, ok := request["backwards_request_id"].(string)
  163. if !ok {
  164. return nil, fmt.Errorf("invoke request missing request_id: %s", request)
  165. }
  166. // get request
  167. detailedRequest, ok := request["request"].(map[string]any)
  168. if !ok {
  169. return nil, fmt.Errorf("invoke request missing request: %s", request)
  170. }
  171. return NewBackwardsInvocation(
  172. BackwardsInvocationType(typ),
  173. backwardsRequestId,
  174. session,
  175. writer,
  176. detailedRequest,
  177. ), nil
  178. }
  179. var (
  180. dispatchMapping = map[dify_invocation.InvokeType]func(handle *BackwardsInvocation){
  181. dify_invocation.INVOKE_TYPE_TOOL: func(handle *BackwardsInvocation) {
  182. genericDispatchTask(handle, executeDifyInvocationToolTask)
  183. },
  184. dify_invocation.INVOKE_TYPE_LLM: func(handle *BackwardsInvocation) {
  185. genericDispatchTask(handle, executeDifyInvocationLLMTask)
  186. },
  187. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: func(handle *BackwardsInvocation) {
  188. genericDispatchTask(handle, executeDifyInvocationTextEmbeddingTask)
  189. },
  190. dify_invocation.INVOKE_TYPE_RERANK: func(handle *BackwardsInvocation) {
  191. genericDispatchTask(handle, executeDifyInvocationRerankTask)
  192. },
  193. dify_invocation.INVOKE_TYPE_TTS: func(handle *BackwardsInvocation) {
  194. genericDispatchTask(handle, executeDifyInvocationTTSTask)
  195. },
  196. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: func(handle *BackwardsInvocation) {
  197. genericDispatchTask(handle, executeDifyInvocationSpeech2TextTask)
  198. },
  199. dify_invocation.INVOKE_TYPE_MODERATION: func(handle *BackwardsInvocation) {
  200. genericDispatchTask(handle, executeDifyInvocationModerationTask)
  201. },
  202. dify_invocation.INVOKE_TYPE_APP: func(handle *BackwardsInvocation) {
  203. genericDispatchTask(handle, executeDifyInvocationAppTask)
  204. },
  205. dify_invocation.INVOKE_TYPE_NODE_PARAMETER_EXTRACTOR: func(handle *BackwardsInvocation) {
  206. genericDispatchTask(handle, executeDifyInvocationParameterExtractor)
  207. },
  208. dify_invocation.INVOKE_TYPE_NODE_QUESTION_CLASSIFIER: func(handle *BackwardsInvocation) {
  209. genericDispatchTask(handle, executeDifyInvocationQuestionClassifier)
  210. },
  211. dify_invocation.INVOKE_TYPE_STORAGE: func(handle *BackwardsInvocation) {
  212. genericDispatchTask(handle, executeDifyInvocationStorageTask)
  213. },
  214. dify_invocation.INVOKE_TYPE_SUMMARY: func(handle *BackwardsInvocation) {
  215. genericDispatchTask(handle, executeDifyInvocationSummaryTask)
  216. },
  217. dify_invocation.INVOKE_TYPE_UPLOAD_FILE: func(handle *BackwardsInvocation) {
  218. genericDispatchTask(handle, executeDifyInvocationUploadFileTask)
  219. },
  220. }
  221. )
  222. func genericDispatchTask[T any](
  223. handle *BackwardsInvocation,
  224. dispatch func(
  225. handle *BackwardsInvocation,
  226. request *T,
  227. ),
  228. ) {
  229. r, err := parser.MapToStruct[T](handle.RequestData())
  230. if err != nil {
  231. handle.WriteError(fmt.Errorf("unmarshal backwards invoke request failed: %s", err.Error()))
  232. return
  233. }
  234. dispatch(handle, r)
  235. }
  236. func dispatchDifyInvocationTask(handle *BackwardsInvocation) {
  237. requestData := handle.RequestData()
  238. tenantId, err := handle.TenantID()
  239. if err != nil {
  240. handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
  241. return
  242. }
  243. requestData["tenant_id"] = tenantId
  244. userId, err := handle.UserID()
  245. if err != nil {
  246. handle.WriteError(fmt.Errorf("get user id failed: %s", err.Error()))
  247. return
  248. }
  249. requestData["user_id"] = userId
  250. typ := handle.Type()
  251. requestData["type"] = typ
  252. for t, v := range dispatchMapping {
  253. if t == handle.Type() {
  254. v(handle)
  255. return
  256. }
  257. }
  258. handle.WriteError(fmt.Errorf("unsupported invoke type: %s", handle.Type()))
  259. }
  260. func executeDifyInvocationToolTask(
  261. handle *BackwardsInvocation,
  262. request *dify_invocation.InvokeToolRequest,
  263. ) {
  264. response, err := handle.backwardsInvocation.InvokeTool(request)
  265. if err != nil {
  266. handle.WriteError(fmt.Errorf("invoke tool failed: %s", err.Error()))
  267. return
  268. }
  269. for response.Next() {
  270. value, err := response.Read()
  271. if err != nil {
  272. handle.WriteError(fmt.Errorf("read tool response failed: %s", err.Error()))
  273. return
  274. }
  275. handle.WriteResponse("stream", value)
  276. }
  277. }
  278. func executeDifyInvocationLLMTask(
  279. handle *BackwardsInvocation,
  280. request *dify_invocation.InvokeLLMRequest,
  281. ) {
  282. response, err := handle.backwardsInvocation.InvokeLLM(request)
  283. if err != nil {
  284. handle.WriteError(fmt.Errorf("invoke llm model failed: %s", err.Error()))
  285. return
  286. }
  287. for response.Next() {
  288. value, err := response.Read()
  289. if err != nil {
  290. handle.WriteError(fmt.Errorf("read llm model failed: %s", err.Error()))
  291. return
  292. }
  293. handle.WriteResponse("stream", value)
  294. }
  295. }
  296. func executeDifyInvocationTextEmbeddingTask(
  297. handle *BackwardsInvocation,
  298. request *dify_invocation.InvokeTextEmbeddingRequest,
  299. ) {
  300. response, err := handle.backwardsInvocation.InvokeTextEmbedding(request)
  301. if err != nil {
  302. handle.WriteError(fmt.Errorf("invoke text-embedding model failed: %s", err.Error()))
  303. return
  304. }
  305. handle.WriteResponse("struct", response)
  306. }
  307. func executeDifyInvocationRerankTask(
  308. handle *BackwardsInvocation,
  309. request *dify_invocation.InvokeRerankRequest,
  310. ) {
  311. response, err := handle.backwardsInvocation.InvokeRerank(request)
  312. if err != nil {
  313. handle.WriteError(fmt.Errorf("invoke rerank model failed: %s", err.Error()))
  314. return
  315. }
  316. handle.WriteResponse("struct", response)
  317. }
  318. func executeDifyInvocationTTSTask(
  319. handle *BackwardsInvocation,
  320. request *dify_invocation.InvokeTTSRequest,
  321. ) {
  322. response, err := handle.backwardsInvocation.InvokeTTS(request)
  323. if err != nil {
  324. handle.WriteError(fmt.Errorf("invoke tts model failed: %s", err.Error()))
  325. return
  326. }
  327. for response.Next() {
  328. value, err := response.Read()
  329. if err != nil {
  330. handle.WriteError(fmt.Errorf("read tts model failed: %s", err.Error()))
  331. return
  332. }
  333. handle.WriteResponse("stream", value)
  334. }
  335. }
  336. func executeDifyInvocationSpeech2TextTask(
  337. handle *BackwardsInvocation,
  338. request *dify_invocation.InvokeSpeech2TextRequest,
  339. ) {
  340. response, err := handle.backwardsInvocation.InvokeSpeech2Text(request)
  341. if err != nil {
  342. handle.WriteError(fmt.Errorf("invoke speech2text model failed: %s", err.Error()))
  343. return
  344. }
  345. handle.WriteResponse("struct", response)
  346. }
  347. func executeDifyInvocationModerationTask(
  348. handle *BackwardsInvocation,
  349. request *dify_invocation.InvokeModerationRequest,
  350. ) {
  351. response, err := handle.backwardsInvocation.InvokeModeration(request)
  352. if err != nil {
  353. handle.WriteError(fmt.Errorf("invoke moderation model failed: %s", err.Error()))
  354. return
  355. }
  356. handle.WriteResponse("struct", response)
  357. }
  358. func executeDifyInvocationAppTask(
  359. handle *BackwardsInvocation,
  360. request *dify_invocation.InvokeAppRequest,
  361. ) {
  362. response, err := handle.backwardsInvocation.InvokeApp(request)
  363. if err != nil {
  364. handle.WriteError(fmt.Errorf("invoke app failed: %s", err.Error()))
  365. return
  366. }
  367. userId, err := handle.UserID()
  368. if err != nil {
  369. handle.WriteError(fmt.Errorf("get user id failed: %s", err.Error()))
  370. return
  371. }
  372. request.User = userId
  373. for response.Next() {
  374. value, err := response.Read()
  375. if err != nil {
  376. handle.WriteError(fmt.Errorf("read app failed: %s", err.Error()))
  377. return
  378. }
  379. handle.WriteResponse("stream", value)
  380. }
  381. }
  382. func executeDifyInvocationParameterExtractor(
  383. handle *BackwardsInvocation,
  384. request *dify_invocation.InvokeParameterExtractorRequest,
  385. ) {
  386. response, err := handle.backwardsInvocation.InvokeParameterExtractor(request)
  387. if err != nil {
  388. handle.WriteError(fmt.Errorf("invoke parameter extractor failed: %s", err.Error()))
  389. return
  390. }
  391. handle.WriteResponse("struct", response)
  392. }
  393. func executeDifyInvocationQuestionClassifier(
  394. handle *BackwardsInvocation,
  395. request *dify_invocation.InvokeQuestionClassifierRequest,
  396. ) {
  397. response, err := handle.backwardsInvocation.InvokeQuestionClassifier(request)
  398. if err != nil {
  399. handle.WriteError(fmt.Errorf("invoke question classifier failed: %s", err.Error()))
  400. return
  401. }
  402. handle.WriteResponse("struct", response)
  403. }
  404. func executeDifyInvocationStorageTask(
  405. handle *BackwardsInvocation,
  406. request *dify_invocation.InvokeStorageRequest,
  407. ) {
  408. if handle.session == nil {
  409. handle.WriteError(fmt.Errorf("session not found"))
  410. return
  411. }
  412. persistence := persistence.GetPersistence()
  413. if persistence == nil {
  414. handle.WriteError(fmt.Errorf("persistence not found"))
  415. return
  416. }
  417. tenantId, err := handle.TenantID()
  418. if err != nil {
  419. handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
  420. return
  421. }
  422. pluginId := handle.session.PluginUniqueIdentifier
  423. if request.Opt == dify_invocation.STORAGE_OPT_GET {
  424. data, err := persistence.Load(tenantId, pluginId.PluginID(), request.Key)
  425. if err != nil {
  426. handle.WriteError(fmt.Errorf("load data failed: %s", err.Error()))
  427. return
  428. }
  429. handle.WriteResponse("struct", map[string]any{
  430. "data": hex.EncodeToString(data),
  431. })
  432. } else if request.Opt == dify_invocation.STORAGE_OPT_SET {
  433. data, err := hex.DecodeString(request.Value)
  434. if err != nil {
  435. handle.WriteError(fmt.Errorf("decode data failed: %s", err.Error()))
  436. return
  437. }
  438. session := handle.session
  439. if session == nil {
  440. handle.WriteError(fmt.Errorf("session not found"))
  441. return
  442. }
  443. declaration := session.Declaration
  444. if declaration == nil {
  445. handle.WriteError(fmt.Errorf("declaration not found"))
  446. return
  447. }
  448. resource := declaration.Resource.Permission
  449. if resource == nil {
  450. handle.WriteError(fmt.Errorf("resource not found"))
  451. return
  452. }
  453. maxStorageSize := int64(-1)
  454. storage := resource.Storage
  455. if storage != nil {
  456. maxStorageSize = int64(storage.Size)
  457. }
  458. if err := persistence.Save(tenantId, pluginId.PluginID(), maxStorageSize, request.Key, data); err != nil {
  459. handle.WriteError(fmt.Errorf("save data failed: %s", err.Error()))
  460. return
  461. }
  462. handle.WriteResponse("struct", map[string]any{
  463. "data": "ok",
  464. })
  465. } else if request.Opt == dify_invocation.STORAGE_OPT_DEL {
  466. if err := persistence.Delete(tenantId, pluginId.PluginID(), request.Key); err != nil {
  467. handle.WriteError(fmt.Errorf("delete data failed: %s", err.Error()))
  468. return
  469. }
  470. handle.WriteResponse("struct", map[string]any{
  471. "data": "ok",
  472. })
  473. }
  474. }
  475. func executeDifyInvocationSummaryTask(
  476. handle *BackwardsInvocation,
  477. request *dify_invocation.InvokeSummaryRequest,
  478. ) {
  479. response, err := handle.backwardsInvocation.InvokeSummary(request)
  480. if err != nil {
  481. handle.WriteError(fmt.Errorf("invoke summary failed: %s", err.Error()))
  482. return
  483. }
  484. handle.WriteResponse("struct", response)
  485. }
  486. func executeDifyInvocationUploadFileTask(
  487. handle *BackwardsInvocation,
  488. request *dify_invocation.UploadFileRequest,
  489. ) {
  490. response, err := handle.backwardsInvocation.UploadFile(request)
  491. if err != nil {
  492. handle.WriteError(fmt.Errorf("upload file failed: %s", err.Error()))
  493. return
  494. }
  495. handle.WriteResponse("struct", response)
  496. }