response.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package stream
  2. import (
  3. "errors"
  4. "sync"
  5. "sync/atomic"
  6. "github.com/gammazero/deque"
  7. )
  8. type Stream[T any] struct {
  9. q deque.Deque[T]
  10. l *sync.Mutex
  11. sig chan bool
  12. closed int32
  13. max int
  14. listening bool
  15. onClose []func()
  16. beforeClose []func()
  17. filter []func(T) error
  18. err error
  19. }
  20. func NewStream[T any](max int) *Stream[T] {
  21. return &Stream[T]{
  22. l: &sync.Mutex{},
  23. sig: make(chan bool),
  24. max: max,
  25. }
  26. }
  27. // Filter filters the stream with a function
  28. // if the function returns an error, the stream will be closed
  29. func (r *Stream[T]) Filter(f func(T) error) {
  30. r.filter = append(r.filter, f)
  31. }
  32. // OnClose adds a function to be called when the stream is closed
  33. func (r *Stream[T]) OnClose(f func()) {
  34. r.onClose = append(r.onClose, f)
  35. }
  36. // BeforeClose adds a function to be called before the stream is closed
  37. func (r *Stream[T]) BeforeClose(f func()) {
  38. r.beforeClose = append(r.beforeClose, f)
  39. }
  40. // Next returns true if there are more data to be read
  41. // and waits for the next data to be available
  42. // returns false if the stream is closed
  43. // NOTE: even if the stream is closed, it will return true if there is data available
  44. func (r *Stream[T]) Next() bool {
  45. r.l.Lock()
  46. if r.closed == 1 && r.q.Len() == 0 && r.err == nil {
  47. r.l.Unlock()
  48. return false
  49. }
  50. if r.q.Len() > 0 || r.err != nil {
  51. r.l.Unlock()
  52. return true
  53. }
  54. r.listening = true
  55. defer func() {
  56. r.listening = false
  57. }()
  58. r.l.Unlock()
  59. return <-r.sig
  60. }
  61. // Read reads buffered data from the stream and
  62. // it returns error only if the buffer is empty or an error is written to the stream
  63. func (r *Stream[T]) Read() (T, error) {
  64. r.l.Lock()
  65. defer r.l.Unlock()
  66. if r.q.Len() > 0 {
  67. data := r.q.PopFront()
  68. for _, f := range r.filter {
  69. err := f(data)
  70. if err != nil {
  71. // close the stream
  72. r.Close()
  73. return data, err
  74. }
  75. }
  76. return data, nil
  77. } else {
  78. var data T
  79. if r.err != nil {
  80. err := r.err
  81. r.err = nil
  82. return data, err
  83. }
  84. return data, errors.New("no data available")
  85. }
  86. }
  87. // Async wraps the stream with a new stream, and allows customized operations
  88. func (r *Stream[T]) Async(fn func(T)) error {
  89. for r.Next() {
  90. data, err := r.Read()
  91. if err != nil {
  92. return err
  93. }
  94. fn(data)
  95. }
  96. return nil
  97. }
  98. // Write writes data to the stream,
  99. // returns error if the buffer is full
  100. func (r *Stream[T]) Write(data T) error {
  101. if atomic.LoadInt32(&r.closed) == 1 {
  102. return nil
  103. }
  104. r.l.Lock()
  105. if r.q.Len() >= r.max {
  106. r.l.Unlock()
  107. return errors.New("queue is full")
  108. }
  109. r.q.PushBack(data)
  110. if r.q.Len() == 1 {
  111. if r.listening {
  112. r.sig <- true
  113. }
  114. }
  115. r.l.Unlock()
  116. return nil
  117. }
  118. // Close closes the stream
  119. func (r *Stream[T]) Close() {
  120. if !atomic.CompareAndSwapInt32(&r.closed, 0, 1) {
  121. return
  122. }
  123. for _, f := range r.beforeClose {
  124. f()
  125. }
  126. select {
  127. case r.sig <- false:
  128. default:
  129. }
  130. close(r.sig)
  131. for _, f := range r.onClose {
  132. f()
  133. }
  134. }
  135. func (r *Stream[T]) IsClosed() bool {
  136. return atomic.LoadInt32(&r.closed) == 1
  137. }
  138. func (r *Stream[T]) Size() int {
  139. r.l.Lock()
  140. defer r.l.Unlock()
  141. return r.q.Len()
  142. }
  143. // WriteError writes an error to the stream
  144. func (r *Stream[T]) WriteError(err error) {
  145. r.l.Lock()
  146. defer r.l.Unlock()
  147. r.err = err
  148. if r.q.Len() == 0 {
  149. if r.listening {
  150. r.sig <- true
  151. }
  152. }
  153. }