io.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package stdio_holder
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "github.com/google/uuid"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  11. )
  12. var (
  13. stdio_holder sync.Map = sync.Map{}
  14. l *sync.Mutex = &sync.Mutex{}
  15. listeners map[string]func(string, []byte) = map[string]func(string, []byte){}
  16. )
  17. type stdioHolder struct {
  18. id string
  19. pluginIdentity string
  20. writer io.WriteCloser
  21. reader io.ReadCloser
  22. errReader io.ReadCloser
  23. l *sync.Mutex
  24. listener map[string]func([]byte)
  25. started bool
  26. alive bool
  27. }
  28. func (s *stdioHolder) Stop() {
  29. s.alive = false
  30. s.writer.Close()
  31. s.reader.Close()
  32. s.errReader.Close()
  33. stdio_holder.Delete(s.id)
  34. }
  35. func (s *stdioHolder) StartStdout() {
  36. s.started = true
  37. s.alive = true
  38. scanner := bufio.NewScanner(s.reader)
  39. for s.alive {
  40. for scanner.Scan() {
  41. data := scanner.Bytes()
  42. event, err := parser.UnmarshalJsonBytes[plugin_entities.PluginUniversalEvent](data)
  43. if err != nil {
  44. log.Error("unmarshal json failed: %s", err.Error())
  45. continue
  46. }
  47. session_id := event.SessionId
  48. switch event.Event {
  49. case plugin_entities.PLUGIN_EVENT_LOG:
  50. if event.Event == plugin_entities.PLUGIN_EVENT_LOG {
  51. logEvent, err := parser.UnmarshalJsonBytes[plugin_entities.PluginLogEvent](event.Data)
  52. if err != nil {
  53. log.Error("unmarshal json failed: %s", err.Error())
  54. continue
  55. }
  56. log.Info("plugin %s: %s", s.pluginIdentity, logEvent.Message)
  57. }
  58. case plugin_entities.PLUGIN_EVENT_SESSION:
  59. for _, listener := range listeners {
  60. listener(s.id, event.Data)
  61. }
  62. for listener_session_id, listener := range s.listener {
  63. if listener_session_id == session_id {
  64. listener(event.Data)
  65. }
  66. }
  67. case plugin_entities.PLUGIN_EVENT_ERROR:
  68. log.Error("plugin %s: %s", s.pluginIdentity, event.Data)
  69. }
  70. }
  71. }
  72. }
  73. /*
  74. * @return error
  75. */
  76. func (s *stdioHolder) StartStderr() error {
  77. s.started = true
  78. s.alive = true
  79. defer s.Stop()
  80. for s.alive {
  81. buf := make([]byte, 1024)
  82. n, err := s.errReader.Read(buf)
  83. if err != nil && err != io.EOF {
  84. return err
  85. } else if err != nil {
  86. return nil
  87. }
  88. if n > 0 {
  89. return fmt.Errorf("stderr: %s", buf[:n])
  90. }
  91. }
  92. return nil
  93. }
  94. func (s *stdioHolder) GetID() string {
  95. return s.id
  96. }
  97. /*
  98. * @param plugin_identity: string
  99. * @param writer: io.WriteCloser
  100. * @param reader: io.ReadCloser
  101. * @param errReader: io.ReadCloser
  102. */
  103. func Put(
  104. plugin_identity string,
  105. writer io.WriteCloser,
  106. reader io.ReadCloser,
  107. errReader io.ReadCloser,
  108. ) *stdioHolder {
  109. id := uuid.New().String()
  110. holder := &stdioHolder{
  111. pluginIdentity: plugin_identity,
  112. writer: writer,
  113. reader: reader,
  114. errReader: errReader,
  115. id: id,
  116. l: &sync.Mutex{},
  117. }
  118. stdio_holder.Store(id, holder)
  119. return holder
  120. }
  121. /*
  122. * @param id: string
  123. */
  124. func Get(id string) *stdioHolder {
  125. if v, ok := stdio_holder.Load(id); ok {
  126. if holder, ok := v.(*stdioHolder); ok {
  127. return holder
  128. }
  129. }
  130. return nil
  131. }
  132. /*
  133. * @param id: string
  134. */
  135. func Remove(id string) {
  136. stdio_holder.Delete(id)
  137. }
  138. /*
  139. * @param id: string
  140. * @param session_id: string
  141. * @param listener: func(data []byte)
  142. * @return string - listener identity
  143. */
  144. func OnEvent(id string, session_id string, listener func([]byte)) {
  145. if v, ok := stdio_holder.Load(id); ok {
  146. if holder, ok := v.(*stdioHolder); ok {
  147. holder.l.Lock()
  148. defer holder.l.Unlock()
  149. if holder.listener == nil {
  150. holder.listener = map[string]func([]byte){}
  151. }
  152. holder.listener[session_id] = listener
  153. }
  154. }
  155. }
  156. /*
  157. * @param id: string
  158. * @param listener: string
  159. */
  160. func RemoveListener(id string, listener string) {
  161. if v, ok := stdio_holder.Load(id); ok {
  162. if holder, ok := v.(*stdioHolder); ok {
  163. holder.l.Lock()
  164. defer holder.l.Unlock()
  165. delete(holder.listener, listener)
  166. }
  167. }
  168. }
  169. /*
  170. * @param listener: func(id string, data []byte)
  171. */
  172. func OnGlobalEvent(listener func(string, []byte)) {
  173. l.Lock()
  174. defer l.Unlock()
  175. listeners[uuid.New().String()] = listener
  176. }
  177. /*
  178. * @param id: string
  179. * @param data: []byte
  180. */
  181. func Write(id string, data []byte) error {
  182. if v, ok := stdio_holder.Load(id); ok {
  183. if holder, ok := v.(*stdioHolder); ok {
  184. _, err := holder.writer.Write(data)
  185. return err
  186. }
  187. }
  188. return nil
  189. }