task.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. package backwards_invocation
  2. import (
  3. "encoding/hex"
  4. "fmt"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/persistence"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  8. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/model_entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  11. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/tool_entities"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  14. )
  15. // returns error only if payload is not correct
  16. func InvokeDify(
  17. declaration *plugin_entities.PluginDeclaration,
  18. invoke_from access_types.PluginAccessType,
  19. session *session_manager.Session,
  20. writer BackwardsInvocationWriter,
  21. data []byte,
  22. ) error {
  23. // unmarshal invoke data
  24. request, err := parser.UnmarshalJsonBytes2Map(data)
  25. if err != nil {
  26. return fmt.Errorf("unmarshal invoke request failed: %s", err.Error())
  27. }
  28. if request == nil {
  29. return fmt.Errorf("invoke request is empty")
  30. }
  31. // prepare invocation arguments
  32. request_handle, err := prepareDifyInvocationArguments(
  33. session,
  34. writer,
  35. request,
  36. )
  37. if err != nil {
  38. return err
  39. }
  40. if invoke_from == access_types.PLUGIN_ACCESS_TYPE_MODEL {
  41. request_handle.WriteError(fmt.Errorf("you can not invoke dify from %s", invoke_from))
  42. request_handle.EndResponse()
  43. return nil
  44. }
  45. // check permission
  46. if err := checkPermission(declaration, request_handle); err != nil {
  47. request_handle.WriteError(err)
  48. request_handle.EndResponse()
  49. return nil
  50. }
  51. // dispatch invocation task
  52. routine.Submit(func() {
  53. dispatchDifyInvocationTask(request_handle)
  54. defer request_handle.EndResponse()
  55. })
  56. return nil
  57. }
  58. var (
  59. permissionMapping = map[dify_invocation.InvokeType]map[string]any{
  60. dify_invocation.INVOKE_TYPE_TOOL: {
  61. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  62. return declaration.Resource.Permission.AllowInvokeTool()
  63. },
  64. "error": "permission denied, you need to enable tool access in plugin manifest",
  65. },
  66. dify_invocation.INVOKE_TYPE_LLM: {
  67. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  68. return declaration.Resource.Permission.AllowInvokeLLM()
  69. },
  70. "error": "permission denied, you need to enable llm access in plugin manifest",
  71. },
  72. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: {
  73. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  74. return declaration.Resource.Permission.AllowInvokeTextEmbedding()
  75. },
  76. "error": "permission denied, you need to enable text-embedding access in plugin manifest",
  77. },
  78. dify_invocation.INVOKE_TYPE_RERANK: {
  79. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  80. return declaration.Resource.Permission.AllowInvokeRerank()
  81. },
  82. "error": "permission denied, you need to enable rerank access in plugin manifest",
  83. },
  84. dify_invocation.INVOKE_TYPE_TTS: {
  85. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  86. return declaration.Resource.Permission.AllowInvokeTTS()
  87. },
  88. "error": "permission denied, you need to enable tts access in plugin manifest",
  89. },
  90. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: {
  91. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  92. return declaration.Resource.Permission.AllowInvokeSpeech2Text()
  93. },
  94. "error": "permission denied, you need to enable speech2text access in plugin manifest",
  95. },
  96. dify_invocation.INVOKE_TYPE_MODERATION: {
  97. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  98. return declaration.Resource.Permission.AllowInvokeModeration()
  99. },
  100. "error": "permission denied, you need to enable moderation access in plugin manifest",
  101. },
  102. dify_invocation.INVOKE_TYPE_NODE: {
  103. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  104. return declaration.Resource.Permission.AllowInvokeNode()
  105. },
  106. "error": "permission denied, you need to enable node access in plugin manifest",
  107. },
  108. dify_invocation.INVOKE_TYPE_APP: {
  109. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  110. return declaration.Resource.Permission.AllowInvokeApp()
  111. },
  112. "error": "permission denied, you need to enable app access in plugin manifest",
  113. },
  114. dify_invocation.INVOKE_TYPE_STORAGE: {
  115. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  116. return declaration.Resource.Permission.AllowInvokeStorage()
  117. },
  118. "error": "permission denied, you need to enable storage access in plugin manifest",
  119. },
  120. }
  121. )
  122. func checkPermission(runtime *plugin_entities.PluginDeclaration, request_handle *BackwardsInvocation) error {
  123. permission, ok := permissionMapping[request_handle.Type()]
  124. if !ok {
  125. return fmt.Errorf("unsupported invoke type: %s", request_handle.Type())
  126. }
  127. permission_func, ok := permission["func"].(func(runtime *plugin_entities.PluginDeclaration) bool)
  128. if !ok {
  129. return fmt.Errorf("permission function not found: %s", request_handle.Type())
  130. }
  131. if !permission_func(runtime) {
  132. return fmt.Errorf(permission["error"].(string))
  133. }
  134. return nil
  135. }
  136. func prepareDifyInvocationArguments(
  137. session *session_manager.Session,
  138. writer BackwardsInvocationWriter,
  139. request map[string]any,
  140. ) (*BackwardsInvocation, error) {
  141. typ, ok := request["type"].(string)
  142. if !ok {
  143. return nil, fmt.Errorf("invoke request missing type: %s", request)
  144. }
  145. // get request id
  146. backwards_request_id, ok := request["backwards_request_id"].(string)
  147. if !ok {
  148. return nil, fmt.Errorf("invoke request missing request_id: %s", request)
  149. }
  150. // get request
  151. detailed_request, ok := request["request"].(map[string]any)
  152. if !ok {
  153. return nil, fmt.Errorf("invoke request missing request: %s", request)
  154. }
  155. return NewBackwardsInvocation(
  156. BackwardsInvocationType(typ),
  157. backwards_request_id,
  158. session,
  159. writer,
  160. detailed_request,
  161. ), nil
  162. }
  163. var (
  164. dispatchMapping = map[dify_invocation.InvokeType]func(handle *BackwardsInvocation){
  165. dify_invocation.INVOKE_TYPE_TOOL: func(handle *BackwardsInvocation) {
  166. genericDispatchTask(handle, executeDifyInvocationToolTask)
  167. },
  168. dify_invocation.INVOKE_TYPE_LLM: func(handle *BackwardsInvocation) {
  169. genericDispatchTask(handle, executeDifyInvocationLLMTask)
  170. },
  171. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: func(handle *BackwardsInvocation) {
  172. genericDispatchTask(handle, executeDifyInvocationTextEmbeddingTask)
  173. },
  174. dify_invocation.INVOKE_TYPE_RERANK: func(handle *BackwardsInvocation) {
  175. genericDispatchTask(handle, executeDifyInvocationRerankTask)
  176. },
  177. dify_invocation.INVOKE_TYPE_TTS: func(handle *BackwardsInvocation) {
  178. genericDispatchTask(handle, executeDifyInvocationTTSTask)
  179. },
  180. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: func(handle *BackwardsInvocation) {
  181. genericDispatchTask(handle, executeDifyInvocationSpeech2TextTask)
  182. },
  183. dify_invocation.INVOKE_TYPE_MODERATION: func(handle *BackwardsInvocation) {
  184. genericDispatchTask(handle, executeDifyInvocationModerationTask)
  185. },
  186. dify_invocation.INVOKE_TYPE_APP: func(handle *BackwardsInvocation) {
  187. genericDispatchTask(handle, executeDifyInvocationAppTask)
  188. },
  189. dify_invocation.INVOKE_TYPE_STORAGE: func(handle *BackwardsInvocation) {
  190. genericDispatchTask(handle, executeDifyInvocationStorageTask)
  191. },
  192. }
  193. )
  194. func genericDispatchTask[T any](
  195. handle *BackwardsInvocation,
  196. dispatch func(
  197. handle *BackwardsInvocation,
  198. request *T,
  199. ),
  200. ) {
  201. r, err := parser.MapToStruct[T](handle.RequestData())
  202. if err != nil {
  203. handle.WriteError(fmt.Errorf("unmarshal invoke tool request failed: %s", err.Error()))
  204. return
  205. }
  206. dispatch(handle, r)
  207. }
  208. func dispatchDifyInvocationTask(handle *BackwardsInvocation) {
  209. request_data := handle.RequestData()
  210. tenant_id, err := handle.TenantID()
  211. if err != nil {
  212. handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
  213. return
  214. }
  215. request_data["tenant_id"] = tenant_id
  216. user_id, err := handle.UserID()
  217. if err != nil {
  218. handle.WriteError(fmt.Errorf("get user id failed: %s", err.Error()))
  219. return
  220. }
  221. request_data["user_id"] = user_id
  222. typ := handle.Type()
  223. request_data["type"] = typ
  224. for t, v := range dispatchMapping {
  225. if t == handle.Type() {
  226. v(handle)
  227. return
  228. }
  229. }
  230. handle.WriteError(fmt.Errorf("unsupported invoke type: %s", handle.Type()))
  231. }
  232. func executeDifyInvocationToolTask(
  233. handle *BackwardsInvocation,
  234. request *dify_invocation.InvokeToolRequest,
  235. ) {
  236. response, err := handle.backwardsInvocation.InvokeTool(request)
  237. if err != nil {
  238. handle.WriteError(fmt.Errorf("invoke tool failed: %s", err.Error()))
  239. return
  240. }
  241. response.Async(func(t tool_entities.ToolResponseChunk) {
  242. handle.WriteResponse("stream", t)
  243. })
  244. }
  245. func executeDifyInvocationLLMTask(
  246. handle *BackwardsInvocation,
  247. request *dify_invocation.InvokeLLMRequest,
  248. ) {
  249. response, err := handle.backwardsInvocation.InvokeLLM(request)
  250. if err != nil {
  251. handle.WriteError(fmt.Errorf("invoke llm model failed: %s", err.Error()))
  252. return
  253. }
  254. response.Async(func(t model_entities.LLMResultChunk) {
  255. handle.WriteResponse("stream", t)
  256. })
  257. }
  258. func executeDifyInvocationTextEmbeddingTask(
  259. handle *BackwardsInvocation,
  260. request *dify_invocation.InvokeTextEmbeddingRequest,
  261. ) {
  262. response, err := handle.backwardsInvocation.InvokeTextEmbedding(request)
  263. if err != nil {
  264. handle.WriteError(fmt.Errorf("invoke text-embedding model failed: %s", err.Error()))
  265. return
  266. }
  267. handle.WriteResponse("struct", response)
  268. }
  269. func executeDifyInvocationRerankTask(
  270. handle *BackwardsInvocation,
  271. request *dify_invocation.InvokeRerankRequest,
  272. ) {
  273. response, err := handle.backwardsInvocation.InvokeRerank(request)
  274. if err != nil {
  275. handle.WriteError(fmt.Errorf("invoke rerank model failed: %s", err.Error()))
  276. return
  277. }
  278. handle.WriteResponse("struct", response)
  279. }
  280. func executeDifyInvocationTTSTask(
  281. handle *BackwardsInvocation,
  282. request *dify_invocation.InvokeTTSRequest,
  283. ) {
  284. response, err := handle.backwardsInvocation.InvokeTTS(request)
  285. if err != nil {
  286. handle.WriteError(fmt.Errorf("invoke tts model failed: %s", err.Error()))
  287. return
  288. }
  289. response.Async(func(t model_entities.TTSResult) {
  290. handle.WriteResponse("struct", t)
  291. })
  292. }
  293. func executeDifyInvocationSpeech2TextTask(
  294. handle *BackwardsInvocation,
  295. request *dify_invocation.InvokeSpeech2TextRequest,
  296. ) {
  297. response, err := handle.backwardsInvocation.InvokeSpeech2Text(request)
  298. if err != nil {
  299. handle.WriteError(fmt.Errorf("invoke speech2text model failed: %s", err.Error()))
  300. return
  301. }
  302. handle.WriteResponse("struct", response)
  303. }
  304. func executeDifyInvocationModerationTask(
  305. handle *BackwardsInvocation,
  306. request *dify_invocation.InvokeModerationRequest,
  307. ) {
  308. response, err := handle.backwardsInvocation.InvokeModeration(request)
  309. if err != nil {
  310. handle.WriteError(fmt.Errorf("invoke moderation model failed: %s", err.Error()))
  311. return
  312. }
  313. handle.WriteResponse("struct", response)
  314. }
  315. func executeDifyInvocationAppTask(
  316. handle *BackwardsInvocation,
  317. request *dify_invocation.InvokeAppRequest,
  318. ) {
  319. response, err := handle.backwardsInvocation.InvokeApp(request)
  320. if err != nil {
  321. handle.WriteError(fmt.Errorf("invoke app failed: %s", err.Error()))
  322. return
  323. }
  324. user_id, err := handle.UserID()
  325. if err != nil {
  326. handle.WriteError(fmt.Errorf("get user id failed: %s", err.Error()))
  327. return
  328. }
  329. request.User = user_id
  330. response.Async(func(t map[string]any) {
  331. handle.WriteResponse("stream", t)
  332. })
  333. }
  334. func executeDifyInvocationStorageTask(
  335. handle *BackwardsInvocation,
  336. request *dify_invocation.InvokeStorageRequest,
  337. ) {
  338. if handle.session == nil {
  339. handle.WriteError(fmt.Errorf("session not found"))
  340. return
  341. }
  342. persistence := persistence.GetPersistence()
  343. if persistence == nil {
  344. handle.WriteError(fmt.Errorf("persistence not found"))
  345. return
  346. }
  347. tenant_id, err := handle.TenantID()
  348. if err != nil {
  349. handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
  350. return
  351. }
  352. plugin_id := handle.session.PluginUniqueIdentifier
  353. if request.Opt == dify_invocation.STORAGE_OPT_GET {
  354. data, err := persistence.Load(tenant_id, plugin_id.PluginID(), request.Key)
  355. if err != nil {
  356. handle.WriteError(fmt.Errorf("load data failed: %s", err.Error()))
  357. return
  358. }
  359. handle.WriteResponse("struct", map[string]any{
  360. "data": hex.EncodeToString(data),
  361. })
  362. } else if request.Opt == dify_invocation.STORAGE_OPT_SET {
  363. data, err := hex.DecodeString(request.Value)
  364. if err != nil {
  365. handle.WriteError(fmt.Errorf("decode data failed: %s", err.Error()))
  366. return
  367. }
  368. if err := persistence.Save(tenant_id, plugin_id.PluginID(), request.Key, data); err != nil {
  369. handle.WriteError(fmt.Errorf("save data failed: %s", err.Error()))
  370. return
  371. }
  372. handle.WriteResponse("struct", map[string]any{
  373. "data": "ok",
  374. })
  375. } else if request.Opt == dify_invocation.STORAGE_OPT_DEL {
  376. if err := persistence.Delete(tenant_id, plugin_id.PluginID(), request.Key); err != nil {
  377. handle.WriteError(fmt.Errorf("delete data failed: %s", err.Error()))
  378. return
  379. }
  380. handle.WriteResponse("struct", map[string]any{
  381. "data": "ok",
  382. })
  383. }
  384. }