pool.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package routine
  2. import (
  3. "context"
  4. "runtime/pprof"
  5. "sync"
  6. "sync/atomic"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  8. "github.com/panjf2000/ants"
  9. )
  10. var (
  11. p *ants.Pool
  12. l sync.Mutex
  13. )
  14. func IsInit() bool {
  15. l.Lock()
  16. defer l.Unlock()
  17. return p != nil
  18. }
  19. func InitPool(size int) {
  20. l.Lock()
  21. defer l.Unlock()
  22. if p != nil {
  23. return
  24. }
  25. log.Info("init routine pool, size: %d", size)
  26. p, _ = ants.NewPool(size, ants.WithNonblocking(false))
  27. }
  28. func Submit(labels map[string]string, f func()) {
  29. if labels == nil {
  30. labels = map[string]string{}
  31. }
  32. p.Submit(func() {
  33. label := []string{}
  34. if len(labels) > 0 {
  35. for k, v := range labels {
  36. label = append(label, k, v)
  37. }
  38. }
  39. pprof.Do(context.Background(), pprof.Labels(label...), func(ctx context.Context) {
  40. f()
  41. })
  42. })
  43. }
  44. func WithMaxRoutine(maxRoutine int, tasks []func(), on_finish ...func()) {
  45. if maxRoutine <= 0 {
  46. maxRoutine = 1
  47. }
  48. if maxRoutine > len(tasks) {
  49. maxRoutine = len(tasks)
  50. }
  51. Submit(map[string]string{
  52. "module": "routine",
  53. "function": "WithMaxRoutine",
  54. }, func() {
  55. wg := sync.WaitGroup{}
  56. taskIndex := int32(0)
  57. for i := 0; i < maxRoutine; i++ {
  58. wg.Add(1)
  59. Submit(nil, func() {
  60. defer wg.Done()
  61. currentIndex := atomic.AddInt32(&taskIndex, 1)
  62. for currentIndex <= int32(len(tasks)) {
  63. task := tasks[currentIndex-1]
  64. task()
  65. currentIndex = atomic.AddInt32(&taskIndex, 1)
  66. }
  67. })
  68. }
  69. wg.Wait()
  70. if len(on_finish) > 0 {
  71. on_finish[0]()
  72. }
  73. })
  74. }
  // PoolStatus is a JSON-serialisable snapshot of the routine pool's usage,
  // as produced by FetchRoutineStatus.
  type PoolStatus struct {
  	// Free is the value reported by the pool's Free method.
  	Free int `json:"free"`

  	// Busy is the value reported by the pool's Running method.
  	Busy int `json:"busy"`

  	// Total is the value reported by the pool's Cap method.
  	Total int `json:"total"`
  }
  80. func FetchRoutineStatus() *PoolStatus {
  81. return &PoolStatus{
  82. Free: p.Free(),
  83. Busy: p.Running(),
  84. Total: p.Cap(),
  85. }
  86. }