Browse Source

refactor: io listener to boardcast

Yeuoly 11 months ago
parent
commit
0ba5c28ba9

+ 1 - 1
internal/core/plugin_daemon/generic.go

@@ -28,7 +28,7 @@ func genericInvokePlugin[Req any, Rsp any](
 	response := stream.NewStreamResponse[Rsp](response_buffer_size)
 
 	listener := runtime.Listen(session.ID())
-	listener.AddListener(func(message []byte) {
+	listener.Listen(func(message []byte) {
 		chunk, err := parser.UnmarshalJsonBytes[plugin_entities.SessionMessage](message)
 		if err != nil {
 			log.Error("unmarshal json failed: %s", err.Error())

+ 1 - 1
internal/core/plugin_manager/aws_manager/packager_test.go

@@ -47,7 +47,7 @@ func (r *TPluginRuntime) Wait() (<-chan bool, error) {
 	return nil, nil
 }
 
-func (r *TPluginRuntime) Listen(string) *entities.IOListener[[]byte] {
+func (r *TPluginRuntime) Listen(string) *entities.Broadcast[[]byte] {
 	return nil
 }
 

+ 1 - 1
internal/core/plugin_manager/local_manager/io.go

@@ -10,7 +10,7 @@ func (r *LocalPluginRuntime) Listen(session_id string) *entities.BytesIOListener
 		RemoveStdioListener(r.io_identity, session_id)
 	})
 	OnStdioEvent(r.io_identity, session_id, func(b []byte) {
-		listener.Emit(b)
+		listener.Send(b)
 	})
 	return listener
 }

+ 1 - 1
internal/core/plugin_manager/remote_manager/io.go

@@ -12,7 +12,7 @@ func (r *RemotePluginRuntime) Listen(session_id string) *entities.BytesIOListene
 	})
 
 	r.addCallback(session_id, func(data []byte) {
-		listener.Emit(data)
+		listener.Send(data)
 	})
 
 	return listener

+ 8 - 8
internal/types/entities/listener.go

@@ -2,35 +2,35 @@ package entities
 
 import "sync"
 
-type IOListener[T any] struct {
+type Broadcast[T any] struct {
 	l        *sync.RWMutex
 	onClose  func()
 	listener []func(T)
 }
 
-type BytesIOListener = IOListener[[]byte]
+type BytesIOListener = Broadcast[[]byte]
 
-func NewIOListener[T any]() *IOListener[T] {
-	return &IOListener[T]{
+func NewIOListener[T any]() *Broadcast[T] {
+	return &Broadcast[T]{
 		l: &sync.RWMutex{},
 	}
 }
 
-func (r *IOListener[T]) AddListener(f func(T)) {
+func (r *Broadcast[T]) Listen(f func(T)) {
 	r.l.Lock()
 	defer r.l.Unlock()
 	r.listener = append(r.listener, f)
 }
 
-func (r *IOListener[T]) OnClose(f func()) {
+func (r *Broadcast[T]) OnClose(f func()) {
 	r.onClose = f
 }
 
-func (r *IOListener[T]) Close() {
+func (r *Broadcast[T]) Close() {
 	r.onClose()
 }
 
-func (r *IOListener[T]) Emit(data T) {
+func (r *Broadcast[T]) Send(data T) {
 	r.l.RLock()
 	defer r.l.RUnlock()
 	for _, listener := range r.listener {