Browse Source

fix: redis subcribe

Yeuoly 10 months ago
parent
commit
6926e58717
1 changed files with 24 additions and 5 deletions
  1. 24 5
      internal/utils/cache/redis.go

+ 24 - 5
internal/utils/cache/redis.go

@@ -453,20 +453,39 @@ func Publish(channel string, message any, context ...redis.Cmdable) error {
 func Subscribe[T any](channel string) (<-chan T, func()) {
 	pubsub := client.Subscribe(ctx, channel)
 	ch := make(chan T)
+	connection_established := make(chan bool)
 
 	go func() {
 		defer close(ch)
+		defer close(connection_established)
 
-		for msg := range pubsub.Channel() {
-			v, err := parser.UnmarshalJson[T](msg.Payload)
+		alive := true
+		for alive {
+			iface, err := pubsub.Receive(context.Background())
 			if err != nil {
-				continue
+				alive = false
+				break
+			}
+			switch data := iface.(type) {
+			case *redis.Subscription:
+				connection_established <- true
+			case *redis.Message:
+				v, err := parser.UnmarshalJson[T](data.Payload)
+				if err != nil {
+					continue
+				}
+
+				ch <- v
+			case *redis.Pong:
+			default:
+				alive = false
 			}
-
-			ch <- v
 		}
 	}()
 
+	// wait for the connection to be established
+	<-connection_established
+
 	return ch, func() {
 		pubsub.Close()
 	}