task.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. package backwards_invocation
  2. import (
  3. "encoding/hex"
  4. "errors"
  5. "fmt"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/persistence"
  8. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  14. )
  15. // returns error only if payload is not correct
  16. func InvokeDify(
  17. declaration *plugin_entities.PluginDeclaration,
  18. invoke_from access_types.PluginAccessType,
  19. session *session_manager.Session,
  20. writer BackwardsInvocationWriter,
  21. data []byte,
  22. ) error {
  23. // unmarshal invoke data
  24. request, err := parser.UnmarshalJsonBytes2Map(data)
  25. if err != nil {
  26. return fmt.Errorf("unmarshal invoke request failed: %s", err.Error())
  27. }
  28. if request == nil {
  29. return fmt.Errorf("invoke request is empty")
  30. }
  31. // prepare invocation arguments
  32. requestHandle, err := prepareDifyInvocationArguments(
  33. session,
  34. writer,
  35. request,
  36. )
  37. if err != nil {
  38. return err
  39. }
  40. if invoke_from == access_types.PLUGIN_ACCESS_TYPE_MODEL {
  41. requestHandle.WriteError(fmt.Errorf("you can not invoke dify from %s", invoke_from))
  42. requestHandle.EndResponse()
  43. return nil
  44. }
  45. // check permission
  46. if err := checkPermission(declaration, requestHandle); err != nil {
  47. requestHandle.WriteError(err)
  48. requestHandle.EndResponse()
  49. return nil
  50. }
  51. // dispatch invocation task
  52. routine.Submit(map[string]string{
  53. "module": "plugin_daemon",
  54. "function": "InvokeDify",
  55. }, func() {
  56. dispatchDifyInvocationTask(requestHandle)
  57. defer requestHandle.EndResponse()
  58. })
  59. return nil
  60. }
  61. var (
  62. permissionMapping = map[dify_invocation.InvokeType]map[string]any{
  63. dify_invocation.INVOKE_TYPE_TOOL: {
  64. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  65. return declaration.Resource.Permission.AllowInvokeTool()
  66. },
  67. "error": "permission denied, you need to enable tool access in plugin manifest",
  68. },
  69. dify_invocation.INVOKE_TYPE_LLM: {
  70. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  71. return declaration.Resource.Permission.AllowInvokeLLM()
  72. },
  73. "error": "permission denied, you need to enable llm access in plugin manifest",
  74. },
  75. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: {
  76. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  77. return declaration.Resource.Permission.AllowInvokeTextEmbedding()
  78. },
  79. "error": "permission denied, you need to enable text-embedding access in plugin manifest",
  80. },
  81. dify_invocation.INVOKE_TYPE_RERANK: {
  82. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  83. return declaration.Resource.Permission.AllowInvokeRerank()
  84. },
  85. "error": "permission denied, you need to enable rerank access in plugin manifest",
  86. },
  87. dify_invocation.INVOKE_TYPE_TTS: {
  88. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  89. return declaration.Resource.Permission.AllowInvokeTTS()
  90. },
  91. "error": "permission denied, you need to enable tts access in plugin manifest",
  92. },
  93. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: {
  94. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  95. return declaration.Resource.Permission.AllowInvokeSpeech2Text()
  96. },
  97. "error": "permission denied, you need to enable speech2text access in plugin manifest",
  98. },
  99. dify_invocation.INVOKE_TYPE_MODERATION: {
  100. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  101. return declaration.Resource.Permission.AllowInvokeModeration()
  102. },
  103. "error": "permission denied, you need to enable moderation access in plugin manifest",
  104. },
  105. dify_invocation.INVOKE_TYPE_NODE_PARAMETER_EXTRACTOR: {
  106. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  107. return declaration.Resource.Permission.AllowInvokeNode()
  108. },
  109. "error": "permission denied, you need to enable node access in plugin manifest",
  110. },
  111. dify_invocation.INVOKE_TYPE_NODE_QUESTION_CLASSIFIER: {
  112. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  113. return declaration.Resource.Permission.AllowInvokeNode()
  114. },
  115. "error": "permission denied, you need to enable node access in plugin manifest",
  116. },
  117. dify_invocation.INVOKE_TYPE_APP: {
  118. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  119. return declaration.Resource.Permission.AllowInvokeApp()
  120. },
  121. "error": "permission denied, you need to enable app access in plugin manifest",
  122. },
  123. dify_invocation.INVOKE_TYPE_STORAGE: {
  124. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  125. return declaration.Resource.Permission.AllowInvokeStorage()
  126. },
  127. "error": "permission denied, you need to enable storage access in plugin manifest",
  128. },
  129. dify_invocation.INVOKE_TYPE_SYSTEM_SUMMARY: {
  130. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  131. return declaration.Resource.Permission.AllowInvokeLLM()
  132. },
  133. "error": "permission denied, you need to enable llm access in plugin manifest",
  134. },
  135. dify_invocation.INVOKE_TYPE_UPLOAD_FILE: {
  136. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  137. return true
  138. },
  139. "error": "permission denied, you need to enable storage access in plugin manifest",
  140. },
  141. }
  142. )
  143. func checkPermission(runtime *plugin_entities.PluginDeclaration, requestHandle *BackwardsInvocation) error {
  144. permission, ok := permissionMapping[requestHandle.Type()]
  145. if !ok {
  146. return fmt.Errorf("unsupported invoke type: %s", requestHandle.Type())
  147. }
  148. permissionFunc, ok := permission["func"].(func(runtime *plugin_entities.PluginDeclaration) bool)
  149. if !ok {
  150. return fmt.Errorf("permission function not found: %s", requestHandle.Type())
  151. }
  152. if !permissionFunc(runtime) {
  153. return fmt.Errorf(permission["error"].(string))
  154. }
  155. return nil
  156. }
  157. func prepareDifyInvocationArguments(
  158. session *session_manager.Session,
  159. writer BackwardsInvocationWriter,
  160. request map[string]any,
  161. ) (*BackwardsInvocation, error) {
  162. typ, ok := request["type"].(string)
  163. if !ok {
  164. return nil, fmt.Errorf("invoke request missing type: %s", request)
  165. }
  166. // get request id
  167. backwardsRequestId, ok := request["backwards_request_id"].(string)
  168. if !ok {
  169. return nil, fmt.Errorf("invoke request missing request_id: %s", request)
  170. }
  171. // get request
  172. detailedRequest, ok := request["request"].(map[string]any)
  173. if !ok {
  174. return nil, fmt.Errorf("invoke request missing request: %s", request)
  175. }
  176. return NewBackwardsInvocation(
  177. BackwardsInvocationType(typ),
  178. backwardsRequestId,
  179. session,
  180. writer,
  181. detailedRequest,
  182. ), nil
  183. }
  184. var (
  185. dispatchMapping = map[dify_invocation.InvokeType]func(handle *BackwardsInvocation){
  186. dify_invocation.INVOKE_TYPE_TOOL: func(handle *BackwardsInvocation) {
  187. genericDispatchTask(handle, executeDifyInvocationToolTask)
  188. },
  189. dify_invocation.INVOKE_TYPE_LLM: func(handle *BackwardsInvocation) {
  190. genericDispatchTask(handle, executeDifyInvocationLLMTask)
  191. },
  192. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: func(handle *BackwardsInvocation) {
  193. genericDispatchTask(handle, executeDifyInvocationTextEmbeddingTask)
  194. },
  195. dify_invocation.INVOKE_TYPE_RERANK: func(handle *BackwardsInvocation) {
  196. genericDispatchTask(handle, executeDifyInvocationRerankTask)
  197. },
  198. dify_invocation.INVOKE_TYPE_TTS: func(handle *BackwardsInvocation) {
  199. genericDispatchTask(handle, executeDifyInvocationTTSTask)
  200. },
  201. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: func(handle *BackwardsInvocation) {
  202. genericDispatchTask(handle, executeDifyInvocationSpeech2TextTask)
  203. },
  204. dify_invocation.INVOKE_TYPE_MODERATION: func(handle *BackwardsInvocation) {
  205. genericDispatchTask(handle, executeDifyInvocationModerationTask)
  206. },
  207. dify_invocation.INVOKE_TYPE_APP: func(handle *BackwardsInvocation) {
  208. genericDispatchTask(handle, executeDifyInvocationAppTask)
  209. },
  210. dify_invocation.INVOKE_TYPE_NODE_PARAMETER_EXTRACTOR: func(handle *BackwardsInvocation) {
  211. genericDispatchTask(handle, executeDifyInvocationParameterExtractor)
  212. },
  213. dify_invocation.INVOKE_TYPE_NODE_QUESTION_CLASSIFIER: func(handle *BackwardsInvocation) {
  214. genericDispatchTask(handle, executeDifyInvocationQuestionClassifier)
  215. },
  216. dify_invocation.INVOKE_TYPE_STORAGE: func(handle *BackwardsInvocation) {
  217. genericDispatchTask(handle, executeDifyInvocationStorageTask)
  218. },
  219. dify_invocation.INVOKE_TYPE_SYSTEM_SUMMARY: func(handle *BackwardsInvocation) {
  220. genericDispatchTask(handle, executeDifyInvocationSystemSummaryTask)
  221. },
  222. dify_invocation.INVOKE_TYPE_UPLOAD_FILE: func(handle *BackwardsInvocation) {
  223. genericDispatchTask(handle, executeDifyInvocationUploadFileTask)
  224. },
  225. }
  226. )
  227. func genericDispatchTask[T any](
  228. handle *BackwardsInvocation,
  229. dispatch func(
  230. handle *BackwardsInvocation,
  231. request *T,
  232. ),
  233. ) {
  234. r, err := parser.MapToStruct[T](handle.RequestData())
  235. if err != nil {
  236. handle.WriteError(fmt.Errorf("unmarshal backwards invoke request failed: %s", err.Error()))
  237. return
  238. }
  239. dispatch(handle, r)
  240. }
  241. func dispatchDifyInvocationTask(handle *BackwardsInvocation) {
  242. requestData := handle.RequestData()
  243. tenantId, err := handle.TenantID()
  244. if err != nil {
  245. handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
  246. return
  247. }
  248. requestData["tenant_id"] = tenantId
  249. userId, err := handle.UserID()
  250. if err != nil {
  251. handle.WriteError(fmt.Errorf("get user id failed: %s", err.Error()))
  252. return
  253. }
  254. requestData["user_id"] = userId
  255. typ := handle.Type()
  256. requestData["type"] = typ
  257. for t, v := range dispatchMapping {
  258. if t == handle.Type() {
  259. v(handle)
  260. return
  261. }
  262. }
  263. handle.WriteError(fmt.Errorf("unsupported invoke type: %s", handle.Type()))
  264. }
  265. func executeDifyInvocationToolTask(
  266. handle *BackwardsInvocation,
  267. request *dify_invocation.InvokeToolRequest,
  268. ) {
  269. response, err := handle.backwardsInvocation.InvokeTool(request)
  270. if err != nil {
  271. handle.WriteError(fmt.Errorf("invoke tool failed: %s", err.Error()))
  272. return
  273. }
  274. for response.Next() {
  275. value, err := response.Read()
  276. if err != nil {
  277. handle.WriteError(fmt.Errorf("read tool response failed: %s", err.Error()))
  278. return
  279. }
  280. handle.WriteResponse("stream", value)
  281. }
  282. }
  283. func executeDifyInvocationLLMTask(
  284. handle *BackwardsInvocation,
  285. request *dify_invocation.InvokeLLMRequest,
  286. ) {
  287. response, err := handle.backwardsInvocation.InvokeLLM(request)
  288. if err != nil {
  289. handle.WriteError(fmt.Errorf("invoke llm model failed: %s", err.Error()))
  290. return
  291. }
  292. for response.Next() {
  293. value, err := response.Read()
  294. if err != nil {
  295. handle.WriteError(fmt.Errorf("read llm model failed: %s", err.Error()))
  296. return
  297. }
  298. handle.WriteResponse("stream", value)
  299. }
  300. }
  301. func executeDifyInvocationTextEmbeddingTask(
  302. handle *BackwardsInvocation,
  303. request *dify_invocation.InvokeTextEmbeddingRequest,
  304. ) {
  305. response, err := handle.backwardsInvocation.InvokeTextEmbedding(request)
  306. if err != nil {
  307. handle.WriteError(fmt.Errorf("invoke text-embedding model failed: %s", err.Error()))
  308. return
  309. }
  310. handle.WriteResponse("struct", response)
  311. }
  312. func executeDifyInvocationRerankTask(
  313. handle *BackwardsInvocation,
  314. request *dify_invocation.InvokeRerankRequest,
  315. ) {
  316. response, err := handle.backwardsInvocation.InvokeRerank(request)
  317. if err != nil {
  318. handle.WriteError(fmt.Errorf("invoke rerank model failed: %s", err.Error()))
  319. return
  320. }
  321. handle.WriteResponse("struct", response)
  322. }
  323. func executeDifyInvocationTTSTask(
  324. handle *BackwardsInvocation,
  325. request *dify_invocation.InvokeTTSRequest,
  326. ) {
  327. response, err := handle.backwardsInvocation.InvokeTTS(request)
  328. if err != nil {
  329. handle.WriteError(fmt.Errorf("invoke tts model failed: %s", err.Error()))
  330. return
  331. }
  332. for response.Next() {
  333. value, err := response.Read()
  334. if err != nil {
  335. handle.WriteError(fmt.Errorf("read tts model failed: %s", err.Error()))
  336. return
  337. }
  338. handle.WriteResponse("stream", value)
  339. }
  340. }
  341. func executeDifyInvocationSpeech2TextTask(
  342. handle *BackwardsInvocation,
  343. request *dify_invocation.InvokeSpeech2TextRequest,
  344. ) {
  345. response, err := handle.backwardsInvocation.InvokeSpeech2Text(request)
  346. if err != nil {
  347. handle.WriteError(fmt.Errorf("invoke speech2text model failed: %s", err.Error()))
  348. return
  349. }
  350. handle.WriteResponse("struct", response)
  351. }
  352. func executeDifyInvocationModerationTask(
  353. handle *BackwardsInvocation,
  354. request *dify_invocation.InvokeModerationRequest,
  355. ) {
  356. response, err := handle.backwardsInvocation.InvokeModeration(request)
  357. if err != nil {
  358. handle.WriteError(fmt.Errorf("invoke moderation model failed: %s", err.Error()))
  359. return
  360. }
  361. handle.WriteResponse("struct", response)
  362. }
  363. func executeDifyInvocationAppTask(
  364. handle *BackwardsInvocation,
  365. request *dify_invocation.InvokeAppRequest,
  366. ) {
  367. response, err := handle.backwardsInvocation.InvokeApp(request)
  368. if err != nil {
  369. handle.WriteError(fmt.Errorf("invoke app failed: %s", err.Error()))
  370. return
  371. }
  372. userId, err := handle.UserID()
  373. if err != nil {
  374. handle.WriteError(fmt.Errorf("get user id failed: %s", err.Error()))
  375. return
  376. }
  377. request.User = userId
  378. for response.Next() {
  379. value, err := response.Read()
  380. if err != nil {
  381. handle.WriteError(fmt.Errorf("read app failed: %s", err.Error()))
  382. return
  383. }
  384. handle.WriteResponse("stream", value)
  385. }
  386. }
  387. func executeDifyInvocationParameterExtractor(
  388. handle *BackwardsInvocation,
  389. request *dify_invocation.InvokeParameterExtractorRequest,
  390. ) {
  391. response, err := handle.backwardsInvocation.InvokeParameterExtractor(request)
  392. if err != nil {
  393. handle.WriteError(fmt.Errorf("invoke parameter extractor failed: %s", err.Error()))
  394. return
  395. }
  396. handle.WriteResponse("struct", response)
  397. }
  398. func executeDifyInvocationQuestionClassifier(
  399. handle *BackwardsInvocation,
  400. request *dify_invocation.InvokeQuestionClassifierRequest,
  401. ) {
  402. response, err := handle.backwardsInvocation.InvokeQuestionClassifier(request)
  403. if err != nil {
  404. handle.WriteError(fmt.Errorf("invoke question classifier failed: %s", err.Error()))
  405. return
  406. }
  407. handle.WriteResponse("struct", response)
  408. }
  409. func executeDifyInvocationStorageTask(
  410. handle *BackwardsInvocation,
  411. request *dify_invocation.InvokeStorageRequest,
  412. ) {
  413. if handle.session == nil {
  414. handle.WriteError(fmt.Errorf("session not found"))
  415. return
  416. }
  417. persistence := persistence.GetPersistence()
  418. if persistence == nil {
  419. handle.WriteError(fmt.Errorf("persistence not found"))
  420. return
  421. }
  422. tenantId, err := handle.TenantID()
  423. if err != nil {
  424. handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
  425. return
  426. }
  427. pluginId := handle.session.PluginUniqueIdentifier
  428. if request.Opt == dify_invocation.STORAGE_OPT_GET {
  429. data, err := persistence.Load(tenantId, pluginId.PluginID(), request.Key)
  430. if err != nil {
  431. log.Error("load data failed: %s", err.Error())
  432. handle.WriteError(errors.New("load data failed, please check if the key is correct or you have not set it"))
  433. return
  434. }
  435. handle.WriteResponse("struct", map[string]any{
  436. "data": hex.EncodeToString(data),
  437. })
  438. } else if request.Opt == dify_invocation.STORAGE_OPT_SET {
  439. data, err := hex.DecodeString(request.Value)
  440. if err != nil {
  441. handle.WriteError(fmt.Errorf("decode data failed: %s", err.Error()))
  442. return
  443. }
  444. session := handle.session
  445. if session == nil {
  446. handle.WriteError(fmt.Errorf("session not found"))
  447. return
  448. }
  449. declaration := session.Declaration
  450. if declaration == nil {
  451. handle.WriteError(fmt.Errorf("declaration not found"))
  452. return
  453. }
  454. resource := declaration.Resource.Permission
  455. if resource == nil {
  456. handle.WriteError(fmt.Errorf("resource not found"))
  457. return
  458. }
  459. maxStorageSize := int64(-1)
  460. storage := resource.Storage
  461. if storage != nil {
  462. maxStorageSize = int64(storage.Size)
  463. }
  464. if err := persistence.Save(tenantId, pluginId.PluginID(), maxStorageSize, request.Key, data); err != nil {
  465. handle.WriteError(fmt.Errorf("save data failed: %s", err.Error()))
  466. return
  467. }
  468. handle.WriteResponse("struct", map[string]any{
  469. "data": "ok",
  470. })
  471. } else if request.Opt == dify_invocation.STORAGE_OPT_DEL {
  472. if err := persistence.Delete(tenantId, pluginId.PluginID(), request.Key); err != nil {
  473. handle.WriteError(fmt.Errorf("delete data failed: %s", err.Error()))
  474. return
  475. }
  476. handle.WriteResponse("struct", map[string]any{
  477. "data": "ok",
  478. })
  479. }
  480. }
  481. func executeDifyInvocationSystemSummaryTask(
  482. handle *BackwardsInvocation,
  483. request *dify_invocation.InvokeSummaryRequest,
  484. ) {
  485. response, err := handle.backwardsInvocation.InvokeSummary(request)
  486. if err != nil {
  487. handle.WriteError(fmt.Errorf("invoke summary failed: %s", err.Error()))
  488. return
  489. }
  490. handle.WriteResponse("struct", response)
  491. }
  492. func executeDifyInvocationUploadFileTask(
  493. handle *BackwardsInvocation,
  494. request *dify_invocation.UploadFileRequest,
  495. ) {
  496. response, err := handle.backwardsInvocation.UploadFile(request)
  497. if err != nil {
  498. handle.WriteError(fmt.Errorf("upload file failed: %s", err.Error()))
  499. return
  500. }
  501. handle.WriteResponse("struct", response)
  502. }