response_test.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package stream
  2. import (
  3. "errors"
  4. "testing"
  5. "time"
  6. )
  7. func TestStreamGenerator(t *testing.T) {
  8. response := NewStreamResponse[int](512)
  9. go func() {
  10. for i := 0; i < 10000; i++ {
  11. response.Write(i)
  12. time.Sleep(time.Microsecond)
  13. }
  14. response.Close()
  15. }()
  16. msg := 0
  17. for response.Next() {
  18. _, err := response.Read()
  19. if err != nil {
  20. t.Error(err)
  21. }
  22. msg += 1
  23. }
  24. if msg != 10000 {
  25. t.Errorf("Expected 10000 messages, got %d", msg)
  26. }
  27. }
  28. func TestStreamGeneratorErrorMessage(t *testing.T) {
  29. response := NewStreamResponse[int](512)
  30. go func() {
  31. for i := 0; i < 10000; i++ {
  32. response.Write(i)
  33. time.Sleep(time.Microsecond)
  34. }
  35. response.WriteError(errors.New("test error"))
  36. response.Close()
  37. }()
  38. for response.Next() {
  39. _, err := response.Read()
  40. if err != nil {
  41. if err.Error() != "test error" {
  42. t.Error(err)
  43. }
  44. }
  45. }
  46. }
  47. func TestStreamGeneratorWrapper(t *testing.T) {
  48. response := NewStreamResponse[int](512)
  49. nums := 0
  50. go func() {
  51. for i := 0; i < 10000; i++ {
  52. response.Write(i)
  53. time.Sleep(time.Microsecond)
  54. }
  55. response.Close()
  56. }()
  57. response.Wrap(func(t int) {
  58. nums += 1
  59. })
  60. if nums != 10000 {
  61. t.Errorf("Expected 10000 messages, got %d", nums)
  62. }
  63. }