task.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. package backwards_invocation
  2. import (
  3. "encoding/hex"
  4. "fmt"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/persistence"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  8. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/model_entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  11. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/tool_entities"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  14. )
  15. // returns error only if payload is not correct
  16. func InvokeDify(
  17. declaration *plugin_entities.PluginDeclaration,
  18. invoke_from access_types.PluginAccessType,
  19. session *session_manager.Session,
  20. writer BackwardsInvocationWriter,
  21. data []byte,
  22. ) error {
  23. // unmarshal invoke data
  24. request, err := parser.UnmarshalJsonBytes2Map(data)
  25. if err != nil {
  26. return fmt.Errorf("unmarshal invoke request failed: %s", err.Error())
  27. }
  28. if request == nil {
  29. return fmt.Errorf("invoke request is empty")
  30. }
  31. // prepare invocation arguments
  32. request_handle, err := prepareDifyInvocationArguments(
  33. session,
  34. writer,
  35. request,
  36. )
  37. if err != nil {
  38. return err
  39. }
  40. if invoke_from == access_types.PLUGIN_ACCESS_TYPE_MODEL {
  41. request_handle.WriteError(fmt.Errorf("you can not invoke dify from %s", invoke_from))
  42. request_handle.EndResponse()
  43. return nil
  44. }
  45. // check permission
  46. if err := checkPermission(declaration, request_handle); err != nil {
  47. request_handle.WriteError(err)
  48. request_handle.EndResponse()
  49. return nil
  50. }
  51. // dispatch invocation task
  52. routine.Submit(func() {
  53. dispatchDifyInvocationTask(request_handle)
  54. defer request_handle.EndResponse()
  55. })
  56. return nil
  57. }
  58. var (
  59. permissionMapping = map[dify_invocation.InvokeType]map[string]any{
  60. dify_invocation.INVOKE_TYPE_TOOL: {
  61. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  62. return declaration.Resource.Permission.AllowInvokeTool()
  63. },
  64. "error": "permission denied, you need to enable tool access in plugin manifest",
  65. },
  66. dify_invocation.INVOKE_TYPE_LLM: {
  67. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  68. return declaration.Resource.Permission.AllowInvokeLLM()
  69. },
  70. "error": "permission denied, you need to enable llm access in plugin manifest",
  71. },
  72. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: {
  73. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  74. return declaration.Resource.Permission.AllowInvokeTextEmbedding()
  75. },
  76. "error": "permission denied, you need to enable text-embedding access in plugin manifest",
  77. },
  78. dify_invocation.INVOKE_TYPE_RERANK: {
  79. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  80. return declaration.Resource.Permission.AllowInvokeRerank()
  81. },
  82. "error": "permission denied, you need to enable rerank access in plugin manifest",
  83. },
  84. dify_invocation.INVOKE_TYPE_TTS: {
  85. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  86. return declaration.Resource.Permission.AllowInvokeTTS()
  87. },
  88. "error": "permission denied, you need to enable tts access in plugin manifest",
  89. },
  90. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: {
  91. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  92. return declaration.Resource.Permission.AllowInvokeSpeech2Text()
  93. },
  94. "error": "permission denied, you need to enable speech2text access in plugin manifest",
  95. },
  96. dify_invocation.INVOKE_TYPE_MODERATION: {
  97. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  98. return declaration.Resource.Permission.AllowInvokeModeration()
  99. },
  100. "error": "permission denied, you need to enable moderation access in plugin manifest",
  101. },
  102. dify_invocation.INVOKE_TYPE_NODE_PARAMETER_EXTRACTOR: {
  103. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  104. return declaration.Resource.Permission.AllowInvokeNode()
  105. },
  106. "error": "permission denied, you need to enable node access in plugin manifest",
  107. },
  108. dify_invocation.INVOKE_TYPE_NODE_QUESTION_CLASSIFIER: {
  109. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  110. return declaration.Resource.Permission.AllowInvokeNode()
  111. },
  112. "error": "permission denied, you need to enable node access in plugin manifest",
  113. },
  114. dify_invocation.INVOKE_TYPE_APP: {
  115. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  116. return declaration.Resource.Permission.AllowInvokeApp()
  117. },
  118. "error": "permission denied, you need to enable app access in plugin manifest",
  119. },
  120. dify_invocation.INVOKE_TYPE_STORAGE: {
  121. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  122. return declaration.Resource.Permission.AllowInvokeStorage()
  123. },
  124. "error": "permission denied, you need to enable storage access in plugin manifest",
  125. },
  126. }
  127. )
  128. func checkPermission(runtime *plugin_entities.PluginDeclaration, request_handle *BackwardsInvocation) error {
  129. permission, ok := permissionMapping[request_handle.Type()]
  130. if !ok {
  131. return fmt.Errorf("unsupported invoke type: %s", request_handle.Type())
  132. }
  133. permission_func, ok := permission["func"].(func(runtime *plugin_entities.PluginDeclaration) bool)
  134. if !ok {
  135. return fmt.Errorf("permission function not found: %s", request_handle.Type())
  136. }
  137. if !permission_func(runtime) {
  138. return fmt.Errorf(permission["error"].(string))
  139. }
  140. return nil
  141. }
  142. func prepareDifyInvocationArguments(
  143. session *session_manager.Session,
  144. writer BackwardsInvocationWriter,
  145. request map[string]any,
  146. ) (*BackwardsInvocation, error) {
  147. typ, ok := request["type"].(string)
  148. if !ok {
  149. return nil, fmt.Errorf("invoke request missing type: %s", request)
  150. }
  151. // get request id
  152. backwards_request_id, ok := request["backwards_request_id"].(string)
  153. if !ok {
  154. return nil, fmt.Errorf("invoke request missing request_id: %s", request)
  155. }
  156. // get request
  157. detailed_request, ok := request["request"].(map[string]any)
  158. if !ok {
  159. return nil, fmt.Errorf("invoke request missing request: %s", request)
  160. }
  161. return NewBackwardsInvocation(
  162. BackwardsInvocationType(typ),
  163. backwards_request_id,
  164. session,
  165. writer,
  166. detailed_request,
  167. ), nil
  168. }
  169. var (
  170. dispatchMapping = map[dify_invocation.InvokeType]func(handle *BackwardsInvocation){
  171. dify_invocation.INVOKE_TYPE_TOOL: func(handle *BackwardsInvocation) {
  172. genericDispatchTask(handle, executeDifyInvocationToolTask)
  173. },
  174. dify_invocation.INVOKE_TYPE_LLM: func(handle *BackwardsInvocation) {
  175. genericDispatchTask(handle, executeDifyInvocationLLMTask)
  176. },
  177. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: func(handle *BackwardsInvocation) {
  178. genericDispatchTask(handle, executeDifyInvocationTextEmbeddingTask)
  179. },
  180. dify_invocation.INVOKE_TYPE_RERANK: func(handle *BackwardsInvocation) {
  181. genericDispatchTask(handle, executeDifyInvocationRerankTask)
  182. },
  183. dify_invocation.INVOKE_TYPE_TTS: func(handle *BackwardsInvocation) {
  184. genericDispatchTask(handle, executeDifyInvocationTTSTask)
  185. },
  186. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: func(handle *BackwardsInvocation) {
  187. genericDispatchTask(handle, executeDifyInvocationSpeech2TextTask)
  188. },
  189. dify_invocation.INVOKE_TYPE_MODERATION: func(handle *BackwardsInvocation) {
  190. genericDispatchTask(handle, executeDifyInvocationModerationTask)
  191. },
  192. dify_invocation.INVOKE_TYPE_APP: func(handle *BackwardsInvocation) {
  193. genericDispatchTask(handle, executeDifyInvocationAppTask)
  194. },
  195. dify_invocation.INVOKE_TYPE_NODE_PARAMETER_EXTRACTOR: func(handle *BackwardsInvocation) {
  196. genericDispatchTask(handle, executeDifyInvocationParameterExtractor)
  197. },
  198. dify_invocation.INVOKE_TYPE_NODE_QUESTION_CLASSIFIER: func(handle *BackwardsInvocation) {
  199. genericDispatchTask(handle, executeDifyInvocationQuestionClassifier)
  200. },
  201. dify_invocation.INVOKE_TYPE_STORAGE: func(handle *BackwardsInvocation) {
  202. genericDispatchTask(handle, executeDifyInvocationStorageTask)
  203. },
  204. }
  205. )
  206. func genericDispatchTask[T any](
  207. handle *BackwardsInvocation,
  208. dispatch func(
  209. handle *BackwardsInvocation,
  210. request *T,
  211. ),
  212. ) {
  213. r, err := parser.MapToStruct[T](handle.RequestData())
  214. if err != nil {
  215. handle.WriteError(fmt.Errorf("unmarshal invoke tool request failed: %s", err.Error()))
  216. return
  217. }
  218. dispatch(handle, r)
  219. }
  220. func dispatchDifyInvocationTask(handle *BackwardsInvocation) {
  221. request_data := handle.RequestData()
  222. tenant_id, err := handle.TenantID()
  223. if err != nil {
  224. handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
  225. return
  226. }
  227. request_data["tenant_id"] = tenant_id
  228. user_id, err := handle.UserID()
  229. if err != nil {
  230. handle.WriteError(fmt.Errorf("get user id failed: %s", err.Error()))
  231. return
  232. }
  233. request_data["user_id"] = user_id
  234. typ := handle.Type()
  235. request_data["type"] = typ
  236. for t, v := range dispatchMapping {
  237. if t == handle.Type() {
  238. v(handle)
  239. return
  240. }
  241. }
  242. handle.WriteError(fmt.Errorf("unsupported invoke type: %s", handle.Type()))
  243. }
  244. func executeDifyInvocationToolTask(
  245. handle *BackwardsInvocation,
  246. request *dify_invocation.InvokeToolRequest,
  247. ) {
  248. response, err := handle.backwardsInvocation.InvokeTool(request)
  249. if err != nil {
  250. handle.WriteError(fmt.Errorf("invoke tool failed: %s", err.Error()))
  251. return
  252. }
  253. response.Async(func(t tool_entities.ToolResponseChunk) {
  254. handle.WriteResponse("stream", t)
  255. })
  256. }
  257. func executeDifyInvocationLLMTask(
  258. handle *BackwardsInvocation,
  259. request *dify_invocation.InvokeLLMRequest,
  260. ) {
  261. response, err := handle.backwardsInvocation.InvokeLLM(request)
  262. if err != nil {
  263. handle.WriteError(fmt.Errorf("invoke llm model failed: %s", err.Error()))
  264. return
  265. }
  266. response.Async(func(t model_entities.LLMResultChunk) {
  267. handle.WriteResponse("stream", t)
  268. })
  269. }
  270. func executeDifyInvocationTextEmbeddingTask(
  271. handle *BackwardsInvocation,
  272. request *dify_invocation.InvokeTextEmbeddingRequest,
  273. ) {
  274. response, err := handle.backwardsInvocation.InvokeTextEmbedding(request)
  275. if err != nil {
  276. handle.WriteError(fmt.Errorf("invoke text-embedding model failed: %s", err.Error()))
  277. return
  278. }
  279. handle.WriteResponse("struct", response)
  280. }
  281. func executeDifyInvocationRerankTask(
  282. handle *BackwardsInvocation,
  283. request *dify_invocation.InvokeRerankRequest,
  284. ) {
  285. response, err := handle.backwardsInvocation.InvokeRerank(request)
  286. if err != nil {
  287. handle.WriteError(fmt.Errorf("invoke rerank model failed: %s", err.Error()))
  288. return
  289. }
  290. handle.WriteResponse("struct", response)
  291. }
  292. func executeDifyInvocationTTSTask(
  293. handle *BackwardsInvocation,
  294. request *dify_invocation.InvokeTTSRequest,
  295. ) {
  296. response, err := handle.backwardsInvocation.InvokeTTS(request)
  297. if err != nil {
  298. handle.WriteError(fmt.Errorf("invoke tts model failed: %s", err.Error()))
  299. return
  300. }
  301. response.Async(func(t model_entities.TTSResult) {
  302. handle.WriteResponse("struct", t)
  303. })
  304. }
  305. func executeDifyInvocationSpeech2TextTask(
  306. handle *BackwardsInvocation,
  307. request *dify_invocation.InvokeSpeech2TextRequest,
  308. ) {
  309. response, err := handle.backwardsInvocation.InvokeSpeech2Text(request)
  310. if err != nil {
  311. handle.WriteError(fmt.Errorf("invoke speech2text model failed: %s", err.Error()))
  312. return
  313. }
  314. handle.WriteResponse("struct", response)
  315. }
  316. func executeDifyInvocationModerationTask(
  317. handle *BackwardsInvocation,
  318. request *dify_invocation.InvokeModerationRequest,
  319. ) {
  320. response, err := handle.backwardsInvocation.InvokeModeration(request)
  321. if err != nil {
  322. handle.WriteError(fmt.Errorf("invoke moderation model failed: %s", err.Error()))
  323. return
  324. }
  325. handle.WriteResponse("struct", response)
  326. }
  327. func executeDifyInvocationAppTask(
  328. handle *BackwardsInvocation,
  329. request *dify_invocation.InvokeAppRequest,
  330. ) {
  331. response, err := handle.backwardsInvocation.InvokeApp(request)
  332. if err != nil {
  333. handle.WriteError(fmt.Errorf("invoke app failed: %s", err.Error()))
  334. return
  335. }
  336. user_id, err := handle.UserID()
  337. if err != nil {
  338. handle.WriteError(fmt.Errorf("get user id failed: %s", err.Error()))
  339. return
  340. }
  341. request.User = user_id
  342. response.Async(func(t map[string]any) {
  343. handle.WriteResponse("stream", t)
  344. })
  345. }
  346. func executeDifyInvocationParameterExtractor(
  347. handle *BackwardsInvocation,
  348. request *dify_invocation.InvokeParameterExtractorRequest,
  349. ) {
  350. response, err := handle.backwardsInvocation.InvokeParameterExtractor(request)
  351. if err != nil {
  352. handle.WriteError(fmt.Errorf("invoke parameter extractor failed: %s", err.Error()))
  353. return
  354. }
  355. handle.WriteResponse("struct", response)
  356. }
  357. func executeDifyInvocationQuestionClassifier(
  358. handle *BackwardsInvocation,
  359. request *dify_invocation.InvokeQuestionClassifierRequest,
  360. ) {
  361. response, err := handle.backwardsInvocation.InvokeQuestionClassifier(request)
  362. if err != nil {
  363. handle.WriteError(fmt.Errorf("invoke question classifier failed: %s", err.Error()))
  364. return
  365. }
  366. handle.WriteResponse("struct", response)
  367. }
  368. func executeDifyInvocationStorageTask(
  369. handle *BackwardsInvocation,
  370. request *dify_invocation.InvokeStorageRequest,
  371. ) {
  372. if handle.session == nil {
  373. handle.WriteError(fmt.Errorf("session not found"))
  374. return
  375. }
  376. persistence := persistence.GetPersistence()
  377. if persistence == nil {
  378. handle.WriteError(fmt.Errorf("persistence not found"))
  379. return
  380. }
  381. tenant_id, err := handle.TenantID()
  382. if err != nil {
  383. handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
  384. return
  385. }
  386. plugin_id := handle.session.PluginUniqueIdentifier
  387. if request.Opt == dify_invocation.STORAGE_OPT_GET {
  388. data, err := persistence.Load(tenant_id, plugin_id.PluginID(), request.Key)
  389. if err != nil {
  390. handle.WriteError(fmt.Errorf("load data failed: %s", err.Error()))
  391. return
  392. }
  393. handle.WriteResponse("struct", map[string]any{
  394. "data": hex.EncodeToString(data),
  395. })
  396. } else if request.Opt == dify_invocation.STORAGE_OPT_SET {
  397. data, err := hex.DecodeString(request.Value)
  398. if err != nil {
  399. handle.WriteError(fmt.Errorf("decode data failed: %s", err.Error()))
  400. return
  401. }
  402. if err := persistence.Save(tenant_id, plugin_id.PluginID(), request.Key, data); err != nil {
  403. handle.WriteError(fmt.Errorf("save data failed: %s", err.Error()))
  404. return
  405. }
  406. handle.WriteResponse("struct", map[string]any{
  407. "data": "ok",
  408. })
  409. } else if request.Opt == dify_invocation.STORAGE_OPT_DEL {
  410. if err := persistence.Delete(tenant_id, plugin_id.PluginID(), request.Key); err != nil {
  411. handle.WriteError(fmt.Errorf("delete data failed: %s", err.Error()))
  412. return
  413. }
  414. handle.WriteResponse("struct", map[string]any{
  415. "data": "ok",
  416. })
  417. }
  418. }