123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- package stream
- import (
- "errors"
- "sync"
- "sync/atomic"
- "github.com/gammazero/deque"
- )
- type Stream[T any] struct {
- q deque.Deque[T]
- l *sync.Mutex
- sig chan bool
- closed int32
- max int
- listening bool
- onClose func()
- err error
- }
- func NewStreamResponse[T any](max int) *Stream[T] {
- return &Stream[T]{
- l: &sync.Mutex{},
- sig: make(chan bool),
- max: max,
- }
- }
- func (r *Stream[T]) OnClose(f func()) {
- r.onClose = f
- }
- // Next returns true if there are more data to be read
- // and waits for the next data to be available
- // returns false if the stream is closed
- // NOTE: even if the stream is closed, it will return true if there is data available
- func (r *Stream[T]) Next() bool {
- r.l.Lock()
- if r.closed == 1 && r.q.Len() == 0 && r.err == nil {
- r.l.Unlock()
- return false
- }
- if r.q.Len() > 0 || r.err != nil {
- r.l.Unlock()
- return true
- }
- r.listening = true
- defer func() {
- r.listening = false
- }()
- r.l.Unlock()
- return <-r.sig
- }
- // Read reads buffered data from the stream and
- // it returns error only if the buffer is empty or an error is written to the stream
- func (r *Stream[T]) Read() (T, error) {
- r.l.Lock()
- defer r.l.Unlock()
- if r.q.Len() > 0 {
- data := r.q.PopFront()
- return data, nil
- } else {
- var data T
- if r.err != nil {
- err := r.err
- r.err = nil
- return data, err
- }
- return data, errors.New("no data available")
- }
- }
- // Wrap wraps the stream with a new stream, and allows customized operations
- func (r *Stream[T]) Wrap(fn func(T)) error {
- if atomic.LoadInt32(&r.closed) == 1 {
- return errors.New("stream is closed")
- }
- for r.Next() {
- data, err := r.Read()
- if err != nil {
- return err
- }
- fn(data)
- }
- return nil
- }
- // Write writes data to the stream
- // returns error if the buffer is full
- func (r *Stream[T]) Write(data T) error {
- if atomic.LoadInt32(&r.closed) == 1 {
- return nil
- }
- r.l.Lock()
- if r.q.Len() >= r.max {
- r.l.Unlock()
- return errors.New("queue is full")
- }
- r.q.PushBack(data)
- if r.q.Len() == 1 {
- if r.listening {
- r.sig <- true
- }
- }
- r.l.Unlock()
- return nil
- }
- // Close closes the stream
- func (r *Stream[T]) Close() {
- if !atomic.CompareAndSwapInt32(&r.closed, 0, 1) {
- return
- }
- select {
- case r.sig <- false:
- default:
- }
- close(r.sig)
- if r.onClose != nil {
- r.onClose()
- }
- }
- func (r *Stream[T]) IsClosed() bool {
- return atomic.LoadInt32(&r.closed) == 1
- }
- func (r *Stream[T]) Size() int {
- r.l.Lock()
- defer r.l.Unlock()
- return r.q.Len()
- }
- // WriteError writes an error to the stream
- func (r *Stream[T]) WriteError(err error) {
- r.l.Lock()
- defer r.l.Unlock()
- r.err = err
- if r.q.Len() == 0 {
- if r.listening {
- r.sig <- true
- }
- }
- }
|