invoke_dify.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package plugin_daemon
  2. import (
  3. "fmt"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  10. )
  11. func invokeDify(
  12. runtime entities.PluginRuntimeInterface,
  13. session *session_manager.Session, data []byte,
  14. ) error {
  15. // unmarshal invoke data
  16. request, err := parser.UnmarshalJsonBytes[map[string]any](data)
  17. if err != nil {
  18. return fmt.Errorf("unmarshal invoke request failed: %s", err.Error())
  19. }
  20. typ, ok := request["type"].(string)
  21. if !ok {
  22. return fmt.Errorf("invoke request missing type: %s", data)
  23. }
  24. // get request id
  25. request_id, ok := request["request_id"].(string)
  26. if !ok {
  27. return fmt.Errorf("invoke request missing request_id: %s", data)
  28. }
  29. // get request
  30. detailed_request, ok := request["request"].(map[string]any)
  31. if !ok {
  32. return fmt.Errorf("invoke request missing request: %s", data)
  33. }
  34. switch typ {
  35. case "tool":
  36. r := dify_invocation.InvokeToolRequest{}
  37. if err := r.FromMap(request, detailed_request); err != nil {
  38. return fmt.Errorf("unmarshal tool invoke request failed: %s", err.Error())
  39. }
  40. submitToolTask(runtime, session, request_id, &r)
  41. case "model":
  42. r := dify_invocation.InvokeModelRequest{}
  43. if err := r.FromMap(request, detailed_request); err != nil {
  44. return fmt.Errorf("unmarshal model invoke request failed: %s", err.Error())
  45. }
  46. submitModelTask(runtime, session, request_id, &r)
  47. case "node":
  48. node_type, ok := detailed_request["node_type"].(dify_invocation.NodeType)
  49. if !ok {
  50. return fmt.Errorf("invoke request missing node_type: %s", data)
  51. }
  52. node_data, ok := detailed_request["data"].(map[string]any)
  53. if !ok {
  54. return fmt.Errorf("invoke request missing data: %s", data)
  55. }
  56. switch node_type {
  57. case dify_invocation.QUESTION_CLASSIFIER:
  58. d := dify_invocation.InvokeNodeRequest[*dify_invocation.QuestionClassifierNodeData]{
  59. NodeType: dify_invocation.QUESTION_CLASSIFIER,
  60. NodeData: &dify_invocation.QuestionClassifierNodeData{},
  61. }
  62. if err := d.FromMap(node_data); err != nil {
  63. return fmt.Errorf("unmarshal question classifier node data failed: %s", err.Error())
  64. }
  65. submitNodeInvocationRequestTask(runtime, session, request_id, &d)
  66. case dify_invocation.KNOWLEDGE_RETRIEVAL:
  67. d := dify_invocation.InvokeNodeRequest[*dify_invocation.KnowledgeRetrievalNodeData]{
  68. NodeType: dify_invocation.KNOWLEDGE_RETRIEVAL,
  69. NodeData: &dify_invocation.KnowledgeRetrievalNodeData{},
  70. }
  71. if err := d.FromMap(node_data); err != nil {
  72. return fmt.Errorf("unmarshal knowledge retrieval node data failed: %s", err.Error())
  73. }
  74. submitNodeInvocationRequestTask(runtime, session, request_id, &d)
  75. case dify_invocation.PARAMETER_EXTRACTOR:
  76. d := dify_invocation.InvokeNodeRequest[*dify_invocation.ParameterExtractorNodeData]{
  77. NodeType: dify_invocation.PARAMETER_EXTRACTOR,
  78. NodeData: &dify_invocation.ParameterExtractorNodeData{},
  79. }
  80. if err := d.FromMap(node_data); err != nil {
  81. return fmt.Errorf("unmarshal parameter extractor node data failed: %s", err.Error())
  82. }
  83. submitNodeInvocationRequestTask(runtime, session, request_id, &d)
  84. default:
  85. return fmt.Errorf("unknown node type: %s", node_type)
  86. }
  87. default:
  88. return fmt.Errorf("unknown invoke type: %s", typ)
  89. }
  90. return nil
  91. }
  92. func setTaskContext(session *session_manager.Session, r *dify_invocation.BaseInvokeDifyRequest) {
  93. r.TenantId = session.TenantID()
  94. r.UserId = session.UserID()
  95. }
  96. func submitModelTask(
  97. runtime entities.PluginRuntimeInterface,
  98. session *session_manager.Session,
  99. request_id string,
  100. t *dify_invocation.InvokeModelRequest,
  101. ) {
  102. setTaskContext(session, &t.BaseInvokeDifyRequest)
  103. routine.Submit(func() {
  104. response, err := dify_invocation.InvokeModel(t)
  105. if err != nil {
  106. log.Error("invoke model failed: %s", err.Error())
  107. return
  108. }
  109. defer response.Close()
  110. for response.Next() {
  111. chunk, _ := response.Read()
  112. fmt.Println(chunk)
  113. }
  114. })
  115. }
  116. func submitToolTask(
  117. runtime entities.PluginRuntimeInterface,
  118. session *session_manager.Session,
  119. request_id string,
  120. t *dify_invocation.InvokeToolRequest,
  121. ) {
  122. setTaskContext(session, &t.BaseInvokeDifyRequest)
  123. routine.Submit(func() {
  124. response, err := dify_invocation.InvokeTool(t)
  125. if err != nil {
  126. log.Error("invoke tool failed: %s", err.Error())
  127. return
  128. }
  129. defer response.Close()
  130. for response.Next() {
  131. chunk, _ := response.Read()
  132. fmt.Println(chunk)
  133. }
  134. })
  135. }
  136. func submitNodeInvocationRequestTask[W dify_invocation.WorkflowNodeData](
  137. runtime entities.PluginRuntimeInterface,
  138. session *session_manager.Session,
  139. request_id string,
  140. t *dify_invocation.InvokeNodeRequest[W],
  141. ) {
  142. setTaskContext(session, &t.BaseInvokeDifyRequest)
  143. routine.Submit(func() {
  144. response, err := dify_invocation.InvokeNode(t)
  145. if err != nil {
  146. log.Error("invoke node failed: %s", err.Error())
  147. return
  148. }
  149. fmt.Println(response)
  150. })
  151. }