io.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. package stdio_holder
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "github.com/google/uuid"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  11. )
  // Package-level registries shared by all holders.
  var (
  	// stdio_holder maps a holder id to the *stdioHolder stored for it by Put.
  	stdio_holder sync.Map
  	// l guards writes to listeners.
  	l = new(sync.Mutex)
  	// listeners holds the global callbacks registered through OnGlobalEvent.
  	// Each callback is invoked with the holder id and the raw event data of
  	// every response event.
  	listeners = make(map[string]func(string, []byte))
  )
  // stdioHolder groups the writer, reader and errReader streams registered for
  // one plugin through Put, together with the per-session listeners attached to
  // them.
  type stdioHolder struct {
  	// id is the key of this holder in stdio_holder; pluginIdentity is the
  	// plugin name used in log messages.
  	id, pluginIdentity string

  	// writer receives the data passed to Write.
  	writer io.WriteCloser
  	// reader is consumed by StartStdout, errReader by StartStderr.
  	reader, errReader io.ReadCloser

  	// l guards listener.
  	l *sync.Mutex
  	// listener maps a session id to the callback receiving that session's
  	// response data. It is created lazily by OnEvent.
  	listener map[string]func([]byte)

  	// started is set once StartStdout or StartStderr has been entered;
  	// alive is set at the same time and cleared by Stop.
  	started, alive bool
  }
  28. func (s *stdioHolder) Stop() {
  29. s.alive = false
  30. s.writer.Close()
  31. s.reader.Close()
  32. s.errReader.Close()
  33. stdio_holder.Delete(s.id)
  34. }
  35. func (s *stdioHolder) StartStdout() {
  36. s.started = true
  37. s.alive = true
  38. scanner := bufio.NewScanner(s.reader)
  39. for s.alive {
  40. for scanner.Scan() {
  41. data := scanner.Bytes()
  42. event, err := parser.UnmarshalJsonBytes[plugin_entities.PluginUniversalEvent](data)
  43. if err != nil {
  44. log.Error("unmarshal json failed: %s", err.Error())
  45. continue
  46. }
  47. session_id := event.SessionId
  48. switch event.Event {
  49. case plugin_entities.PLUGIN_EVENT_LOG:
  50. if event.Event == plugin_entities.PLUGIN_EVENT_LOG {
  51. logEvent, err := parser.UnmarshalJsonBytes[plugin_entities.PluginLogEvent](event.Data)
  52. if err != nil {
  53. log.Error("unmarshal json failed: %s", err.Error())
  54. continue
  55. }
  56. log.Info("plugin %s: %s", s.pluginIdentity, logEvent.Message)
  57. }
  58. case plugin_entities.PLUGIN_EVENT_RESPONSE:
  59. for _, listener := range listeners {
  60. listener(s.id, event.Data)
  61. }
  62. for listener_session_id, listener := range s.listener {
  63. if listener_session_id == session_id {
  64. listener(event.Data)
  65. }
  66. }
  67. case plugin_entities.PLUGIN_EVENT_ERROR:
  68. log.Error("plugin %s: %s", s.pluginIdentity, event.Data)
  69. case plugin_entities.PLUGIN_EVENT_INVOKE:
  70. // invoke dify
  71. }
  72. }
  73. }
  74. }
  75. /*
  76. * @return error
  77. */
  78. func (s *stdioHolder) StartStderr() error {
  79. s.started = true
  80. s.alive = true
  81. defer s.Stop()
  82. for s.alive {
  83. buf := make([]byte, 1024)
  84. n, err := s.errReader.Read(buf)
  85. if err != nil && err != io.EOF {
  86. return err
  87. } else if err != nil {
  88. return nil
  89. }
  90. if n > 0 {
  91. return fmt.Errorf("stderr: %s", buf[:n])
  92. }
  93. }
  94. return nil
  95. }
  96. func (s *stdioHolder) GetID() string {
  97. return s.id
  98. }
  99. /*
  100. * @param plugin_identity: string
  101. * @param writer: io.WriteCloser
  102. * @param reader: io.ReadCloser
  103. * @param errReader: io.ReadCloser
  104. */
  105. func Put(
  106. plugin_identity string,
  107. writer io.WriteCloser,
  108. reader io.ReadCloser,
  109. errReader io.ReadCloser,
  110. ) *stdioHolder {
  111. id := uuid.New().String()
  112. holder := &stdioHolder{
  113. pluginIdentity: plugin_identity,
  114. writer: writer,
  115. reader: reader,
  116. errReader: errReader,
  117. id: id,
  118. l: &sync.Mutex{},
  119. }
  120. stdio_holder.Store(id, holder)
  121. return holder
  122. }
  123. /*
  124. * @param id: string
  125. */
  126. func Get(id string) *stdioHolder {
  127. if v, ok := stdio_holder.Load(id); ok {
  128. if holder, ok := v.(*stdioHolder); ok {
  129. return holder
  130. }
  131. }
  132. return nil
  133. }
  134. /*
  135. * @param id: string
  136. */
  137. func Remove(id string) {
  138. stdio_holder.Delete(id)
  139. }
  140. /*
  141. * @param id: string
  142. * @param session_id: string
  143. * @param listener: func(data []byte)
  144. * @return string - listener identity
  145. */
  146. func OnEvent(id string, session_id string, listener func([]byte)) {
  147. if v, ok := stdio_holder.Load(id); ok {
  148. if holder, ok := v.(*stdioHolder); ok {
  149. holder.l.Lock()
  150. defer holder.l.Unlock()
  151. if holder.listener == nil {
  152. holder.listener = map[string]func([]byte){}
  153. }
  154. holder.listener[session_id] = listener
  155. }
  156. }
  157. }
  158. /*
  159. * @param id: string
  160. * @param listener: string
  161. */
  162. func RemoveListener(id string, listener string) {
  163. if v, ok := stdio_holder.Load(id); ok {
  164. if holder, ok := v.(*stdioHolder); ok {
  165. holder.l.Lock()
  166. defer holder.l.Unlock()
  167. delete(holder.listener, listener)
  168. }
  169. }
  170. }
  171. /*
  172. * @param listener: func(id string, data []byte)
  173. */
  174. func OnGlobalEvent(listener func(string, []byte)) {
  175. l.Lock()
  176. defer l.Unlock()
  177. listeners[uuid.New().String()] = listener
  178. }
  179. /*
  180. * @param id: string
  181. * @param data: []byte
  182. */
  183. func Write(id string, data []byte) error {
  184. if v, ok := stdio_holder.Load(id); ok {
  185. if holder, ok := v.(*stdioHolder); ok {
  186. _, err := holder.writer.Write(data)
  187. return err
  188. }
  189. }
  190. return nil
  191. }