invoke_dify.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. package plugin_daemon
  2. import (
  3. "fmt"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  11. )
  12. func invokeDify(
  13. runtime entities.PluginRuntimeInterface,
  14. invoke_from PluginAccessType,
  15. session *session_manager.Session, data []byte,
  16. ) error {
  17. // unmarshal invoke data
  18. request, err := parser.UnmarshalJsonBytes[map[string]any](data)
  19. if err != nil {
  20. return fmt.Errorf("unmarshal invoke request failed: %s", err.Error())
  21. }
  22. // prepare invocation arguments
  23. request_handle, err := prepareDifyInvocationArguments(session, request)
  24. if err != nil {
  25. return err
  26. }
  27. defer request_handle.End()
  28. if invoke_from == PLUGIN_ACCESS_TYPE_MODEL {
  29. request_handle.WriteError(fmt.Errorf("you can not invoke dify from %s", invoke_from))
  30. return nil
  31. }
  32. // dispatch invocation task
  33. dispatchDifyInvocationTask(request_handle)
  34. return nil
  35. }
  36. func prepareDifyInvocationArguments(session *session_manager.Session, request map[string]any) (*backwards_invocation.BackwardsInvocation, error) {
  37. typ, ok := request["type"].(string)
  38. if !ok {
  39. return nil, fmt.Errorf("invoke request missing type: %s", request)
  40. }
  41. // get request id
  42. backwards_request_id, ok := request["backwards_request_id"].(string)
  43. if !ok {
  44. return nil, fmt.Errorf("invoke request missing request_id: %s", request)
  45. }
  46. // get request
  47. detailed_request, ok := request["request"].(map[string]any)
  48. if !ok {
  49. return nil, fmt.Errorf("invoke request missing request: %s", request)
  50. }
  51. return backwards_invocation.NewBackwardsInvocation(
  52. backwards_invocation.BackwardsInvocationType(typ),
  53. backwards_request_id, session, detailed_request,
  54. ), nil
  55. }
  56. func dispatchDifyInvocationTask(handle *backwards_invocation.BackwardsInvocation) {
  57. switch handle.Type() {
  58. case dify_invocation.INVOKE_TYPE_TOOL:
  59. r, err := parser.MapToStruct[dify_invocation.InvokeToolRequest](handle.RequestData())
  60. if err != nil {
  61. handle.WriteError(fmt.Errorf("unmarshal invoke tool request failed: %s", err.Error()))
  62. return
  63. }
  64. submitToolTask(runtime, session, backwards_request_id, &r)
  65. case dify_invocation.INVOKE_TYPE_MODEL:
  66. r, err := parser.MapToStruct[dify_invocation.InvokeModelRequest](handle.RequestData())
  67. if err != nil {
  68. handle.WriteError(fmt.Errorf("unmarshal invoke model request failed: %s", err.Error()))
  69. return
  70. }
  71. submitModelTask(runtime, session, backwards_request_id, &r)
  72. case dify_invocation.INVOKE_TYPE_NODE:
  73. node_type, ok := detailed_request["node_type"].(dify_invocation.NodeType)
  74. if !ok {
  75. return fmt.Errorf("invoke request missing node_type: %s", data)
  76. }
  77. node_data, ok := detailed_request["data"].(map[string]any)
  78. if !ok {
  79. return fmt.Errorf("invoke request missing data: %s", data)
  80. }
  81. switch node_type {
  82. case dify_invocation.QUESTION_CLASSIFIER:
  83. d := dify_invocation.InvokeNodeRequest[dify_invocation.QuestionClassifierNodeData]{
  84. NodeType: dify_invocation.QUESTION_CLASSIFIER,
  85. }
  86. if err := d.FromMap(node_data); err != nil {
  87. return fmt.Errorf("unmarshal question classifier node data failed: %s", err.Error())
  88. }
  89. submitNodeInvocationRequestTask(runtime, session, backwards_request_id, &d)
  90. case dify_invocation.KNOWLEDGE_RETRIEVAL:
  91. d := dify_invocation.InvokeNodeRequest[dify_invocation.KnowledgeRetrievalNodeData]{
  92. NodeType: dify_invocation.KNOWLEDGE_RETRIEVAL,
  93. }
  94. if err := d.FromMap(node_data); err != nil {
  95. return fmt.Errorf("unmarshal knowledge retrieval node data failed: %s", err.Error())
  96. }
  97. submitNodeInvocationRequestTask(runtime, session, backwards_request_id, &d)
  98. case dify_invocation.PARAMETER_EXTRACTOR:
  99. d := dify_invocation.InvokeNodeRequest[dify_invocation.ParameterExtractorNodeData]{
  100. NodeType: dify_invocation.PARAMETER_EXTRACTOR,
  101. }
  102. if err := d.FromMap(node_data); err != nil {
  103. return fmt.Errorf("unmarshal parameter extractor node data failed: %s", err.Error())
  104. }
  105. submitNodeInvocationRequestTask(runtime, session, backwards_request_id, &d)
  106. default:
  107. return fmt.Errorf("unknown node type: %s", node_type)
  108. }
  109. default:
  110. return fmt.Errorf("unknown invoke type: %s", typ)
  111. }
  112. }
  113. func setTaskContext(session *session_manager.Session, r *dify_invocation.BaseInvokeDifyRequest) {
  114. r.TenantId = session.TenantID()
  115. r.UserId = session.UserID()
  116. }
  117. func submitModelTask(
  118. runtime entities.PluginRuntimeInterface,
  119. session *session_manager.Session,
  120. request_id string,
  121. t *dify_invocation.InvokeModelRequest,
  122. ) {
  123. setTaskContext(session, &t.BaseInvokeDifyRequest)
  124. routine.Submit(func() {
  125. response, err := dify_invocation.InvokeModel(t)
  126. if err != nil {
  127. log.Error("invoke model failed: %s", err.Error())
  128. return
  129. }
  130. defer response.Close()
  131. for response.Next() {
  132. chunk, _ := response.Read()
  133. fmt.Println(chunk)
  134. }
  135. })
  136. }
  137. func submitToolTask(
  138. runtime entities.PluginRuntimeInterface,
  139. session *session_manager.Session,
  140. request_id string,
  141. t *dify_invocation.InvokeToolRequest,
  142. ) {
  143. setTaskContext(session, &t.BaseInvokeDifyRequest)
  144. routine.Submit(func() {
  145. response, err := dify_invocation.InvokeTool(t)
  146. if err != nil {
  147. log.Error("invoke tool failed: %s", err.Error())
  148. return
  149. }
  150. defer response.Close()
  151. for response.Next() {
  152. chunk, _ := response.Read()
  153. fmt.Println(chunk)
  154. }
  155. })
  156. }
  157. func submitNodeInvocationRequestTask[W dify_invocation.WorkflowNodeData](
  158. runtime entities.PluginRuntimeInterface,
  159. session *session_manager.Session,
  160. request_id string,
  161. t *dify_invocation.InvokeNodeRequest[W],
  162. ) {
  163. setTaskContext(session, &t.BaseInvokeDifyRequest)
  164. routine.Submit(func() {
  165. response, err := dify_invocation.InvokeNode(t)
  166. if err != nil {
  167. log.Error("invoke node failed: %s", err.Error())
  168. return
  169. }
  170. fmt.Println(response)
  171. })
  172. }