123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- package stream
- import (
- "errors"
- "sync"
- "sync/atomic"
- "github.com/gammazero/deque"
- )
- type Stream[T any] struct {
- q deque.Deque[T]
- l *sync.Mutex
- sig chan bool
- closed int32
- max int
- listening bool
- onClose []func()
- beforeClose []func()
- filter []func(T) error
- err error
- }
- func NewStream[T any](max int) *Stream[T] {
- return &Stream[T]{
- l: &sync.Mutex{},
- sig: make(chan bool),
- max: max,
- }
- }
- // Filter filters the stream with a function
- // if the function returns an error, the stream will be closed
- func (r *Stream[T]) Filter(f func(T) error) {
- r.filter = append(r.filter, f)
- }
- // OnClose adds a function to be called when the stream is closed
- func (r *Stream[T]) OnClose(f func()) {
- r.onClose = append(r.onClose, f)
- }
- // BeforeClose adds a function to be called before the stream is closed
- func (r *Stream[T]) BeforeClose(f func()) {
- r.beforeClose = append(r.beforeClose, f)
- }
- // Next returns true if there are more data to be read
- // and waits for the next data to be available
- // returns false if the stream is closed
- // NOTE: even if the stream is closed, it will return true if there is data available
- func (r *Stream[T]) Next() bool {
- r.l.Lock()
- if r.closed == 1 && r.q.Len() == 0 && r.err == nil {
- r.l.Unlock()
- return false
- }
- if r.q.Len() > 0 || r.err != nil {
- r.l.Unlock()
- return true
- }
- r.listening = true
- defer func() {
- r.listening = false
- }()
- r.l.Unlock()
- return <-r.sig
- }
- // Read reads buffered data from the stream and
- // it returns error only if the buffer is empty or an error is written to the stream
- func (r *Stream[T]) Read() (T, error) {
- r.l.Lock()
- defer r.l.Unlock()
- if r.q.Len() > 0 {
- data := r.q.PopFront()
- for _, f := range r.filter {
- err := f(data)
- if err != nil {
- // close the stream
- r.Close()
- return data, err
- }
- }
- return data, nil
- } else {
- var data T
- if r.err != nil {
- err := r.err
- r.err = nil
- return data, err
- }
- return data, errors.New("no data available")
- }
- }
- // Async wraps the stream with a new stream, and allows customized operations
- func (r *Stream[T]) Async(fn func(T)) error {
- for r.Next() {
- data, err := r.Read()
- if err != nil {
- return err
- }
- fn(data)
- }
- return nil
- }
- // Write writes data to the stream,
- // returns error if the buffer is full
- func (r *Stream[T]) Write(data T) error {
- if atomic.LoadInt32(&r.closed) == 1 {
- return nil
- }
- r.l.Lock()
- if r.q.Len() >= r.max {
- r.l.Unlock()
- return errors.New("queue is full")
- }
- r.q.PushBack(data)
- if r.q.Len() == 1 {
- if r.listening {
- r.sig <- true
- }
- }
- r.l.Unlock()
- return nil
- }
- // Close closes the stream
- func (r *Stream[T]) Close() {
- if !atomic.CompareAndSwapInt32(&r.closed, 0, 1) {
- return
- }
- for _, f := range r.beforeClose {
- f()
- }
- select {
- case r.sig <- false:
- default:
- }
- close(r.sig)
- for _, f := range r.onClose {
- f()
- }
- }
- func (r *Stream[T]) IsClosed() bool {
- return atomic.LoadInt32(&r.closed) == 1
- }
- func (r *Stream[T]) Size() int {
- r.l.Lock()
- defer r.l.Unlock()
- return r.q.Len()
- }
- // WriteError writes an error to the stream
- func (r *Stream[T]) WriteError(err error) {
- r.l.Lock()
- defer r.l.Unlock()
- r.err = err
- if r.q.Len() == 0 {
- if r.listening {
- r.sig <- true
- }
- }
- }
|