task.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package backwards_invocation
  2. import (
  3. "fmt"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/model_entities"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/tool_entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  11. )
  12. func InvokeDify(
  13. runtime entities.PluginRuntimeInterface,
  14. invoke_from PluginAccessType,
  15. session *session_manager.Session, data []byte,
  16. ) error {
  17. // unmarshal invoke data
  18. request, err := parser.UnmarshalJsonBytes2Map(data)
  19. if err != nil {
  20. return fmt.Errorf("unmarshal invoke request failed: %s", err.Error())
  21. }
  22. if request == nil {
  23. return fmt.Errorf("invoke request is empty")
  24. }
  25. // prepare invocation arguments
  26. request_handle, err := prepareDifyInvocationArguments(session, request)
  27. if err != nil {
  28. return err
  29. }
  30. if invoke_from == PLUGIN_ACCESS_TYPE_MODEL {
  31. request_handle.WriteError(fmt.Errorf("you can not invoke dify from %s", invoke_from))
  32. request_handle.EndResponse()
  33. return nil
  34. }
  35. // dispatch invocation task
  36. routine.Submit(func() {
  37. dispatchDifyInvocationTask(request_handle)
  38. defer request_handle.EndResponse()
  39. })
  40. return nil
  41. }
  42. func prepareDifyInvocationArguments(session *session_manager.Session, request map[string]any) (*BackwardsInvocation, error) {
  43. typ, ok := request["type"].(string)
  44. if !ok {
  45. return nil, fmt.Errorf("invoke request missing type: %s", request)
  46. }
  47. // get request id
  48. backwards_request_id, ok := request["backwards_request_id"].(string)
  49. if !ok {
  50. return nil, fmt.Errorf("invoke request missing request_id: %s", request)
  51. }
  52. // get request
  53. detailed_request, ok := request["request"].(map[string]any)
  54. if !ok {
  55. return nil, fmt.Errorf("invoke request missing request: %s", request)
  56. }
  57. return NewBackwardsInvocation(
  58. BackwardsInvocationType(typ),
  59. backwards_request_id, session, detailed_request,
  60. ), nil
  61. }
  62. var (
  63. dispatchMapping = map[dify_invocation.InvokeType]func(handle *BackwardsInvocation){
  64. dify_invocation.INVOKE_TYPE_TOOL: func(handle *BackwardsInvocation) {
  65. genericDispatchTask[dify_invocation.InvokeToolRequest](handle, executeDifyInvocationToolTask)
  66. },
  67. dify_invocation.INVOKE_TYPE_LLM: func(handle *BackwardsInvocation) {
  68. genericDispatchTask[dify_invocation.InvokeLLMRequest](handle, executeDifyInvocationLLMTask)
  69. },
  70. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: func(handle *BackwardsInvocation) {
  71. genericDispatchTask[dify_invocation.InvokeTextEmbeddingRequest](handle, executeDifyInvocationTextEmbeddingTask)
  72. },
  73. dify_invocation.INVOKE_TYPE_RERANK: func(handle *BackwardsInvocation) {
  74. genericDispatchTask[dify_invocation.InvokeRerankRequest](handle, executeDifyInvocationRerankTask)
  75. },
  76. dify_invocation.INVOKE_TYPE_TTS: func(handle *BackwardsInvocation) {
  77. genericDispatchTask[dify_invocation.InvokeTTSRequest](handle, executeDifyInvocationTTSTask)
  78. },
  79. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: func(handle *BackwardsInvocation) {
  80. genericDispatchTask[dify_invocation.InvokeSpeech2TextRequest](handle, executeDifyInvocationSpeech2TextTask)
  81. },
  82. dify_invocation.INVOKE_TYPE_MODERATION: func(handle *BackwardsInvocation) {
  83. genericDispatchTask[dify_invocation.InvokeModerationRequest](handle, executeDifyInvocationModerationTask)
  84. },
  85. }
  86. )
  87. func genericDispatchTask[T any](
  88. handle *BackwardsInvocation,
  89. dispatch func(
  90. handle *BackwardsInvocation,
  91. request *T,
  92. ),
  93. ) {
  94. r, err := parser.MapToStruct[T](handle.RequestData())
  95. if err != nil {
  96. handle.WriteError(fmt.Errorf("unmarshal invoke tool request failed: %s", err.Error()))
  97. return
  98. }
  99. dispatch(handle, r)
  100. }
  101. func dispatchDifyInvocationTask(handle *BackwardsInvocation) {
  102. request_data := handle.RequestData()
  103. tenant_id, err := handle.TenantID()
  104. if err != nil {
  105. handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
  106. return
  107. }
  108. request_data["tenant_id"] = tenant_id
  109. user_id, err := handle.UserID()
  110. if err != nil {
  111. handle.WriteError(fmt.Errorf("get user id failed: %s", err.Error()))
  112. return
  113. }
  114. request_data["user_id"] = user_id
  115. typ := handle.Type()
  116. request_data["type"] = typ
  117. for t, v := range dispatchMapping {
  118. if t == handle.Type() {
  119. v(handle)
  120. return
  121. }
  122. }
  123. handle.WriteError(fmt.Errorf("unsupported invoke type: %s", handle.Type()))
  124. }
  125. func executeDifyInvocationToolTask(
  126. handle *BackwardsInvocation,
  127. request *dify_invocation.InvokeToolRequest,
  128. ) {
  129. response, err := dify_invocation.InvokeTool(request)
  130. if err != nil {
  131. handle.WriteError(fmt.Errorf("invoke tool failed: %s", err.Error()))
  132. return
  133. }
  134. response.Wrap(func(t tool_entities.ToolResponseChunk) {
  135. handle.WriteResponse("stream", t)
  136. })
  137. }
  138. func executeDifyInvocationLLMTask(
  139. handle *BackwardsInvocation,
  140. request *dify_invocation.InvokeLLMRequest,
  141. ) {
  142. response, err := dify_invocation.InvokeLLM(request)
  143. if err != nil {
  144. handle.WriteError(fmt.Errorf("invoke llm model failed: %s", err.Error()))
  145. return
  146. }
  147. response.Wrap(func(t model_entities.LLMResultChunk) {
  148. handle.WriteResponse("stream", t)
  149. })
  150. }
  151. func executeDifyInvocationTextEmbeddingTask(
  152. handle *BackwardsInvocation,
  153. request *dify_invocation.InvokeTextEmbeddingRequest,
  154. ) {
  155. response, err := dify_invocation.InvokeTextEmbedding(request)
  156. if err != nil {
  157. handle.WriteError(fmt.Errorf("invoke text-embedding model failed: %s", err.Error()))
  158. return
  159. }
  160. handle.WriteResponse("struct", response)
  161. }
  162. func executeDifyInvocationRerankTask(
  163. handle *BackwardsInvocation,
  164. request *dify_invocation.InvokeRerankRequest,
  165. ) {
  166. response, err := dify_invocation.InvokeRerank(request)
  167. if err != nil {
  168. handle.WriteError(fmt.Errorf("invoke rerank model failed: %s", err.Error()))
  169. return
  170. }
  171. handle.WriteResponse("struct", response)
  172. }
  173. func executeDifyInvocationTTSTask(
  174. handle *BackwardsInvocation,
  175. request *dify_invocation.InvokeTTSRequest,
  176. ) {
  177. response, err := dify_invocation.InvokeTTS(request)
  178. if err != nil {
  179. handle.WriteError(fmt.Errorf("invoke tts model failed: %s", err.Error()))
  180. return
  181. }
  182. response.Wrap(func(t model_entities.TTSResult) {
  183. handle.WriteResponse("struct", t)
  184. })
  185. }
  186. func executeDifyInvocationSpeech2TextTask(
  187. handle *BackwardsInvocation,
  188. request *dify_invocation.InvokeSpeech2TextRequest,
  189. ) {
  190. response, err := dify_invocation.InvokeSpeech2Text(request)
  191. if err != nil {
  192. handle.WriteError(fmt.Errorf("invoke speech2text model failed: %s", err.Error()))
  193. return
  194. }
  195. handle.WriteResponse("struct", response)
  196. }
  197. func executeDifyInvocationModerationTask(
  198. handle *BackwardsInvocation,
  199. request *dify_invocation.InvokeModerationRequest,
  200. ) {
  201. response, err := dify_invocation.InvokeModeration(request)
  202. if err != nil {
  203. handle.WriteError(fmt.Errorf("invoke moderation model failed: %s", err.Error()))
  204. return
  205. }
  206. handle.WriteResponse("struct", response)
  207. }