// response_test.go
  1. package stream
  2. import (
  3. "errors"
  4. "sync"
  5. "testing"
  6. "time"
  7. )
  8. func TestStreamGenerator(t *testing.T) {
  9. response := NewStream[int](512)
  10. wg := sync.WaitGroup{}
  11. wg.Add(2)
  12. go func() {
  13. for i := 0; i < 10000; i++ {
  14. response.Write(i)
  15. time.Sleep(time.Microsecond)
  16. }
  17. wg.Done()
  18. }()
  19. go func() {
  20. for i := 0; i < 10000; i++ {
  21. response.Write(i)
  22. time.Sleep(time.Microsecond)
  23. }
  24. wg.Done()
  25. }()
  26. go func() {
  27. wg.Wait()
  28. response.Close()
  29. }()
  30. msg := 0
  31. for response.Next() {
  32. _, err := response.Read()
  33. if err != nil {
  34. t.Error(err)
  35. }
  36. msg += 1
  37. }
  38. if msg != 20000 {
  39. t.Errorf("Expected 10000 messages, got %d", msg)
  40. }
  41. }
  42. func TestStreamGeneratorErrorMessage(t *testing.T) {
  43. response := NewStream[int](512)
  44. go func() {
  45. for i := 0; i < 10000; i++ {
  46. response.Write(i)
  47. time.Sleep(time.Microsecond)
  48. }
  49. response.WriteError(errors.New("test error"))
  50. response.Close()
  51. }()
  52. for response.Next() {
  53. _, err := response.Read()
  54. if err != nil {
  55. if err.Error() != "test error" {
  56. t.Error(err)
  57. }
  58. }
  59. }
  60. }
  61. func TestStreamGeneratorWrapper(t *testing.T) {
  62. response := NewStream[int](512)
  63. nums := 0
  64. go func() {
  65. for i := 0; i < 10000; i++ {
  66. response.Write(i)
  67. time.Sleep(time.Microsecond)
  68. }
  69. response.Close()
  70. }()
  71. response.Async(func(t int) {
  72. nums += 1
  73. })
  74. if nums != 10000 {
  75. t.Errorf("Expected 10000 messages, got %d", nums)
  76. }
  77. }