task.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. package backwards_invocation
  2. import (
  3. "encoding/hex"
  4. "fmt"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/model_entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/tool_entities"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  13. )
  14. func InvokeDify(
  15. declaration *plugin_entities.PluginDeclaration,
  16. invoke_from access_types.PluginAccessType,
  17. session *session_manager.Session,
  18. writer BackwardsInvocationWriter,
  19. data []byte,
  20. ) error {
  21. // unmarshal invoke data
  22. request, err := parser.UnmarshalJsonBytes2Map(data)
  23. if err != nil {
  24. return fmt.Errorf("unmarshal invoke request failed: %s", err.Error())
  25. }
  26. if request == nil {
  27. return fmt.Errorf("invoke request is empty")
  28. }
  29. // prepare invocation arguments
  30. request_handle, err := prepareDifyInvocationArguments(session, writer, request)
  31. if err != nil {
  32. return err
  33. }
  34. if invoke_from == access_types.PLUGIN_ACCESS_TYPE_MODEL {
  35. request_handle.WriteError(fmt.Errorf("you can not invoke dify from %s", invoke_from))
  36. request_handle.EndResponse()
  37. return nil
  38. }
  39. // check permission
  40. if err := checkPermission(declaration, request_handle); err != nil {
  41. request_handle.WriteError(err)
  42. request_handle.EndResponse()
  43. return nil
  44. }
  45. // dispatch invocation task
  46. routine.Submit(func() {
  47. dispatchDifyInvocationTask(request_handle)
  48. defer request_handle.EndResponse()
  49. })
  50. return nil
  51. }
  52. var (
  53. permissionMapping = map[dify_invocation.InvokeType]map[string]any{
  54. dify_invocation.INVOKE_TYPE_TOOL: {
  55. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  56. return declaration.Resource.Permission.AllowInvokeTool()
  57. },
  58. "error": "permission denied, you need to enable tool access in plugin manifest",
  59. },
  60. dify_invocation.INVOKE_TYPE_LLM: {
  61. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  62. return declaration.Resource.Permission.AllowInvokeLLM()
  63. },
  64. "error": "permission denied, you need to enable llm access in plugin manifest",
  65. },
  66. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: {
  67. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  68. return declaration.Resource.Permission.AllowInvokeTextEmbedding()
  69. },
  70. "error": "permission denied, you need to enable text-embedding access in plugin manifest",
  71. },
  72. dify_invocation.INVOKE_TYPE_RERANK: {
  73. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  74. return declaration.Resource.Permission.AllowInvokeRerank()
  75. },
  76. "error": "permission denied, you need to enable rerank access in plugin manifest",
  77. },
  78. dify_invocation.INVOKE_TYPE_TTS: {
  79. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  80. return declaration.Resource.Permission.AllowInvokeTTS()
  81. },
  82. "error": "permission denied, you need to enable tts access in plugin manifest",
  83. },
  84. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: {
  85. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  86. return declaration.Resource.Permission.AllowInvokeSpeech2Text()
  87. },
  88. "error": "permission denied, you need to enable speech2text access in plugin manifest",
  89. },
  90. dify_invocation.INVOKE_TYPE_MODERATION: {
  91. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  92. return declaration.Resource.Permission.AllowInvokeModeration()
  93. },
  94. "error": "permission denied, you need to enable moderation access in plugin manifest",
  95. },
  96. dify_invocation.INVOKE_TYPE_NODE: {
  97. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  98. return declaration.Resource.Permission.AllowInvokeNode()
  99. },
  100. "error": "permission denied, you need to enable node access in plugin manifest",
  101. },
  102. dify_invocation.INVOKE_TYPE_APP: {
  103. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  104. return declaration.Resource.Permission.AllowInvokeApp()
  105. },
  106. "error": "permission denied, you need to enable app access in plugin manifest",
  107. },
  108. }
  109. )
  110. func checkPermission(runtime *plugin_entities.PluginDeclaration, request_handle *BackwardsInvocation) error {
  111. permission, ok := permissionMapping[request_handle.Type()]
  112. if !ok {
  113. return fmt.Errorf("unsupported invoke type: %s", request_handle.Type())
  114. }
  115. permission_func, ok := permission["func"].(func(runtime *plugin_entities.PluginDeclaration) bool)
  116. if !ok {
  117. return fmt.Errorf("permission function not found: %s", request_handle.Type())
  118. }
  119. if !permission_func(runtime) {
  120. return fmt.Errorf(permission["error"].(string))
  121. }
  122. return nil
  123. }
  124. func prepareDifyInvocationArguments(
  125. session *session_manager.Session,
  126. writer BackwardsInvocationWriter,
  127. request map[string]any,
  128. ) (*BackwardsInvocation, error) {
  129. typ, ok := request["type"].(string)
  130. if !ok {
  131. return nil, fmt.Errorf("invoke request missing type: %s", request)
  132. }
  133. // get request id
  134. backwards_request_id, ok := request["backwards_request_id"].(string)
  135. if !ok {
  136. return nil, fmt.Errorf("invoke request missing request_id: %s", request)
  137. }
  138. // get request
  139. detailed_request, ok := request["request"].(map[string]any)
  140. if !ok {
  141. return nil, fmt.Errorf("invoke request missing request: %s", request)
  142. }
  143. return NewBackwardsInvocation(
  144. BackwardsInvocationType(typ),
  145. backwards_request_id,
  146. session,
  147. writer,
  148. detailed_request,
  149. ), nil
  150. }
  151. var (
  152. dispatchMapping = map[dify_invocation.InvokeType]func(handle *BackwardsInvocation){
  153. dify_invocation.INVOKE_TYPE_TOOL: func(handle *BackwardsInvocation) {
  154. genericDispatchTask(handle, executeDifyInvocationToolTask)
  155. },
  156. dify_invocation.INVOKE_TYPE_LLM: func(handle *BackwardsInvocation) {
  157. genericDispatchTask(handle, executeDifyInvocationLLMTask)
  158. },
  159. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: func(handle *BackwardsInvocation) {
  160. genericDispatchTask(handle, executeDifyInvocationTextEmbeddingTask)
  161. },
  162. dify_invocation.INVOKE_TYPE_RERANK: func(handle *BackwardsInvocation) {
  163. genericDispatchTask(handle, executeDifyInvocationRerankTask)
  164. },
  165. dify_invocation.INVOKE_TYPE_TTS: func(handle *BackwardsInvocation) {
  166. genericDispatchTask(handle, executeDifyInvocationTTSTask)
  167. },
  168. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: func(handle *BackwardsInvocation) {
  169. genericDispatchTask(handle, executeDifyInvocationSpeech2TextTask)
  170. },
  171. dify_invocation.INVOKE_TYPE_MODERATION: func(handle *BackwardsInvocation) {
  172. genericDispatchTask(handle, executeDifyInvocationModerationTask)
  173. },
  174. dify_invocation.INVOKE_TYPE_APP: func(handle *BackwardsInvocation) {
  175. genericDispatchTask(handle, executeDifyInvocationAppTask)
  176. },
  177. dify_invocation.INVOKE_TYPE_STORAGE: func(handle *BackwardsInvocation) {
  178. genericDispatchTask(handle, executeDifyInvocationStorageTask)
  179. },
  180. }
  181. )
  182. func genericDispatchTask[T any](
  183. handle *BackwardsInvocation,
  184. dispatch func(
  185. handle *BackwardsInvocation,
  186. request *T,
  187. ),
  188. ) {
  189. r, err := parser.MapToStruct[T](handle.RequestData())
  190. if err != nil {
  191. handle.WriteError(fmt.Errorf("unmarshal invoke tool request failed: %s", err.Error()))
  192. return
  193. }
  194. dispatch(handle, r)
  195. }
  196. func dispatchDifyInvocationTask(handle *BackwardsInvocation) {
  197. request_data := handle.RequestData()
  198. tenant_id, err := handle.TenantID()
  199. if err != nil {
  200. handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
  201. return
  202. }
  203. request_data["tenant_id"] = tenant_id
  204. user_id, err := handle.UserID()
  205. if err != nil {
  206. handle.WriteError(fmt.Errorf("get user id failed: %s", err.Error()))
  207. return
  208. }
  209. request_data["user_id"] = user_id
  210. typ := handle.Type()
  211. request_data["type"] = typ
  212. for t, v := range dispatchMapping {
  213. if t == handle.Type() {
  214. v(handle)
  215. return
  216. }
  217. }
  218. handle.WriteError(fmt.Errorf("unsupported invoke type: %s", handle.Type()))
  219. }
  220. func executeDifyInvocationToolTask(
  221. handle *BackwardsInvocation,
  222. request *dify_invocation.InvokeToolRequest,
  223. ) {
  224. response, err := dify_invocation.InvokeTool(request)
  225. if err != nil {
  226. handle.WriteError(fmt.Errorf("invoke tool failed: %s", err.Error()))
  227. return
  228. }
  229. response.Wrap(func(t tool_entities.ToolResponseChunk) {
  230. handle.WriteResponse("stream", t)
  231. })
  232. }
  233. func executeDifyInvocationLLMTask(
  234. handle *BackwardsInvocation,
  235. request *dify_invocation.InvokeLLMRequest,
  236. ) {
  237. response, err := dify_invocation.InvokeLLM(request)
  238. if err != nil {
  239. handle.WriteError(fmt.Errorf("invoke llm model failed: %s", err.Error()))
  240. return
  241. }
  242. response.Wrap(func(t model_entities.LLMResultChunk) {
  243. handle.WriteResponse("stream", t)
  244. })
  245. }
  246. func executeDifyInvocationTextEmbeddingTask(
  247. handle *BackwardsInvocation,
  248. request *dify_invocation.InvokeTextEmbeddingRequest,
  249. ) {
  250. response, err := dify_invocation.InvokeTextEmbedding(request)
  251. if err != nil {
  252. handle.WriteError(fmt.Errorf("invoke text-embedding model failed: %s", err.Error()))
  253. return
  254. }
  255. handle.WriteResponse("struct", response)
  256. }
  257. func executeDifyInvocationRerankTask(
  258. handle *BackwardsInvocation,
  259. request *dify_invocation.InvokeRerankRequest,
  260. ) {
  261. response, err := dify_invocation.InvokeRerank(request)
  262. if err != nil {
  263. handle.WriteError(fmt.Errorf("invoke rerank model failed: %s", err.Error()))
  264. return
  265. }
  266. handle.WriteResponse("struct", response)
  267. }
  268. func executeDifyInvocationTTSTask(
  269. handle *BackwardsInvocation,
  270. request *dify_invocation.InvokeTTSRequest,
  271. ) {
  272. response, err := dify_invocation.InvokeTTS(request)
  273. if err != nil {
  274. handle.WriteError(fmt.Errorf("invoke tts model failed: %s", err.Error()))
  275. return
  276. }
  277. response.Wrap(func(t model_entities.TTSResult) {
  278. handle.WriteResponse("struct", t)
  279. })
  280. }
  281. func executeDifyInvocationSpeech2TextTask(
  282. handle *BackwardsInvocation,
  283. request *dify_invocation.InvokeSpeech2TextRequest,
  284. ) {
  285. response, err := dify_invocation.InvokeSpeech2Text(request)
  286. if err != nil {
  287. handle.WriteError(fmt.Errorf("invoke speech2text model failed: %s", err.Error()))
  288. return
  289. }
  290. handle.WriteResponse("struct", response)
  291. }
  292. func executeDifyInvocationModerationTask(
  293. handle *BackwardsInvocation,
  294. request *dify_invocation.InvokeModerationRequest,
  295. ) {
  296. response, err := dify_invocation.InvokeModeration(request)
  297. if err != nil {
  298. handle.WriteError(fmt.Errorf("invoke moderation model failed: %s", err.Error()))
  299. return
  300. }
  301. handle.WriteResponse("struct", response)
  302. }
  303. func executeDifyInvocationAppTask(
  304. handle *BackwardsInvocation,
  305. request *dify_invocation.InvokeAppRequest,
  306. ) {
  307. response, err := dify_invocation.InvokeApp(request)
  308. if err != nil {
  309. handle.WriteError(fmt.Errorf("invoke app failed: %s", err.Error()))
  310. return
  311. }
  312. user_id, err := handle.UserID()
  313. if err != nil {
  314. handle.WriteError(fmt.Errorf("get user id failed: %s", err.Error()))
  315. return
  316. }
  317. request.User = user_id
  318. response.Wrap(func(t map[string]any) {
  319. handle.WriteResponse("stream", t)
  320. })
  321. }
  322. func executeDifyInvocationStorageTask(
  323. handle *BackwardsInvocation,
  324. request *dify_invocation.InvokeStorageRequest,
  325. ) {
  326. if handle.session == nil {
  327. handle.WriteError(fmt.Errorf("session not found"))
  328. return
  329. }
  330. persistence := handle.session.Storage()
  331. if persistence == nil {
  332. handle.WriteError(fmt.Errorf("persistence not found"))
  333. return
  334. }
  335. tenant_id, err := handle.TenantID()
  336. if err != nil {
  337. handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
  338. return
  339. }
  340. plugin_id := handle.session.PluginIdentity
  341. if request.Opt == dify_invocation.STORAGE_OPT_GET {
  342. data, err := persistence.Load(tenant_id, plugin_id, request.Key)
  343. if err != nil {
  344. handle.WriteError(fmt.Errorf("load data failed: %s", err.Error()))
  345. return
  346. }
  347. handle.WriteResponse("struct", map[string]any{
  348. "data": hex.EncodeToString(data),
  349. })
  350. } else if request.Opt == dify_invocation.STORAGE_OPT_SET {
  351. data, err := hex.DecodeString(request.Value)
  352. if err != nil {
  353. handle.WriteError(fmt.Errorf("decode data failed: %s", err.Error()))
  354. return
  355. }
  356. if err := persistence.Save(tenant_id, plugin_id, request.Key, data); err != nil {
  357. handle.WriteError(fmt.Errorf("save data failed: %s", err.Error()))
  358. return
  359. }
  360. handle.WriteResponse("struct", map[string]any{
  361. "data": "ok",
  362. })
  363. } else if request.Opt == dify_invocation.STORAGE_OPT_DEL {
  364. if err := persistence.Delete(tenant_id, plugin_id, request.Key); err != nil {
  365. handle.WriteError(fmt.Errorf("delete data failed: %s", err.Error()))
  366. return
  367. }
  368. handle.WriteResponse("struct", map[string]any{
  369. "data": "ok",
  370. })
  371. }
  372. }