task.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. package backwards_invocation
  2. import (
  3. "fmt"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/model_entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/tool_entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  12. )
  13. func InvokeDify(
  14. runtime entities.PluginRuntimeInterface,
  15. invoke_from access_types.PluginAccessType,
  16. session *session_manager.Session, data []byte,
  17. ) error {
  18. // unmarshal invoke data
  19. request, err := parser.UnmarshalJsonBytes2Map(data)
  20. if err != nil {
  21. return fmt.Errorf("unmarshal invoke request failed: %s", err.Error())
  22. }
  23. if request == nil {
  24. return fmt.Errorf("invoke request is empty")
  25. }
  26. // prepare invocation arguments
  27. request_handle, err := prepareDifyInvocationArguments(session, request)
  28. if err != nil {
  29. return err
  30. }
  31. if invoke_from == access_types.PLUGIN_ACCESS_TYPE_MODEL {
  32. request_handle.WriteError(fmt.Errorf("you can not invoke dify from %s", invoke_from))
  33. request_handle.EndResponse()
  34. return nil
  35. }
  36. // check permission
  37. if err := checkPermission(runtime, request_handle); err != nil {
  38. request_handle.WriteError(err)
  39. request_handle.EndResponse()
  40. return nil
  41. }
  42. // dispatch invocation task
  43. routine.Submit(func() {
  44. dispatchDifyInvocationTask(request_handle)
  45. defer request_handle.EndResponse()
  46. })
  47. return nil
  48. }
  49. var (
  50. permissionMapping = map[dify_invocation.InvokeType]map[string]any{
  51. dify_invocation.INVOKE_TYPE_TOOL: {
  52. "func": func(runtime entities.PluginRuntimeTimeLifeInterface) bool {
  53. return runtime.Configuration().Resource.Permission.AllowInvokeTool()
  54. },
  55. "error": "permission denied, you need to enable tool access in plugin manifest",
  56. },
  57. dify_invocation.INVOKE_TYPE_LLM: {
  58. "func": func(runtime entities.PluginRuntimeTimeLifeInterface) bool {
  59. return runtime.Configuration().Resource.Permission.AllowInvokeLLM()
  60. },
  61. "error": "permission denied, you need to enable llm access in plugin manifest",
  62. },
  63. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: {
  64. "func": func(runtime entities.PluginRuntimeTimeLifeInterface) bool {
  65. return runtime.Configuration().Resource.Permission.AllowInvokeTextEmbedding()
  66. },
  67. "error": "permission denied, you need to enable text-embedding access in plugin manifest",
  68. },
  69. dify_invocation.INVOKE_TYPE_RERANK: {
  70. "func": func(runtime entities.PluginRuntimeTimeLifeInterface) bool {
  71. return runtime.Configuration().Resource.Permission.AllowInvokeRerank()
  72. },
  73. "error": "permission denied, you need to enable rerank access in plugin manifest",
  74. },
  75. dify_invocation.INVOKE_TYPE_TTS: {
  76. "func": func(runtime entities.PluginRuntimeTimeLifeInterface) bool {
  77. return runtime.Configuration().Resource.Permission.AllowInvokeTTS()
  78. },
  79. "error": "permission denied, you need to enable tts access in plugin manifest",
  80. },
  81. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: {
  82. "func": func(runtime entities.PluginRuntimeTimeLifeInterface) bool {
  83. return runtime.Configuration().Resource.Permission.AllowInvokeSpeech2Text()
  84. },
  85. "error": "permission denied, you need to enable speech2text access in plugin manifest",
  86. },
  87. dify_invocation.INVOKE_TYPE_MODERATION: {
  88. "func": func(runtime entities.PluginRuntimeTimeLifeInterface) bool {
  89. return runtime.Configuration().Resource.Permission.AllowInvokeModeration()
  90. },
  91. "error": "permission denied, you need to enable moderation access in plugin manifest",
  92. },
  93. dify_invocation.INVOKE_TYPE_NODE: {
  94. "func": func(runtime entities.PluginRuntimeTimeLifeInterface) bool {
  95. return runtime.Configuration().Resource.Permission.AllowInvokeNode()
  96. },
  97. "error": "permission denied, you need to enable node access in plugin manifest",
  98. },
  99. }
  100. )
  101. func checkPermission(runtime entities.PluginRuntimeTimeLifeInterface, request_handle *BackwardsInvocation) error {
  102. permission, ok := permissionMapping[request_handle.Type()]
  103. if !ok {
  104. return fmt.Errorf("unsupported invoke type: %s", request_handle.Type())
  105. }
  106. permission_func, ok := permission["func"].(func(runtime entities.PluginRuntimeTimeLifeInterface) bool)
  107. if !ok {
  108. return fmt.Errorf("permission function not found: %s", request_handle.Type())
  109. }
  110. if !permission_func(runtime) {
  111. return fmt.Errorf(permission["error"].(string))
  112. }
  113. return nil
  114. }
  115. func prepareDifyInvocationArguments(session *session_manager.Session, request map[string]any) (*BackwardsInvocation, error) {
  116. typ, ok := request["type"].(string)
  117. if !ok {
  118. return nil, fmt.Errorf("invoke request missing type: %s", request)
  119. }
  120. // get request id
  121. backwards_request_id, ok := request["backwards_request_id"].(string)
  122. if !ok {
  123. return nil, fmt.Errorf("invoke request missing request_id: %s", request)
  124. }
  125. // get request
  126. detailed_request, ok := request["request"].(map[string]any)
  127. if !ok {
  128. return nil, fmt.Errorf("invoke request missing request: %s", request)
  129. }
  130. return NewBackwardsInvocation(
  131. BackwardsInvocationType(typ),
  132. backwards_request_id, session, detailed_request,
  133. ), nil
  134. }
  135. var (
  136. dispatchMapping = map[dify_invocation.InvokeType]func(handle *BackwardsInvocation){
  137. dify_invocation.INVOKE_TYPE_TOOL: func(handle *BackwardsInvocation) {
  138. genericDispatchTask[dify_invocation.InvokeToolRequest](handle, executeDifyInvocationToolTask)
  139. },
  140. dify_invocation.INVOKE_TYPE_LLM: func(handle *BackwardsInvocation) {
  141. genericDispatchTask[dify_invocation.InvokeLLMRequest](handle, executeDifyInvocationLLMTask)
  142. },
  143. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: func(handle *BackwardsInvocation) {
  144. genericDispatchTask[dify_invocation.InvokeTextEmbeddingRequest](handle, executeDifyInvocationTextEmbeddingTask)
  145. },
  146. dify_invocation.INVOKE_TYPE_RERANK: func(handle *BackwardsInvocation) {
  147. genericDispatchTask[dify_invocation.InvokeRerankRequest](handle, executeDifyInvocationRerankTask)
  148. },
  149. dify_invocation.INVOKE_TYPE_TTS: func(handle *BackwardsInvocation) {
  150. genericDispatchTask[dify_invocation.InvokeTTSRequest](handle, executeDifyInvocationTTSTask)
  151. },
  152. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: func(handle *BackwardsInvocation) {
  153. genericDispatchTask[dify_invocation.InvokeSpeech2TextRequest](handle, executeDifyInvocationSpeech2TextTask)
  154. },
  155. dify_invocation.INVOKE_TYPE_MODERATION: func(handle *BackwardsInvocation) {
  156. genericDispatchTask[dify_invocation.InvokeModerationRequest](handle, executeDifyInvocationModerationTask)
  157. },
  158. }
  159. )
  160. func genericDispatchTask[T any](
  161. handle *BackwardsInvocation,
  162. dispatch func(
  163. handle *BackwardsInvocation,
  164. request *T,
  165. ),
  166. ) {
  167. r, err := parser.MapToStruct[T](handle.RequestData())
  168. if err != nil {
  169. handle.WriteError(fmt.Errorf("unmarshal invoke tool request failed: %s", err.Error()))
  170. return
  171. }
  172. dispatch(handle, r)
  173. }
  174. func dispatchDifyInvocationTask(handle *BackwardsInvocation) {
  175. request_data := handle.RequestData()
  176. tenant_id, err := handle.TenantID()
  177. if err != nil {
  178. handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
  179. return
  180. }
  181. request_data["tenant_id"] = tenant_id
  182. user_id, err := handle.UserID()
  183. if err != nil {
  184. handle.WriteError(fmt.Errorf("get user id failed: %s", err.Error()))
  185. return
  186. }
  187. request_data["user_id"] = user_id
  188. typ := handle.Type()
  189. request_data["type"] = typ
  190. for t, v := range dispatchMapping {
  191. if t == handle.Type() {
  192. v(handle)
  193. return
  194. }
  195. }
  196. handle.WriteError(fmt.Errorf("unsupported invoke type: %s", handle.Type()))
  197. }
  198. func executeDifyInvocationToolTask(
  199. handle *BackwardsInvocation,
  200. request *dify_invocation.InvokeToolRequest,
  201. ) {
  202. response, err := dify_invocation.InvokeTool(request)
  203. if err != nil {
  204. handle.WriteError(fmt.Errorf("invoke tool failed: %s", err.Error()))
  205. return
  206. }
  207. response.Wrap(func(t tool_entities.ToolResponseChunk) {
  208. handle.WriteResponse("stream", t)
  209. })
  210. }
  211. func executeDifyInvocationLLMTask(
  212. handle *BackwardsInvocation,
  213. request *dify_invocation.InvokeLLMRequest,
  214. ) {
  215. response, err := dify_invocation.InvokeLLM(request)
  216. if err != nil {
  217. handle.WriteError(fmt.Errorf("invoke llm model failed: %s", err.Error()))
  218. return
  219. }
  220. response.Wrap(func(t model_entities.LLMResultChunk) {
  221. handle.WriteResponse("stream", t)
  222. })
  223. }
  224. func executeDifyInvocationTextEmbeddingTask(
  225. handle *BackwardsInvocation,
  226. request *dify_invocation.InvokeTextEmbeddingRequest,
  227. ) {
  228. response, err := dify_invocation.InvokeTextEmbedding(request)
  229. if err != nil {
  230. handle.WriteError(fmt.Errorf("invoke text-embedding model failed: %s", err.Error()))
  231. return
  232. }
  233. handle.WriteResponse("struct", response)
  234. }
  235. func executeDifyInvocationRerankTask(
  236. handle *BackwardsInvocation,
  237. request *dify_invocation.InvokeRerankRequest,
  238. ) {
  239. response, err := dify_invocation.InvokeRerank(request)
  240. if err != nil {
  241. handle.WriteError(fmt.Errorf("invoke rerank model failed: %s", err.Error()))
  242. return
  243. }
  244. handle.WriteResponse("struct", response)
  245. }
  246. func executeDifyInvocationTTSTask(
  247. handle *BackwardsInvocation,
  248. request *dify_invocation.InvokeTTSRequest,
  249. ) {
  250. response, err := dify_invocation.InvokeTTS(request)
  251. if err != nil {
  252. handle.WriteError(fmt.Errorf("invoke tts model failed: %s", err.Error()))
  253. return
  254. }
  255. response.Wrap(func(t model_entities.TTSResult) {
  256. handle.WriteResponse("struct", t)
  257. })
  258. }
  259. func executeDifyInvocationSpeech2TextTask(
  260. handle *BackwardsInvocation,
  261. request *dify_invocation.InvokeSpeech2TextRequest,
  262. ) {
  263. response, err := dify_invocation.InvokeSpeech2Text(request)
  264. if err != nil {
  265. handle.WriteError(fmt.Errorf("invoke speech2text model failed: %s", err.Error()))
  266. return
  267. }
  268. handle.WriteResponse("struct", response)
  269. }
  270. func executeDifyInvocationModerationTask(
  271. handle *BackwardsInvocation,
  272. request *dify_invocation.InvokeModerationRequest,
  273. ) {
  274. response, err := dify_invocation.InvokeModeration(request)
  275. if err != nil {
  276. handle.WriteError(fmt.Errorf("invoke moderation model failed: %s", err.Error()))
  277. return
  278. }
  279. handle.WriteResponse("struct", response)
  280. }