pool.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package routine
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  6. "github.com/panjf2000/ants"
  7. )
  8. var (
  9. p *ants.Pool
  10. l sync.Mutex
  11. )
  12. func IsInit() bool {
  13. l.Lock()
  14. defer l.Unlock()
  15. return p != nil
  16. }
  17. func InitPool(size int) {
  18. l.Lock()
  19. defer l.Unlock()
  20. if p != nil {
  21. return
  22. }
  23. log.Info("init routine pool, size: %d", size)
  24. p, _ = ants.NewPool(size, ants.WithNonblocking(false))
  25. }
  26. func Submit(f func()) {
  27. p.Submit(f)
  28. }
  29. func WithMaxRoutine(max_routine int, tasks []func(), on_finish ...func()) {
  30. if max_routine <= 0 {
  31. max_routine = 1
  32. }
  33. if max_routine > len(tasks) {
  34. max_routine = len(tasks)
  35. }
  36. Submit(func() {
  37. wg := sync.WaitGroup{}
  38. task_index := int32(0)
  39. for i := 0; i < max_routine; i++ {
  40. wg.Add(1)
  41. Submit(func() {
  42. defer wg.Done()
  43. current_index := atomic.AddInt32(&task_index, 1)
  44. for current_index <= int32(len(tasks)) {
  45. task := tasks[current_index-1]
  46. task()
  47. current_index = atomic.AddInt32(&task_index, 1)
  48. }
  49. })
  50. }
  51. wg.Wait()
  52. if len(on_finish) > 0 {
  53. on_finish[0]()
  54. }
  55. })
  56. }