invoke_dify.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package plugin_daemon
  2. import (
  3. "fmt"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  10. )
  11. func invokeDify(runtime entities.PluginRuntimeInterface,
  12. session *session_manager.Session, data []byte,
  13. ) error {
  14. // unmarshal invoke data
  15. request, err := parser.UnmarshalJsonBytes[map[string]any](data)
  16. if err != nil {
  17. return fmt.Errorf("unmarshal invoke request failed: %s", err.Error())
  18. }
  19. typ, ok := request["type"].(string)
  20. if !ok {
  21. return fmt.Errorf("invoke request missing type: %s", data)
  22. }
  23. // get request id
  24. request_id, ok := request["request_id"].(string)
  25. if !ok {
  26. return fmt.Errorf("invoke request missing request_id: %s", data)
  27. }
  28. // get request
  29. detailed_request, ok := request["request"].(map[string]any)
  30. if !ok {
  31. return fmt.Errorf("invoke request missing request: %s", data)
  32. }
  33. switch typ {
  34. case "tool":
  35. r := dify_invocation.InvokeToolRequest{}
  36. if err := r.FromMap(request, detailed_request); err != nil {
  37. return fmt.Errorf("unmarshal tool invoke request failed: %s", err.Error())
  38. }
  39. submitToolTask(runtime, session, request_id, &r)
  40. case "model":
  41. r := dify_invocation.InvokeModelRequest{}
  42. if err := r.FromMap(request, detailed_request); err != nil {
  43. return fmt.Errorf("unmarshal model invoke request failed: %s", err.Error())
  44. }
  45. submitModelTask(runtime, session, request_id, &r)
  46. case "node":
  47. node_type, ok := detailed_request["node_type"].(dify_invocation.NodeType)
  48. if !ok {
  49. return fmt.Errorf("invoke request missing node_type: %s", data)
  50. }
  51. node_data, ok := detailed_request["data"].(map[string]any)
  52. if !ok {
  53. return fmt.Errorf("invoke request missing data: %s", data)
  54. }
  55. switch node_type {
  56. case dify_invocation.QUESTION_CLASSIFIER:
  57. d := dify_invocation.InvokeNodeRequest[*dify_invocation.QuestionClassifierNodeData]{
  58. NodeType: dify_invocation.QUESTION_CLASSIFIER,
  59. NodeData: &dify_invocation.QuestionClassifierNodeData{},
  60. }
  61. if err := d.FromMap(node_data); err != nil {
  62. return fmt.Errorf("unmarshal question classifier node data failed: %s", err.Error())
  63. }
  64. submitNodeInvocationRequestTask(runtime, session, request_id, &d)
  65. case dify_invocation.KNOWLEDGE_RETRIEVAL:
  66. d := dify_invocation.InvokeNodeRequest[*dify_invocation.KnowledgeRetrievalNodeData]{
  67. NodeType: dify_invocation.KNOWLEDGE_RETRIEVAL,
  68. NodeData: &dify_invocation.KnowledgeRetrievalNodeData{},
  69. }
  70. if err := d.FromMap(node_data); err != nil {
  71. return fmt.Errorf("unmarshal knowledge retrieval node data failed: %s", err.Error())
  72. }
  73. submitNodeInvocationRequestTask(runtime, session, request_id, &d)
  74. case dify_invocation.PARAMETER_EXTRACTOR:
  75. d := dify_invocation.InvokeNodeRequest[*dify_invocation.ParameterExtractorNodeData]{
  76. NodeType: dify_invocation.PARAMETER_EXTRACTOR,
  77. NodeData: &dify_invocation.ParameterExtractorNodeData{},
  78. }
  79. if err := d.FromMap(node_data); err != nil {
  80. return fmt.Errorf("unmarshal parameter extractor node data failed: %s", err.Error())
  81. }
  82. submitNodeInvocationRequestTask(runtime, session, request_id, &d)
  83. case dify_invocation.CODE:
  84. d := dify_invocation.InvokeNodeRequest[*dify_invocation.CodeNodeData]{
  85. NodeType: dify_invocation.CODE,
  86. NodeData: &dify_invocation.CodeNodeData{},
  87. }
  88. if err := d.FromMap(node_data); err != nil {
  89. return fmt.Errorf("unmarshal code node data failed: %s", err.Error())
  90. }
  91. submitNodeInvocationRequestTask(runtime, session, request_id, &d)
  92. default:
  93. return fmt.Errorf("unknown node type: %s", node_type)
  94. }
  95. default:
  96. return fmt.Errorf("unknown invoke type: %s", typ)
  97. }
  98. return nil
  99. }
  100. func setTaskContext(session *session_manager.Session, r *dify_invocation.BaseInvokeDifyRequest) {
  101. r.TenantId = session.TenantID()
  102. r.UserId = session.UserID()
  103. }
  104. func submitModelTask(
  105. runtime entities.PluginRuntimeInterface,
  106. session *session_manager.Session,
  107. request_id string,
  108. t *dify_invocation.InvokeModelRequest,
  109. ) {
  110. setTaskContext(session, &t.BaseInvokeDifyRequest)
  111. routine.Submit(func() {
  112. response, err := dify_invocation.InvokeModel(t)
  113. if err != nil {
  114. log.Error("invoke model failed: %s", err.Error())
  115. return
  116. }
  117. defer response.Close()
  118. for response.Next() {
  119. chunk, _ := response.Read()
  120. fmt.Println(chunk)
  121. }
  122. })
  123. }
  124. func submitToolTask(
  125. runtime entities.PluginRuntimeInterface,
  126. session *session_manager.Session,
  127. request_id string,
  128. t *dify_invocation.InvokeToolRequest,
  129. ) {
  130. setTaskContext(session, &t.BaseInvokeDifyRequest)
  131. routine.Submit(func() {
  132. response, err := dify_invocation.InvokeTool(t)
  133. if err != nil {
  134. log.Error("invoke tool failed: %s", err.Error())
  135. return
  136. }
  137. defer response.Close()
  138. for response.Next() {
  139. chunk, _ := response.Read()
  140. fmt.Println(chunk)
  141. }
  142. })
  143. }
  144. func submitNodeInvocationRequestTask[W dify_invocation.WorkflowNodeData](
  145. runtime entities.PluginRuntimeInterface,
  146. session *session_manager.Session,
  147. request_id string,
  148. t *dify_invocation.InvokeNodeRequest[W],
  149. ) {
  150. setTaskContext(session, &t.BaseInvokeDifyRequest)
  151. routine.Submit(func() {
  152. response, err := dify_invocation.InvokeNode(t)
  153. if err != nil {
  154. log.Error("invoke node failed: %s", err.Error())
  155. return
  156. }
  157. fmt.Println(response)
  158. })
  159. }