123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- package routine
- import (
- "context"
- "runtime/pprof"
- "sync"
- "sync/atomic"
- "github.com/getsentry/sentry-go"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
- "github.com/panjf2000/ants"
- )
- var (
- p *ants.Pool
- l sync.Mutex
- )
- func IsInit() bool {
- l.Lock()
- defer l.Unlock()
- return p != nil
- }
- func InitPool(size int, sentryOption ...sentry.ClientOptions) {
- l.Lock()
- defer l.Unlock()
- if p != nil {
- return
- }
- log.Info("init routine pool, size: %d", size)
- p, _ = ants.NewPool(size, ants.WithNonblocking(false))
- if len(sentryOption) > 0 {
- if err := sentry.Init(sentryOption[0]); err != nil {
- log.Error("init sentry failed, error: %v", err)
- }
- }
- }
- func Submit(labels map[string]string, f func()) {
- if labels == nil {
- labels = map[string]string{}
- }
- p.Submit(func() {
- label := []string{}
- if len(labels) > 0 {
- for k, v := range labels {
- label = append(label, k, v)
- }
- }
- pprof.Do(context.Background(), pprof.Labels(label...), func(ctx context.Context) {
- defer sentry.Recover()
- f()
- })
- })
- }
- func WithMaxRoutine(maxRoutine int, tasks []func(), on_finish ...func()) {
- if maxRoutine <= 0 {
- maxRoutine = 1
- }
- if maxRoutine > len(tasks) {
- maxRoutine = len(tasks)
- }
- Submit(map[string]string{
- "module": "routine",
- "function": "WithMaxRoutine",
- }, func() {
- wg := sync.WaitGroup{}
- taskIndex := int32(0)
- for i := 0; i < maxRoutine; i++ {
- wg.Add(1)
- Submit(nil, func() {
- defer wg.Done()
- currentIndex := atomic.AddInt32(&taskIndex, 1)
- for currentIndex <= int32(len(tasks)) {
- task := tasks[currentIndex-1]
- task()
- currentIndex = atomic.AddInt32(&taskIndex, 1)
- }
- })
- }
- wg.Wait()
- if len(on_finish) > 0 {
- on_finish[0]()
- }
- })
- }
// PoolStatus is a JSON-serializable snapshot of the routine pool counters,
// as filled in by FetchRoutineStatus.
type PoolStatus struct {
	// Free is the value reported by the pool's Free method.
	Free int `json:"free"`
	// Busy is the value reported by the pool's Running method.
	Busy int `json:"busy"`
	// Total is the value reported by the pool's Cap method.
	Total int `json:"total"`
}
- func FetchRoutineStatus() *PoolStatus {
- return &PoolStatus{
- Free: p.Free(),
- Busy: p.Running(),
- Total: p.Cap(),
- }
- }
|