| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 | package serviceimport (	"errors"	"sync/atomic"	"time"	"github.com/gin-gonic/gin"	"github.com/langgenius/dify-plugin-daemon/internal/types/exception"	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"	"github.com/langgenius/dify-plugin-daemon/pkg/entities")// baseSSEService is a helper function to handle SSE service// it accepts a generator function that returns a stream response to gin contextfunc baseSSEService[R any](	generator func() (*stream.Stream[R], error),	ctx *gin.Context,	max_timeout_seconds int,) {	writer := ctx.Writer	writer.WriteHeader(200)	writer.Header().Set("Content-Type", "text/event-stream")	done := make(chan bool)	doneClosed := new(int32)	closed := new(int32)	writeData := func(data interface{}) {		if atomic.LoadInt32(closed) == 1 {			return		}		writer.Write([]byte("data: "))		writer.Write(parser.MarshalJsonBytes(data))		writer.Write([]byte("\n\n"))		writer.Flush()	}	pluginDaemonResponse, err := generator()	if err != nil {		writeData(exception.InternalServerError(err).ToResponse())		close(done)		return	}	routine.Submit(map[string]string{		"module":   "service",		"function": "baseSSEService",	}, func() {		for pluginDaemonResponse.Next() {			chunk, err := pluginDaemonResponse.Read()			if err != nil {				writeData(exception.InvokePluginError(err).ToResponse())				break			}			writeData(entities.NewSuccessResponse(chunk))		}		if atomic.CompareAndSwapInt32(doneClosed, 0, 1) {			close(done)		}	})	timer := time.NewTimer(time.Duration(max_timeout_seconds) * time.Second)	defer timer.Stop()	defer func() {		atomic.StoreInt32(closed, 1)	}()	select {	case <-writer.CloseNotify():		pluginDaemonResponse.Close()		return	case <-done:		return	case <-timer.C:		writeData(exception.InternalServerError(errors.New("killed by timeout")).ToResponse())		if atomic.CompareAndSwapInt32(doneClosed, 0, 1) {			close(done)		}		return	}}
 |