task.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. package backwards_invocation
  2. import (
  3. "encoding/hex"
  4. "fmt"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/persistence"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  8. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/model_entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  11. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/tool_entities"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  14. )
  15. func InvokeDify(
  16. declaration *plugin_entities.PluginDeclaration,
  17. invoke_from access_types.PluginAccessType,
  18. session *session_manager.Session,
  19. writer BackwardsInvocationWriter,
  20. data []byte,
  21. ) error {
  22. // unmarshal invoke data
  23. request, err := parser.UnmarshalJsonBytes2Map(data)
  24. if err != nil {
  25. return fmt.Errorf("unmarshal invoke request failed: %s", err.Error())
  26. }
  27. if request == nil {
  28. return fmt.Errorf("invoke request is empty")
  29. }
  30. // prepare invocation arguments
  31. request_handle, err := prepareDifyInvocationArguments(session, writer, request)
  32. if err != nil {
  33. return err
  34. }
  35. if invoke_from == access_types.PLUGIN_ACCESS_TYPE_MODEL {
  36. request_handle.WriteError(fmt.Errorf("you can not invoke dify from %s", invoke_from))
  37. request_handle.EndResponse()
  38. return nil
  39. }
  40. // check permission
  41. if err := checkPermission(declaration, request_handle); err != nil {
  42. request_handle.WriteError(err)
  43. request_handle.EndResponse()
  44. return nil
  45. }
  46. // dispatch invocation task
  47. routine.Submit(func() {
  48. dispatchDifyInvocationTask(request_handle)
  49. defer request_handle.EndResponse()
  50. })
  51. return nil
  52. }
  53. var (
  54. permissionMapping = map[dify_invocation.InvokeType]map[string]any{
  55. dify_invocation.INVOKE_TYPE_TOOL: {
  56. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  57. return declaration.Resource.Permission.AllowInvokeTool()
  58. },
  59. "error": "permission denied, you need to enable tool access in plugin manifest",
  60. },
  61. dify_invocation.INVOKE_TYPE_LLM: {
  62. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  63. return declaration.Resource.Permission.AllowInvokeLLM()
  64. },
  65. "error": "permission denied, you need to enable llm access in plugin manifest",
  66. },
  67. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: {
  68. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  69. return declaration.Resource.Permission.AllowInvokeTextEmbedding()
  70. },
  71. "error": "permission denied, you need to enable text-embedding access in plugin manifest",
  72. },
  73. dify_invocation.INVOKE_TYPE_RERANK: {
  74. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  75. return declaration.Resource.Permission.AllowInvokeRerank()
  76. },
  77. "error": "permission denied, you need to enable rerank access in plugin manifest",
  78. },
  79. dify_invocation.INVOKE_TYPE_TTS: {
  80. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  81. return declaration.Resource.Permission.AllowInvokeTTS()
  82. },
  83. "error": "permission denied, you need to enable tts access in plugin manifest",
  84. },
  85. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: {
  86. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  87. return declaration.Resource.Permission.AllowInvokeSpeech2Text()
  88. },
  89. "error": "permission denied, you need to enable speech2text access in plugin manifest",
  90. },
  91. dify_invocation.INVOKE_TYPE_MODERATION: {
  92. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  93. return declaration.Resource.Permission.AllowInvokeModeration()
  94. },
  95. "error": "permission denied, you need to enable moderation access in plugin manifest",
  96. },
  97. dify_invocation.INVOKE_TYPE_NODE: {
  98. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  99. return declaration.Resource.Permission.AllowInvokeNode()
  100. },
  101. "error": "permission denied, you need to enable node access in plugin manifest",
  102. },
  103. dify_invocation.INVOKE_TYPE_APP: {
  104. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  105. return declaration.Resource.Permission.AllowInvokeApp()
  106. },
  107. "error": "permission denied, you need to enable app access in plugin manifest",
  108. },
  109. dify_invocation.INVOKE_TYPE_STORAGE: {
  110. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  111. return declaration.Resource.Permission.AllowInvokeStorage()
  112. },
  113. "error": "permission denied, you need to enable storage access in plugin manifest",
  114. },
  115. }
  116. )
  117. func checkPermission(runtime *plugin_entities.PluginDeclaration, request_handle *BackwardsInvocation) error {
  118. permission, ok := permissionMapping[request_handle.Type()]
  119. if !ok {
  120. return fmt.Errorf("unsupported invoke type: %s", request_handle.Type())
  121. }
  122. permission_func, ok := permission["func"].(func(runtime *plugin_entities.PluginDeclaration) bool)
  123. if !ok {
  124. return fmt.Errorf("permission function not found: %s", request_handle.Type())
  125. }
  126. if !permission_func(runtime) {
  127. return fmt.Errorf(permission["error"].(string))
  128. }
  129. return nil
  130. }
  131. func prepareDifyInvocationArguments(
  132. session *session_manager.Session,
  133. writer BackwardsInvocationWriter,
  134. request map[string]any,
  135. ) (*BackwardsInvocation, error) {
  136. typ, ok := request["type"].(string)
  137. if !ok {
  138. return nil, fmt.Errorf("invoke request missing type: %s", request)
  139. }
  140. // get request id
  141. backwards_request_id, ok := request["backwards_request_id"].(string)
  142. if !ok {
  143. return nil, fmt.Errorf("invoke request missing request_id: %s", request)
  144. }
  145. // get request
  146. detailed_request, ok := request["request"].(map[string]any)
  147. if !ok {
  148. return nil, fmt.Errorf("invoke request missing request: %s", request)
  149. }
  150. return NewBackwardsInvocation(
  151. BackwardsInvocationType(typ),
  152. backwards_request_id,
  153. session,
  154. writer,
  155. detailed_request,
  156. ), nil
  157. }
  158. var (
  159. dispatchMapping = map[dify_invocation.InvokeType]func(handle *BackwardsInvocation){
  160. dify_invocation.INVOKE_TYPE_TOOL: func(handle *BackwardsInvocation) {
  161. genericDispatchTask(handle, executeDifyInvocationToolTask)
  162. },
  163. dify_invocation.INVOKE_TYPE_LLM: func(handle *BackwardsInvocation) {
  164. genericDispatchTask(handle, executeDifyInvocationLLMTask)
  165. },
  166. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: func(handle *BackwardsInvocation) {
  167. genericDispatchTask(handle, executeDifyInvocationTextEmbeddingTask)
  168. },
  169. dify_invocation.INVOKE_TYPE_RERANK: func(handle *BackwardsInvocation) {
  170. genericDispatchTask(handle, executeDifyInvocationRerankTask)
  171. },
  172. dify_invocation.INVOKE_TYPE_TTS: func(handle *BackwardsInvocation) {
  173. genericDispatchTask(handle, executeDifyInvocationTTSTask)
  174. },
  175. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: func(handle *BackwardsInvocation) {
  176. genericDispatchTask(handle, executeDifyInvocationSpeech2TextTask)
  177. },
  178. dify_invocation.INVOKE_TYPE_MODERATION: func(handle *BackwardsInvocation) {
  179. genericDispatchTask(handle, executeDifyInvocationModerationTask)
  180. },
  181. dify_invocation.INVOKE_TYPE_APP: func(handle *BackwardsInvocation) {
  182. genericDispatchTask(handle, executeDifyInvocationAppTask)
  183. },
  184. dify_invocation.INVOKE_TYPE_STORAGE: func(handle *BackwardsInvocation) {
  185. genericDispatchTask(handle, executeDifyInvocationStorageTask)
  186. },
  187. }
  188. )
  189. func genericDispatchTask[T any](
  190. handle *BackwardsInvocation,
  191. dispatch func(
  192. handle *BackwardsInvocation,
  193. request *T,
  194. ),
  195. ) {
  196. r, err := parser.MapToStruct[T](handle.RequestData())
  197. if err != nil {
  198. handle.WriteError(fmt.Errorf("unmarshal invoke tool request failed: %s", err.Error()))
  199. return
  200. }
  201. dispatch(handle, r)
  202. }
  203. func dispatchDifyInvocationTask(handle *BackwardsInvocation) {
  204. request_data := handle.RequestData()
  205. tenant_id, err := handle.TenantID()
  206. if err != nil {
  207. handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
  208. return
  209. }
  210. request_data["tenant_id"] = tenant_id
  211. user_id, err := handle.UserID()
  212. if err != nil {
  213. handle.WriteError(fmt.Errorf("get user id failed: %s", err.Error()))
  214. return
  215. }
  216. request_data["user_id"] = user_id
  217. typ := handle.Type()
  218. request_data["type"] = typ
  219. for t, v := range dispatchMapping {
  220. if t == handle.Type() {
  221. v(handle)
  222. return
  223. }
  224. }
  225. handle.WriteError(fmt.Errorf("unsupported invoke type: %s", handle.Type()))
  226. }
  227. func executeDifyInvocationToolTask(
  228. handle *BackwardsInvocation,
  229. request *dify_invocation.InvokeToolRequest,
  230. ) {
  231. response, err := dify_invocation.InvokeTool(request)
  232. if err != nil {
  233. handle.WriteError(fmt.Errorf("invoke tool failed: %s", err.Error()))
  234. return
  235. }
  236. response.Wrap(func(t tool_entities.ToolResponseChunk) {
  237. handle.WriteResponse("stream", t)
  238. })
  239. }
  240. func executeDifyInvocationLLMTask(
  241. handle *BackwardsInvocation,
  242. request *dify_invocation.InvokeLLMRequest,
  243. ) {
  244. response, err := dify_invocation.InvokeLLM(request)
  245. if err != nil {
  246. handle.WriteError(fmt.Errorf("invoke llm model failed: %s", err.Error()))
  247. return
  248. }
  249. response.Wrap(func(t model_entities.LLMResultChunk) {
  250. handle.WriteResponse("stream", t)
  251. })
  252. }
  253. func executeDifyInvocationTextEmbeddingTask(
  254. handle *BackwardsInvocation,
  255. request *dify_invocation.InvokeTextEmbeddingRequest,
  256. ) {
  257. response, err := dify_invocation.InvokeTextEmbedding(request)
  258. if err != nil {
  259. handle.WriteError(fmt.Errorf("invoke text-embedding model failed: %s", err.Error()))
  260. return
  261. }
  262. handle.WriteResponse("struct", response)
  263. }
  264. func executeDifyInvocationRerankTask(
  265. handle *BackwardsInvocation,
  266. request *dify_invocation.InvokeRerankRequest,
  267. ) {
  268. response, err := dify_invocation.InvokeRerank(request)
  269. if err != nil {
  270. handle.WriteError(fmt.Errorf("invoke rerank model failed: %s", err.Error()))
  271. return
  272. }
  273. handle.WriteResponse("struct", response)
  274. }
  275. func executeDifyInvocationTTSTask(
  276. handle *BackwardsInvocation,
  277. request *dify_invocation.InvokeTTSRequest,
  278. ) {
  279. response, err := dify_invocation.InvokeTTS(request)
  280. if err != nil {
  281. handle.WriteError(fmt.Errorf("invoke tts model failed: %s", err.Error()))
  282. return
  283. }
  284. response.Wrap(func(t model_entities.TTSResult) {
  285. handle.WriteResponse("struct", t)
  286. })
  287. }
  288. func executeDifyInvocationSpeech2TextTask(
  289. handle *BackwardsInvocation,
  290. request *dify_invocation.InvokeSpeech2TextRequest,
  291. ) {
  292. response, err := dify_invocation.InvokeSpeech2Text(request)
  293. if err != nil {
  294. handle.WriteError(fmt.Errorf("invoke speech2text model failed: %s", err.Error()))
  295. return
  296. }
  297. handle.WriteResponse("struct", response)
  298. }
  299. func executeDifyInvocationModerationTask(
  300. handle *BackwardsInvocation,
  301. request *dify_invocation.InvokeModerationRequest,
  302. ) {
  303. response, err := dify_invocation.InvokeModeration(request)
  304. if err != nil {
  305. handle.WriteError(fmt.Errorf("invoke moderation model failed: %s", err.Error()))
  306. return
  307. }
  308. handle.WriteResponse("struct", response)
  309. }
  310. func executeDifyInvocationAppTask(
  311. handle *BackwardsInvocation,
  312. request *dify_invocation.InvokeAppRequest,
  313. ) {
  314. response, err := dify_invocation.InvokeApp(request)
  315. if err != nil {
  316. handle.WriteError(fmt.Errorf("invoke app failed: %s", err.Error()))
  317. return
  318. }
  319. user_id, err := handle.UserID()
  320. if err != nil {
  321. handle.WriteError(fmt.Errorf("get user id failed: %s", err.Error()))
  322. return
  323. }
  324. request.User = user_id
  325. response.Wrap(func(t map[string]any) {
  326. handle.WriteResponse("stream", t)
  327. })
  328. }
  329. func executeDifyInvocationStorageTask(
  330. handle *BackwardsInvocation,
  331. request *dify_invocation.InvokeStorageRequest,
  332. ) {
  333. if handle.session == nil {
  334. handle.WriteError(fmt.Errorf("session not found"))
  335. return
  336. }
  337. persistence := persistence.GetPersistence()
  338. if persistence == nil {
  339. handle.WriteError(fmt.Errorf("persistence not found"))
  340. return
  341. }
  342. tenant_id, err := handle.TenantID()
  343. if err != nil {
  344. handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
  345. return
  346. }
  347. plugin_id := handle.session.PluginUniqueIdentifier
  348. if request.Opt == dify_invocation.STORAGE_OPT_GET {
  349. data, err := persistence.Load(tenant_id, plugin_id.PluginID(), request.Key)
  350. if err != nil {
  351. handle.WriteError(fmt.Errorf("load data failed: %s", err.Error()))
  352. return
  353. }
  354. handle.WriteResponse("struct", map[string]any{
  355. "data": hex.EncodeToString(data),
  356. })
  357. } else if request.Opt == dify_invocation.STORAGE_OPT_SET {
  358. data, err := hex.DecodeString(request.Value)
  359. if err != nil {
  360. handle.WriteError(fmt.Errorf("decode data failed: %s", err.Error()))
  361. return
  362. }
  363. if err := persistence.Save(tenant_id, plugin_id.PluginID(), request.Key, data); err != nil {
  364. handle.WriteError(fmt.Errorf("save data failed: %s", err.Error()))
  365. return
  366. }
  367. handle.WriteResponse("struct", map[string]any{
  368. "data": "ok",
  369. })
  370. } else if request.Opt == dify_invocation.STORAGE_OPT_DEL {
  371. if err := persistence.Delete(tenant_id, plugin_id.PluginID(), request.Key); err != nil {
  372. handle.WriteError(fmt.Errorf("delete data failed: %s", err.Error()))
  373. return
  374. }
  375. handle.WriteResponse("struct", map[string]any{
  376. "data": "ok",
  377. })
  378. }
  379. }