pool.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package routine
  2. import (
  3. "context"
  4. "runtime/pprof"
  5. "sync"
  6. "sync/atomic"
  7. "github.com/getsentry/sentry-go"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  9. "github.com/panjf2000/ants"
  10. )
  11. var (
  12. p *ants.Pool
  13. l sync.Mutex
  14. )
  15. func IsInit() bool {
  16. l.Lock()
  17. defer l.Unlock()
  18. return p != nil
  19. }
  20. func InitPool(size int, sentryOption ...sentry.ClientOptions) {
  21. l.Lock()
  22. defer l.Unlock()
  23. if p != nil {
  24. return
  25. }
  26. log.Info("init routine pool, size: %d", size)
  27. p, _ = ants.NewPool(size, ants.WithNonblocking(false))
  28. if len(sentryOption) > 0 {
  29. if err := sentry.Init(sentryOption[0]); err != nil {
  30. log.Error("init sentry failed, error: %v", err)
  31. }
  32. }
  33. }
  34. func Submit(labels map[string]string, f func()) {
  35. if labels == nil {
  36. labels = map[string]string{}
  37. }
  38. p.Submit(func() {
  39. label := []string{}
  40. if len(labels) > 0 {
  41. for k, v := range labels {
  42. label = append(label, k, v)
  43. }
  44. }
  45. pprof.Do(context.Background(), pprof.Labels(label...), func(ctx context.Context) {
  46. defer sentry.Recover()
  47. f()
  48. })
  49. })
  50. }
  51. func WithMaxRoutine(maxRoutine int, tasks []func(), on_finish ...func()) {
  52. if maxRoutine <= 0 {
  53. maxRoutine = 1
  54. }
  55. if maxRoutine > len(tasks) {
  56. maxRoutine = len(tasks)
  57. }
  58. Submit(map[string]string{
  59. "module": "routine",
  60. "function": "WithMaxRoutine",
  61. }, func() {
  62. wg := sync.WaitGroup{}
  63. taskIndex := int32(0)
  64. for i := 0; i < maxRoutine; i++ {
  65. wg.Add(1)
  66. Submit(nil, func() {
  67. defer wg.Done()
  68. currentIndex := atomic.AddInt32(&taskIndex, 1)
  69. for currentIndex <= int32(len(tasks)) {
  70. task := tasks[currentIndex-1]
  71. task()
  72. currentIndex = atomic.AddInt32(&taskIndex, 1)
  73. }
  74. })
  75. }
  76. wg.Wait()
  77. if len(on_finish) > 0 {
  78. on_finish[0]()
  79. }
  80. })
  81. }
  // PoolStatus holds the three counters that FetchRoutineStatus reads from the
  // routine pool. It serializes to JSON with the keys "free", "busy" and
  // "total".
  type PoolStatus struct {
  	// Free is filled from the pool's Free() value.
  	Free int `json:"free"`

  	// Busy is filled from the pool's Running() value.
  	Busy int `json:"busy"`

  	// Total is filled from the pool's Cap() value.
  	Total int `json:"total"`
  }
  87. func FetchRoutineStatus() *PoolStatus {
  88. return &PoolStatus{
  89. Free: p.Free(),
  90. Busy: p.Running(),
  91. Total: p.Cap(),
  92. }
  93. }