pool.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package routine
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  6. "github.com/panjf2000/ants"
  7. )
  8. var (
  9. p *ants.Pool
  10. l sync.Mutex
  11. )
  12. func IsInit() bool {
  13. l.Lock()
  14. defer l.Unlock()
  15. return p != nil
  16. }
  17. func InitPool(size int) {
  18. l.Lock()
  19. defer l.Unlock()
  20. if p != nil {
  21. return
  22. }
  23. log.Info("init routine pool, size: %d", size)
  24. p, _ = ants.NewPool(size, ants.WithNonblocking(false))
  25. }
  26. func Submit(f func()) {
  27. p.Submit(f)
  28. }
  29. func WithMaxRoutine(max_routine int, tasks []func(), on_finish ...func()) {
  30. if max_routine <= 0 {
  31. max_routine = 1
  32. }
  33. if max_routine > len(tasks) {
  34. max_routine = len(tasks)
  35. }
  36. Submit(func() {
  37. wg := sync.WaitGroup{}
  38. task_index := int32(0)
  39. for i := 0; i < max_routine; i++ {
  40. wg.Add(1)
  41. Submit(func() {
  42. defer wg.Done()
  43. current_index := atomic.AddInt32(&task_index, 1)
  44. for current_index <= int32(len(tasks)) {
  45. task := tasks[current_index-1]
  46. task()
  47. current_index = atomic.AddInt32(&task_index, 1)
  48. }
  49. })
  50. }
  51. wg.Wait()
  52. if len(on_finish) > 0 {
  53. on_finish[0]()
  54. }
  55. })
  56. }
  57. type PoolStatus struct {
  58. Free int `json:"free"`
  59. Busy int `json:"busy"`
  60. Total int `json:"total"`
  61. }
  62. func FetchRoutineStatus() *PoolStatus {
  63. return &PoolStatus{
  64. Free: p.Free(),
  65. Busy: p.Running(),
  66. Total: p.Cap(),
  67. }
  68. }