pool.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package routine
  2. import (
  3. "context"
  4. "runtime/pprof"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "github.com/getsentry/sentry-go"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  10. "github.com/panjf2000/ants"
  11. )
  12. var (
  13. p *ants.Pool
  14. l sync.Mutex
  15. )
  16. func IsInit() bool {
  17. l.Lock()
  18. defer l.Unlock()
  19. return p != nil
  20. }
  21. func InitPool(size int, sentryOption ...sentry.ClientOptions) {
  22. l.Lock()
  23. defer l.Unlock()
  24. if p != nil {
  25. return
  26. }
  27. log.Info("init routine pool, size: %d", size)
  28. p, _ = ants.NewPool(size, ants.WithNonblocking(false))
  29. if len(sentryOption) > 0 {
  30. if err := sentry.Init(sentryOption[0]); err != nil {
  31. log.Error("init sentry failed, error: %v", err)
  32. }
  33. }
  34. }
  35. func Submit(labels map[string]string, f func()) {
  36. if labels == nil {
  37. labels = map[string]string{}
  38. }
  39. p.Submit(func() {
  40. label := []string{
  41. "LaunchedAt", time.Now().Format(time.RFC3339),
  42. }
  43. if len(labels) > 0 {
  44. for k, v := range labels {
  45. label = append(label, k, v)
  46. }
  47. }
  48. pprof.Do(context.Background(), pprof.Labels(label...), func(ctx context.Context) {
  49. defer sentry.Recover()
  50. f()
  51. })
  52. })
  53. }
  54. func WithMaxRoutine(maxRoutine int, tasks []func(), on_finish ...func()) {
  55. if maxRoutine <= 0 {
  56. maxRoutine = 1
  57. }
  58. if maxRoutine > len(tasks) {
  59. maxRoutine = len(tasks)
  60. }
  61. Submit(map[string]string{
  62. "module": "routine",
  63. "function": "WithMaxRoutine",
  64. }, func() {
  65. wg := sync.WaitGroup{}
  66. taskIndex := int32(0)
  67. for i := 0; i < maxRoutine; i++ {
  68. wg.Add(1)
  69. Submit(nil, func() {
  70. defer wg.Done()
  71. currentIndex := atomic.AddInt32(&taskIndex, 1)
  72. for currentIndex <= int32(len(tasks)) {
  73. task := tasks[currentIndex-1]
  74. task()
  75. currentIndex = atomic.AddInt32(&taskIndex, 1)
  76. }
  77. })
  78. }
  79. wg.Wait()
  80. if len(on_finish) > 0 {
  81. on_finish[0]()
  82. }
  83. })
  84. }
  85. type PoolStatus struct {
  86. Free int `json:"free"`
  87. Busy int `json:"busy"`
  88. Total int `json:"total"`
  89. }
  90. func FetchRoutineStatus() *PoolStatus {
  91. return &PoolStatus{
  92. Free: p.Free(),
  93. Busy: p.Running(),
  94. Total: p.Cap(),
  95. }
  96. }