agent_service.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package plugin_daemon
  2. import (
  3. "bytes"
  4. "encoding/base64"
  5. "errors"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  9. "github.com/langgenius/dify-plugin-daemon/pkg/entities/agent_entities"
  10. "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
  11. "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests"
  12. "github.com/langgenius/dify-plugin-daemon/pkg/entities/tool_entities"
  13. "github.com/xeipuuv/gojsonschema"
  14. )
  15. func InvokeAgentStrategy(
  16. session *session_manager.Session,
  17. r *requests.RequestInvokeAgentStrategy,
  18. ) (*stream.Stream[agent_entities.AgentStrategyResponseChunk], error) {
  19. runtime := session.Runtime()
  20. if runtime == nil {
  21. return nil, errors.New("plugin not found")
  22. }
  23. response, err := GenericInvokePlugin[
  24. requests.RequestInvokeAgentStrategy, agent_entities.AgentStrategyResponseChunk,
  25. ](
  26. session,
  27. r,
  28. 128,
  29. )
  30. if err != nil {
  31. return nil, err
  32. }
  33. agentStrategyDeclaration := runtime.Configuration().AgentStrategy
  34. if agentStrategyDeclaration == nil {
  35. return nil, errors.New("agent declaration not found")
  36. }
  37. var agentStrategyOutputSchema plugin_entities.AgentStrategyOutputSchema
  38. for _, v := range agentStrategyDeclaration.Strategies {
  39. if v.Identity.Name == r.AgentStrategy {
  40. agentStrategyOutputSchema = v.OutputSchema
  41. }
  42. }
  43. newResponse := stream.NewStream[agent_entities.AgentStrategyResponseChunk](128)
  44. routine.Submit(map[string]string{
  45. "module": "plugin_daemon",
  46. "function": "InvokeAgentStrategy",
  47. "agent_strategy_name": r.AgentStrategy,
  48. "agent_strategy_provider": r.AgentStrategyProvider,
  49. }, func() {
  50. files := make(map[string]*bytes.Buffer)
  51. defer newResponse.Close()
  52. for response.Next() {
  53. item, err := response.Read()
  54. if err != nil {
  55. newResponse.WriteError(err)
  56. return
  57. }
  58. if item.Type == tool_entities.ToolResponseChunkTypeBlobChunk {
  59. id, ok := item.Message["id"].(string)
  60. if !ok {
  61. continue
  62. }
  63. totalLength, ok := item.Message["total_length"].(float64)
  64. if !ok {
  65. continue
  66. }
  67. // convert total_length to int
  68. totalLengthInt := int(totalLength)
  69. blob, ok := item.Message["blob"].(string)
  70. if !ok {
  71. continue
  72. }
  73. end, ok := item.Message["end"].(bool)
  74. if !ok {
  75. continue
  76. }
  77. if _, ok := files[id]; !ok {
  78. files[id] = bytes.NewBuffer(make([]byte, 0, totalLengthInt))
  79. }
  80. if end {
  81. newResponse.Write(agent_entities.AgentStrategyResponseChunk{
  82. ToolResponseChunk: tool_entities.ToolResponseChunk{
  83. Type: tool_entities.ToolResponseChunkTypeBlob,
  84. Message: map[string]any{
  85. "blob": files[id].Bytes(), // bytes will be encoded to base64 finally
  86. },
  87. Meta: item.Meta,
  88. },
  89. })
  90. } else {
  91. if files[id].Len() > 15*1024*1024 {
  92. // delete the file if it is too large
  93. delete(files, id)
  94. newResponse.WriteError(errors.New("file is too large"))
  95. return
  96. } else {
  97. // decode the blob using base64
  98. decoded, err := base64.StdEncoding.DecodeString(blob)
  99. if err != nil {
  100. newResponse.WriteError(err)
  101. return
  102. }
  103. if len(decoded) > 8192 {
  104. // single chunk is too large, raises error
  105. newResponse.WriteError(errors.New("single file chunk is too large"))
  106. return
  107. }
  108. files[id].Write(decoded)
  109. }
  110. }
  111. } else {
  112. newResponse.Write(item)
  113. }
  114. }
  115. })
  116. // bind json schema validator
  117. bindAgentStrategyValidator(response, agentStrategyOutputSchema)
  118. return newResponse, nil
  119. }
  120. // TODO: reduce implementation of bindAgentValidator, it's a copy of bindToolValidator now
  121. func bindAgentStrategyValidator(
  122. response *stream.Stream[agent_entities.AgentStrategyResponseChunk],
  123. agentStrategyOutputSchema plugin_entities.AgentStrategyOutputSchema,
  124. ) {
  125. // check if the tool_output_schema is valid
  126. variables := make(map[string]any)
  127. response.Filter(func(trc agent_entities.AgentStrategyResponseChunk) error {
  128. if trc.Type == tool_entities.ToolResponseChunkTypeVariable {
  129. variableName, ok := trc.Message["variable_name"].(string)
  130. if !ok {
  131. return errors.New("variable name is not a string")
  132. }
  133. stream, ok := trc.Message["stream"].(bool)
  134. if !ok {
  135. return errors.New("stream is not a boolean")
  136. }
  137. if stream {
  138. // ensure variable_value is a string
  139. variableValue, ok := trc.Message["variable_value"].(string)
  140. if !ok {
  141. return errors.New("variable value is not a string")
  142. }
  143. // create it if not exists
  144. if _, ok := variables[variableName]; !ok {
  145. variables[variableName] = ""
  146. }
  147. originalValue, ok := variables[variableName].(string)
  148. if !ok {
  149. return errors.New("variable value is not a string")
  150. }
  151. // add the variable value to the variable
  152. variables[variableName] = originalValue + variableValue
  153. } else {
  154. variables[variableName] = trc.Message["variable_value"]
  155. }
  156. }
  157. return nil
  158. })
  159. response.BeforeClose(func() {
  160. // validate the variables
  161. schema, err := gojsonschema.NewSchema(gojsonschema.NewGoLoader(agentStrategyOutputSchema))
  162. if err != nil {
  163. response.WriteError(err)
  164. return
  165. }
  166. // validate the variables
  167. result, err := schema.Validate(gojsonschema.NewGoLoader(variables))
  168. if err != nil {
  169. response.WriteError(err)
  170. return
  171. }
  172. if !result.Valid() {
  173. response.WriteError(errors.New("tool output schema is not valid"))
  174. return
  175. }
  176. })
  177. }