response.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package stream
  2. import (
  3. "errors"
  4. "sync"
  5. "sync/atomic"
  6. "github.com/gammazero/deque"
  7. )
  8. type Stream[T any] struct {
  9. q deque.Deque[T]
  10. l *sync.Mutex
  11. sig chan bool
  12. closed int32
  13. max int
  14. listening bool
  15. onClose []func()
  16. beforeClose []func()
  17. filter []func(T) error
  18. err error
  19. }
  20. func NewStreamResponse[T any](max int) *Stream[T] {
  21. return &Stream[T]{
  22. l: &sync.Mutex{},
  23. sig: make(chan bool),
  24. max: max,
  25. }
  26. }
  27. // Filter filters the stream with a function
  28. // if the function returns an error, the stream will be closed
  29. func (r *Stream[T]) Filter(f func(T) error) {
  30. r.filter = append(r.filter, f)
  31. }
  32. // OnClose adds a function to be called when the stream is closed
  33. func (r *Stream[T]) OnClose(f func()) {
  34. r.onClose = append(r.onClose, f)
  35. }
  36. // BeforeClose adds a function to be called before the stream is closed
  37. func (r *Stream[T]) BeforeClose(f func()) {
  38. r.beforeClose = append(r.beforeClose, f)
  39. }
  40. // Next returns true if there are more data to be read
  41. // and waits for the next data to be available
  42. // returns false if the stream is closed
  43. // NOTE: even if the stream is closed, it will return true if there is data available
  44. func (r *Stream[T]) Next() bool {
  45. r.l.Lock()
  46. if r.closed == 1 && r.q.Len() == 0 && r.err == nil {
  47. r.l.Unlock()
  48. return false
  49. }
  50. if r.q.Len() > 0 || r.err != nil {
  51. r.l.Unlock()
  52. return true
  53. }
  54. r.listening = true
  55. defer func() {
  56. r.listening = false
  57. }()
  58. r.l.Unlock()
  59. return <-r.sig
  60. }
  61. // Read reads buffered data from the stream and
  62. // it returns error only if the buffer is empty or an error is written to the stream
  63. func (r *Stream[T]) Read() (T, error) {
  64. r.l.Lock()
  65. defer r.l.Unlock()
  66. if r.q.Len() > 0 {
  67. data := r.q.PopFront()
  68. for _, f := range r.filter {
  69. err := f(data)
  70. if err != nil {
  71. // close the stream
  72. r.Close()
  73. return data, err
  74. }
  75. }
  76. return data, nil
  77. } else {
  78. var data T
  79. if r.err != nil {
  80. err := r.err
  81. r.err = nil
  82. return data, err
  83. }
  84. return data, errors.New("no data available")
  85. }
  86. }
  87. // Async wraps the stream with a new stream, and allows customized operations
  88. func (r *Stream[T]) Async(fn func(T)) error {
  89. if atomic.LoadInt32(&r.closed) == 1 {
  90. return errors.New("stream is closed")
  91. }
  92. for r.Next() {
  93. data, err := r.Read()
  94. if err != nil {
  95. return err
  96. }
  97. fn(data)
  98. }
  99. return nil
  100. }
  101. // Write writes data to the stream
  102. // returns error if the buffer is full
  103. func (r *Stream[T]) Write(data T) error {
  104. if atomic.LoadInt32(&r.closed) == 1 {
  105. return nil
  106. }
  107. r.l.Lock()
  108. if r.q.Len() >= r.max {
  109. r.l.Unlock()
  110. return errors.New("queue is full")
  111. }
  112. r.q.PushBack(data)
  113. if r.q.Len() == 1 {
  114. if r.listening {
  115. r.sig <- true
  116. }
  117. }
  118. r.l.Unlock()
  119. return nil
  120. }
  121. // Close closes the stream
  122. func (r *Stream[T]) Close() {
  123. if !atomic.CompareAndSwapInt32(&r.closed, 0, 1) {
  124. return
  125. }
  126. for _, f := range r.beforeClose {
  127. f()
  128. }
  129. select {
  130. case r.sig <- false:
  131. default:
  132. }
  133. close(r.sig)
  134. for _, f := range r.onClose {
  135. f()
  136. }
  137. }
  138. func (r *Stream[T]) IsClosed() bool {
  139. return atomic.LoadInt32(&r.closed) == 1
  140. }
  141. func (r *Stream[T]) Size() int {
  142. r.l.Lock()
  143. defer r.l.Unlock()
  144. return r.q.Len()
  145. }
  146. // WriteError writes an error to the stream
  147. func (r *Stream[T]) WriteError(err error) {
  148. r.l.Lock()
  149. defer r.l.Unlock()
  150. r.err = err
  151. if r.q.Len() == 0 {
  152. if r.listening {
  153. r.sig <- true
  154. }
  155. }
  156. }