| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 | package routineimport (	"context"	"runtime/pprof"	"sync"	"sync/atomic"	"time"	"github.com/getsentry/sentry-go"	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"	"github.com/panjf2000/ants")var (	p *ants.Pool	l sync.Mutex)func IsInit() bool {	l.Lock()	defer l.Unlock()	return p != nil}func InitPool(size int, sentryOption ...sentry.ClientOptions) {	l.Lock()	defer l.Unlock()	if p != nil {		return	}	log.Info("init routine pool, size: %d", size)	p, _ = ants.NewPool(size, ants.WithNonblocking(false))	if len(sentryOption) > 0 {		if err := sentry.Init(sentryOption[0]); err != nil {			log.Error("init sentry failed, error: %v", err)		}	}}func Submit(labels map[string]string, f func()) {	if labels == nil {		labels = map[string]string{}	}	p.Submit(func() {		label := []string{			"LaunchedAt", time.Now().Format(time.RFC3339),		}		if len(labels) > 0 {			for k, v := range labels {				label = append(label, k, v)			}		}		pprof.Do(context.Background(), pprof.Labels(label...), func(ctx context.Context) {			defer sentry.Recover()			f()		})	})}func WithMaxRoutine(maxRoutine int, tasks []func(), on_finish ...func()) {	if maxRoutine <= 0 {		maxRoutine = 1	}	if maxRoutine > len(tasks) {		maxRoutine = len(tasks)	}	Submit(map[string]string{		"module":   "routine",		"function": "WithMaxRoutine",	}, func() {		wg := sync.WaitGroup{}		taskIndex := int32(0)		for i := 0; i < maxRoutine; i++ {			wg.Add(1)			Submit(nil, func() {				defer wg.Done()				currentIndex := atomic.AddInt32(&taskIndex, 1)				for currentIndex <= int32(len(tasks)) {					task := tasks[currentIndex-1]					task()					currentIndex = atomic.AddInt32(&taskIndex, 1)				}			})		}		wg.Wait()		if len(on_finish) > 0 {			on_finish[0]()		}	})}type PoolStatus struct {	Free  int `json:"free"`	Busy  int `json:"busy"`	Total int `json:"total"`}func FetchRoutineStatus() *PoolStatus {	return &PoolStatus{		Free:  p.Free(),		Busy:  p.Running(),		Total: p.Cap(),	}}
 |