Yeuoly 1 рік тому
батько
коміт
0a23ebd3e1

+ 6 - 1
internal/cluster/cluster.go

@@ -5,6 +5,7 @@ import (
 	"sync/atomic"
 
 	"github.com/google/uuid"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/mapping"
 )
@@ -23,6 +24,8 @@ type Cluster struct {
 	plugins     mapping.Map[string, *pluginLifeTime]
 	plugin_lock sync.RWMutex
 
+	manager *plugin_manager.PluginManager
+
 	// nodes stores all the nodes of the cluster
 	nodes mapping.Map[string, node]
 
@@ -46,12 +49,14 @@ type Cluster struct {
 	notify_cluster_stopped_chan           chan bool
 }
 
-func NewCluster(config *app.Config) *Cluster {
+func NewCluster(config *app.Config, plugin_manager *plugin_manager.PluginManager) *Cluster {
 	return &Cluster{
 		id:        uuid.New().String(),
 		port:      uint16(config.ServerPort),
 		stop_chan: make(chan bool),
 
+		manager: plugin_manager,
+
 		notify_become_master_chan:             make(chan bool),
 		notify_master_gc_chan:                 make(chan bool),
 		notify_master_gc_completed_chan:       make(chan bool),

+ 1 - 1
internal/cluster/clutser_test.go

@@ -20,7 +20,7 @@ func createSimulationCluster(nums int) ([]*Cluster, error) {
 	for i := 0; i < nums; i++ {
 		result = append(result, NewCluster(&app.Config{
 			ServerPort: 12121,
-		}))
+		}, nil))
 	}
 
 	log.SetShowLog(false)

+ 7 - 0
internal/cluster/plugin.go

@@ -243,5 +243,12 @@ func (c *Cluster) autoGCPlugins() error {
 
 func (c *Cluster) IsPluginNoCurrentNode(identity plugin_entities.PluginUniqueIdentifier) bool {
 	_, ok := c.plugins.Load(identity.String())
+	if !ok {
+		if c.manager.Get(identity) == nil {
+			return false
+		} else {
+			return true
+		}
+	}
 	return ok
 }

+ 1 - 0
internal/core/plugin_daemon/backwards_invocation/task.go

@@ -15,6 +15,7 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 )
 
+// returns error only if payload is not correct
 func InvokeDify(
 	declaration *plugin_entities.PluginDeclaration,
 	invoke_from access_types.PluginAccessType,

+ 56 - 32
internal/core/plugin_daemon/backwards_invocation/transaction/aws_event_handler.go

@@ -25,9 +25,19 @@ func NewAWSTransactionHandler(max_timeout time.Duration) *AWSTransactionHandler
 }
 
 type awsTransactionWriteCloser struct {
-	gin.ResponseWriter
 	done   chan bool
 	closed int32
+
+	writer func([]byte) (int, error)
+	flush  func()
+}
+
+func (a *awsTransactionWriteCloser) Write(data []byte) (int, error) {
+	return a.writer(data)
+}
+
+func (a *awsTransactionWriteCloser) Flush() {
+	a.flush()
 }
 
 func (w *awsTransactionWriteCloser) Close() error {
@@ -42,50 +52,64 @@ func (h *AWSTransactionHandler) Handle(
 	session_id string,
 ) {
 	writer := &awsTransactionWriteCloser{
-		ResponseWriter: ctx.Writer,
-		done:           make(chan bool),
+		writer: ctx.Writer.Write,
+		flush:  ctx.Writer.Flush,
+		done:   make(chan bool),
 	}
 
 	body := ctx.Request.Body
 	// read at most 6MB
 	bytes, err := io.ReadAll(io.LimitReader(body, 6*1024*1024))
 	if err != nil {
-		writer.WriteHeader(http.StatusBadRequest)
-		writer.Write([]byte(err.Error()))
+		ctx.Writer.WriteHeader(http.StatusBadRequest)
+		ctx.Writer.Write([]byte(err.Error()))
 		return
 	}
 
-	writer.WriteHeader(http.StatusOK)
-	writer.Header().Set("Content-Type", "text/event-stream")
+	ctx.Writer.WriteHeader(http.StatusOK)
+	ctx.Writer.Header().Set("Content-Type", "text/event-stream")
 
-	// parse the data
-	data, err := parser.UnmarshalJsonBytes[plugin_entities.SessionMessage](bytes)
-	if err != nil {
-		log.Error("unmarshal json failed: %s, failed to parse session message", err.Error())
-		writer.WriteHeader(http.StatusBadRequest)
-		writer.Write([]byte(err.Error()))
-		return
-	}
+	plugin_entities.ParsePluginUniversalEvent(
+		bytes,
+		func(session_id string, data []byte) {
+			// parse the data
+			session_message, err := parser.UnmarshalJsonBytes[plugin_entities.SessionMessage](data)
+			if err != nil {
+				ctx.Writer.WriteHeader(http.StatusBadRequest)
+				ctx.Writer.Write([]byte(err.Error()))
+				writer.Close()
+				return
+			}
 
-	session := session_manager.GetSession(session_id)
-	if session == nil {
-		log.Error("session not found: %s", session_id)
-		writer.WriteHeader(http.StatusInternalServerError)
-		writer.Write([]byte("session not found"))
-		return
-	}
+			session := session_manager.GetSession(session_id)
+			if session == nil {
+				log.Error("session not found: %s", session_id)
+				ctx.Writer.WriteHeader(http.StatusInternalServerError)
+				ctx.Writer.Write([]byte("session not found"))
+				writer.Close()
+				return
+			}
 
-	aws_response_writer := NewAWSTransactionWriter(session, writer)
+			aws_response_writer := NewAWSTransactionWriter(session, writer)
 
-	if err := backwards_invocation.InvokeDify(
-		session.Declaration,
-		session.InvokeFrom,
-		session,
-		aws_response_writer,
-		data.Data,
-	); err != nil {
-		log.Error("invoke dify failed: %s", err.Error())
-	}
+			if err := backwards_invocation.InvokeDify(
+				session.Declaration,
+				session.InvokeFrom,
+				session,
+				aws_response_writer,
+				session_message.Data,
+			); err != nil {
+				ctx.Writer.WriteHeader(http.StatusInternalServerError)
+				ctx.Writer.Write([]byte("failed to parse request"))
+				writer.Close()
+			}
+		},
+		func() {},
+		func(err string) {
+			log.Warn("invoke dify failed, received errors: %s", err)
+		},
+		func(message string) {}, //log
+	)
 
 	select {
 	case <-writer.done:

+ 17 - 8
internal/core/plugin_daemon/backwards_invocation/transaction/aws_event_writer.go

@@ -7,11 +7,17 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 )
 
+type WriteFlushCloser interface {
+	io.WriteCloser
+
+	Flush()
+}
+
 // AWSTransactionWriter is a writer that implements the backwards_invocation.BackwardsInvocationWriter interface
 // it is used to write data to the plugin runtime
 type AWSTransactionWriter struct {
-	session     *session_manager.Session
-	writeCloser io.WriteCloser
+	session          *session_manager.Session
+	writeFlushCloser WriteFlushCloser
 
 	backwards_invocation.BackwardsInvocationWriter
 }
@@ -19,21 +25,24 @@ type AWSTransactionWriter struct {
 // NewAWSTransactionWriter creates a new transaction writer
 func NewAWSTransactionWriter(
 	session *session_manager.Session,
-	writeCloser io.WriteCloser,
+	writeFlushCloser WriteFlushCloser,
 ) *AWSTransactionWriter {
 	return &AWSTransactionWriter{
-		session:     session,
-		writeCloser: writeCloser,
+		session:          session,
+		writeFlushCloser: writeFlushCloser,
 	}
 }
 
 // Write writes the event and data to the session
-// WARNING: write
 func (w *AWSTransactionWriter) Write(event session_manager.PLUGIN_IN_STREAM_EVENT, data any) error {
-	_, err := w.writeCloser.Write(w.session.Message(event, data))
+	_, err := w.writeFlushCloser.Write(append(w.session.Message(event, data), '\n', '\n'))
+	if err != nil {
+		return err
+	}
+	w.writeFlushCloser.Flush()
 	return err
 }
 
 func (w *AWSTransactionWriter) Done() {
-	w.writeCloser.Close()
+	w.writeFlushCloser.Close()
 }

+ 6 - 2
internal/core/plugin_daemon/endpoint_service.go

@@ -38,7 +38,7 @@ func InvokeEndpoint(
 	for resp.Next() {
 		result, err := resp.Read()
 		if err != nil {
-			resp.Close()
+			response.Close()
 			return http.StatusInternalServerError, nil, nil, err
 		}
 
@@ -55,7 +55,7 @@ func InvokeEndpoint(
 		if result.Result != nil {
 			dehexed, err := hex.DecodeString(*result.Result)
 			if err != nil {
-				resp.Close()
+				response.Close()
 				return http.StatusInternalServerError, nil, nil, err
 			}
 
@@ -79,5 +79,9 @@ func InvokeEndpoint(
 		}
 	}
 
+	if resp.IsClosed() {
+		response.Close()
+	}
+
 	return status_code, headers, response, nil
 }

+ 1 - 2
internal/core/plugin_daemon/generic.go

@@ -5,7 +5,6 @@ import (
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation/transaction"
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
@@ -18,7 +17,7 @@ func genericInvokePlugin[Req any, Rsp any](
 	request *Req,
 	response_buffer_size int,
 ) (*stream.Stream[Rsp], error) {
-	runtime := plugin_manager.Manager().Get(session.PluginUniqueIdentifier)
+	runtime := session.Runtime()
 	if runtime == nil {
 		return nil, errors.New("plugin not found")
 	}

+ 58 - 11
internal/core/plugin_manager/aws_manager/io.go

@@ -33,13 +33,22 @@ func (r *AWSPluginRuntime) Write(session_id string, data []byte) {
 
 	url, err := url.JoinPath(r.LambdaURL, "invoke")
 	if err != nil {
+		l.Send(plugin_entities.SessionMessage{
+			Type: plugin_entities.SESSION_MESSAGE_TYPE_ERROR,
+			Data: parser.MarshalJsonBytes(plugin_entities.ErrorResponse{
+				Error: fmt.Sprintf("Error creating request: %v", err),
+			}),
+		})
+		l.Close()
 		r.Error(fmt.Sprintf("Error creating request: %v", err))
 		return
 	}
 
+	connect_time := 240 * time.Second
+
 	// create a new http request
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	defer cancel()
+	ctx, cancel := context.WithTimeout(context.Background(), connect_time)
+	time.AfterFunc(connect_time, cancel)
 	req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(data))
 	if err != nil {
 		r.Error(fmt.Sprintf("Error creating request: %v", err))
@@ -52,30 +61,68 @@ func (r *AWSPluginRuntime) Write(session_id string, data []byte) {
 	routine.Submit(func() {
 		// remove the session from listeners
 		defer r.listeners.Delete(session_id)
+		defer l.Close()
+		defer l.Send(plugin_entities.SessionMessage{
+			Type: plugin_entities.SESSION_MESSAGE_TYPE_END,
+			Data: []byte(""),
+		})
 
 		response, err := r.client.Do(req)
 		if err != nil {
+			l.Send(plugin_entities.SessionMessage{
+				Type: plugin_entities.SESSION_MESSAGE_TYPE_ERROR,
+				Data: parser.MarshalJsonBytes(plugin_entities.ErrorResponse{
+					Error: "failed to establish connection to plugin",
+				}),
+			})
 			r.Error(fmt.Sprintf("Error sending request to aws lambda: %v", err))
 			return
 		}
 
 		// write to data stream
 		scanner := bufio.NewScanner(response.Body)
-		for scanner.Scan() {
+		session_alive := true
+		for scanner.Scan() && session_alive {
 			bytes := scanner.Bytes()
 			if len(bytes) == 0 {
 				continue
 			}
 
-			data, err := parser.UnmarshalJsonBytes[plugin_entities.SessionMessage](bytes)
-			if err != nil {
-				log.Error("unmarshal json failed: %s, failed to parse session message", err.Error())
-				continue
-			}
-
-			l.Send(data)
+			plugin_entities.ParsePluginUniversalEvent(
+				bytes,
+				func(session_id string, data []byte) {
+					session_message, err := parser.UnmarshalJsonBytes[plugin_entities.SessionMessage](data)
+					if err != nil {
+						l.Send(plugin_entities.SessionMessage{
+							Type: plugin_entities.SESSION_MESSAGE_TYPE_ERROR,
+							Data: parser.MarshalJsonBytes(plugin_entities.ErrorResponse{
+								Error: fmt.Sprintf("failed to parse session message %s, err: %v", bytes, err),
+							}),
+						})
+						session_alive = false
+					}
+					l.Send(session_message)
+				},
+				func() {},
+				func(err string) {
+					l.Send(plugin_entities.SessionMessage{
+						Type: plugin_entities.SESSION_MESSAGE_TYPE_ERROR,
+						Data: parser.MarshalJsonBytes(plugin_entities.ErrorResponse{
+							Error: fmt.Sprintf("encountered an error: %v", err),
+						}),
+					})
+				},
+				func(message string) {},
+			)
 		}
 
-		l.Close()
+		if scanner.Err() != nil {
+			l.Send(plugin_entities.SessionMessage{
+				Type: plugin_entities.SESSION_MESSAGE_TYPE_ERROR,
+				Data: parser.MarshalJsonBytes(plugin_entities.ErrorResponse{
+					Error: fmt.Sprintf("failed to read response body: %v", scanner.Err()),
+				}),
+			})
+		}
 	})
 }

+ 12 - 6
internal/core/plugin_manager/lifetime.go

@@ -7,6 +7,10 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 )
 
+func (p *PluginManager) AddPluginRegisterHandler(handler func(r plugin_entities.PluginLifetime) error) {
+	p.pluginRegisters = append(p.pluginRegisters, handler)
+}
+
 func (p *PluginManager) fullDuplexLifetime(r plugin_entities.PluginFullDuplexLifetime) {
 	configuration := r.Configuration()
 
@@ -19,15 +23,17 @@ func (p *PluginManager) fullDuplexLifetime(r plugin_entities.PluginFullDuplexLif
 	// stop plugin when the plugin reaches the end of its lifetime
 	defer r.Stop()
 
-	// add plugin to cluster
-	err := p.cluster.RegisterPlugin(r)
-	if err != nil {
-		log.Error("add plugin to cluster failed: %s", err.Error())
-		return
+	// register plugin
+	for _, reg := range p.pluginRegisters {
+		err := reg(r)
+		if err != nil {
+			log.Error("add plugin to cluster failed: %s", err.Error())
+			return
+		}
 	}
 
 	// add plugin to manager
-	err = p.Add(r)
+	err := p.Add(r)
 	if err != nil {
 		log.Error("add plugin to manager failed: %s", err.Error())
 		return

+ 20 - 33
internal/core/plugin_manager/local_manager/stdio_handle.go

@@ -11,7 +11,6 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/plugin_errors"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 )
 
 var (
@@ -77,40 +76,28 @@ func (s *stdioHolder) StartStdout() {
 			continue
 		}
 
-		event, err := parser.UnmarshalJsonBytes[plugin_entities.PluginUniversalEvent](data)
-		if err != nil {
-			// log.Error("unmarshal json failed: %s", err.Error())
-			continue
-		}
-
-		session_id := event.SessionId
-
-		switch event.Event {
-		case plugin_entities.PLUGIN_EVENT_LOG:
-			if event.Event == plugin_entities.PLUGIN_EVENT_LOG {
-				logEvent, err := parser.UnmarshalJsonBytes[plugin_entities.PluginLogEvent](event.Data)
-				if err != nil {
-					log.Error("unmarshal json failed: %s", err.Error())
-					continue
+		plugin_entities.ParsePluginUniversalEvent(
+			data,
+			func(session_id string, data []byte) {
+				for _, listener := range listeners {
+					listener(s.id, data)
 				}
-
-				log.Info("plugin %s: %s", s.plugin_unique_identifier, logEvent.Message)
-			}
-		case plugin_entities.PLUGIN_EVENT_SESSION:
-			for _, listener := range listeners {
-				listener(s.id, event.Data)
-			}
-
-			for listener_session_id, listener := range s.listener {
-				if listener_session_id == session_id {
-					listener(event.Data)
+				for listener_session_id, listener := range s.listener {
+					if listener_session_id == session_id {
+						listener(data)
+					}
 				}
-			}
-		case plugin_entities.PLUGIN_EVENT_ERROR:
-			log.Error("plugin %s: %s", s.plugin_unique_identifier, event.Data)
-		case plugin_entities.PLUGIN_EVENT_HEARTBEAT:
-			s.last_active_at = time.Now()
-		}
+			},
+			func() {
+				s.last_active_at = time.Now()
+			},
+			func(err string) {
+				log.Error("plugin %s: %s", s.plugin_unique_identifier, err)
+			},
+			func(message string) {
+				log.Info("plugin %s: %s", s.plugin_unique_identifier, message)
+			},
+		)
 	}
 }
 

+ 5 - 6
internal/core/plugin_manager/manager.go

@@ -3,7 +3,6 @@ package plugin_manager
 import (
 	"fmt"
 
-	"github.com/langgenius/dify-plugin-daemon/internal/cluster"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless"
@@ -20,14 +19,15 @@ import (
 type PluginManager struct {
 	m mapping.Map[string, plugin_entities.PluginLifetime]
 
-	cluster *cluster.Cluster
-
 	maxPluginPackageSize int64
 	workingDirectory     string
 
 	// mediaManager is used to manage media files like plugin icons, images, etc.
 	mediaManager *media_manager.MediaManager
 
+	// register plugin
+	pluginRegisters []func(lifetime plugin_entities.PluginLifetime) error
+
 	// running plugin in storage contains relations between plugin packages and their running instances
 	runningPluginInStorage mapping.Map[string, string]
 	// start process lock
@@ -42,9 +42,8 @@ var (
 	manager *PluginManager
 )
 
-func InitManager(cluster *cluster.Cluster, configuration *app.Config) {
+func NewManager(configuration *app.Config) *PluginManager {
 	manager = &PluginManager{
-		cluster:              cluster,
 		maxPluginPackageSize: configuration.MaxPluginPackageSize,
 		workingDirectory:     configuration.PluginWorkingPath,
 		mediaManager: media_manager.NewMediaManager(
@@ -61,7 +60,7 @@ func InitManager(cluster *cluster.Cluster, configuration *app.Config) {
 		manager.Install = manager.InstallToLocal
 	}
 
-	manager.Init(configuration)
+	return manager
 }
 
 func Manager() *PluginManager {

+ 21 - 35
internal/core/plugin_manager/remote_manager/run.go

@@ -6,7 +6,6 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/plugin_errors"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 )
 
@@ -55,41 +54,28 @@ func (r *RemotePluginRuntime) StartPlugin() error {
 	})
 
 	r.response.Async(func(data []byte) {
-		// handle event
-		event, err := parser.UnmarshalJsonBytes[plugin_entities.PluginUniversalEvent](data)
-		if err != nil {
-			return
-		}
-
-		session_id := event.SessionId
-
-		switch event.Event {
-		case plugin_entities.PLUGIN_EVENT_LOG:
-			if event.Event == plugin_entities.PLUGIN_EVENT_LOG {
-				log_event, err := parser.UnmarshalJsonBytes[plugin_entities.PluginLogEvent](
-					event.Data,
-				)
-				if err != nil {
-					log.Error("unmarshal json failed: %s", err.Error())
-					return
+		plugin_entities.ParsePluginUniversalEvent(
+			data,
+			func(session_id string, data []byte) {
+				r.callbacks_lock.RLock()
+				listeners := r.callbacks[session_id][:]
+				r.callbacks_lock.RUnlock()
+
+				// handle session event
+				for _, listener := range listeners {
+					listener(data)
 				}
-
-				log.Info("plugin %s: %s", r.Configuration().Identity(), log_event.Message)
-			}
-		case plugin_entities.PLUGIN_EVENT_SESSION:
-			r.callbacks_lock.RLock()
-			listeners := r.callbacks[session_id][:]
-			r.callbacks_lock.RUnlock()
-
-			// handle session event
-			for _, listener := range listeners {
-				listener(event.Data)
-			}
-		case plugin_entities.PLUGIN_EVENT_ERROR:
-			log.Error("plugin %s: %s", r.Configuration().Identity(), event.Data)
-		case plugin_entities.PLUGIN_EVENT_HEARTBEAT:
-			r.last_active_at = time.Now()
-		}
+			},
+			func() {
+				r.last_active_at = time.Now()
+			},
+			func(err string) {
+				log.Error("plugin %s: %s", r.Configuration().Identity(), err)
+			},
+			func(message string) {
+				log.Info("plugin %s: %s", r.Configuration().Identity(), message)
+			},
+		)
 	})
 
 	return exit_error

+ 1 - 1
internal/core/plugin_packager/decoder/decoder.go

@@ -171,5 +171,5 @@ func (p *PluginDecoderHelper) UniqueIdentity(decoder PluginDecoder) (plugin_enti
 		return plugin_entities.PluginUniqueIdentifier(""), err
 	}
 
-	return plugin_entities.PluginUniqueIdentifier(fmt.Sprintf("%s-%s", identity, checksum)), nil
+	return plugin_entities.PluginUniqueIdentifier(fmt.Sprintf("%s@%s", identity, checksum)), nil
 }

+ 1 - 1
internal/server/app.go

@@ -12,7 +12,7 @@ type App struct {
 
 	// endpoint handler
 	// customize behavior of endpoint
-	endpoint_handler EndpointHandler
+	endpointHandler EndpointHandler
 
 	// aws transaction handler
 	// accept aws transaction request and forward to the plugin daemon

+ 6 - 5
internal/server/controllers/endpoint.go

@@ -7,17 +7,18 @@ import (
 )
 
 func SetupEndpoint(ctx *gin.Context) {
-	BindRequestWithPluginUniqueIdentifier(ctx, func(
+	BindRequest(ctx, func(
 		request struct {
-			TenantID string         `json:"tenant_id" binding:"required"`
-			UserID   string         `json:"user_id" binding:"required"`
-			Settings map[string]any `json:"settings" binding:"omitempty"`
+			PluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier `json:"plugin_unique_identifier" binding:"required"`
+			TenantID               string                                 `json:"tenant_id" binding:"required"`
+			UserID                 string                                 `json:"user_id" binding:"required"`
+			Settings               map[string]any                         `json:"settings" binding:"omitempty"`
 		},
-		plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
 	) {
 		tenant_id := request.TenantID
 		user_id := request.UserID
 		settings := request.Settings
+		plugin_unique_identifier := request.PluginUniqueIdentifier
 
 		ctx.JSON(200, service.SetupEndpoint(
 			tenant_id, user_id, plugin_unique_identifier, settings,

+ 2 - 2
internal/server/endpoint.go

@@ -21,8 +21,8 @@ func (app *App) Endpoint() func(c *gin.Context) {
 		hook_id := c.Param("hook_id")
 		path := c.Param("path")
 
-		if app.endpoint_handler != nil {
-			app.endpoint_handler(c, hook_id, path)
+		if app.endpointHandler != nil {
+			app.endpointHandler(c, hook_id, path)
 		} else {
 			app.EndpointHandler(c, hook_id, path)
 		}

+ 1 - 1
internal/server/endpoint_test.go

@@ -26,7 +26,7 @@ func TestEndpointParams(t *testing.T) {
 	}
 
 	app_pointer := &App{
-		endpoint_handler: handler,
+		endpointHandler: handler,
 	}
 	cancel := app_pointer.server(&app.Config{
 		ServerPort:            port,

+ 11 - 4
internal/server/server.go

@@ -11,8 +11,6 @@ import (
 )
 
 func (app *App) Run(config *app.Config) {
-	app.cluster = cluster.NewCluster(config)
-
 	// init routine pool
 	routine.InitPool(config.RoutinePoolSize)
 
@@ -22,8 +20,17 @@ func (app *App) Run(config *app.Config) {
 	// init process lifetime
 	process.Init(config)
 
-	// init plugin daemon
-	plugin_manager.InitManager(app.cluster, config)
+	// create manager
+	manager := plugin_manager.NewManager(config)
+
+	// create cluster
+	app.cluster = cluster.NewCluster(config, manager)
+
+	// register plugin lifetime event
+	manager.AddPluginRegisterHandler(app.cluster.RegisterPlugin)
+
+	// init manager
+	manager.Init(config)
 
 	// init persistence
 	persistence.InitPersistence(config)

+ 3 - 1
internal/types/entities/listener.go

@@ -27,7 +27,9 @@ func (r *Broadcast[T]) OnClose(f func()) {
 }
 
 func (r *Broadcast[T]) Close() {
-	r.onClose()
+	if r.onClose != nil {
+		r.onClose()
+	}
 }
 
 func (r *Broadcast[T]) Send(data T) {

+ 46 - 2
internal/types/entities/plugin_entities/event.go

@@ -2,6 +2,9 @@ package plugin_entities
 
 import (
 	"encoding/json"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 )
 
 type PluginUniversalEvent struct {
@@ -10,6 +13,47 @@ type PluginUniversalEvent struct {
 	Data      json.RawMessage `json:"data"`
 }
 
+// ParsePluginUniversalEvent parses bytes into struct contains basic info of a message
+// it's the outermost layer of the protocol
+// error_handler will be called when data is not standard or itself it's an error message
+func ParsePluginUniversalEvent(
+	data []byte,
+	session_handler func(session_id string, data []byte),
+	heartbeat_handler func(),
+	error_handler func(err string),
+	info_handler func(message string),
+) {
+	// handle event
+	event, err := parser.UnmarshalJsonBytes[PluginUniversalEvent](data)
+	if err != nil {
+		error_handler(err.Error())
+		return
+	}
+
+	session_id := event.SessionId
+
+	switch event.Event {
+	case PLUGIN_EVENT_LOG:
+		if event.Event == PLUGIN_EVENT_LOG {
+			log_event, err := parser.UnmarshalJsonBytes[PluginLogEvent](
+				event.Data,
+			)
+			if err != nil {
+				log.Error("unmarshal json failed: %s", err.Error())
+				return
+			}
+
+			info_handler(log_event.Message)
+		}
+	case PLUGIN_EVENT_SESSION:
+		session_handler(session_id, event.Data)
+	case PLUGIN_EVENT_ERROR:
+		error_handler(string(event.Data))
+	case PLUGIN_EVENT_HEARTBEAT:
+		heartbeat_handler()
+	}
+}
+
 type PluginEventType string
 
 const (
@@ -26,8 +70,8 @@ type PluginLogEvent struct {
 }
 
 type SessionMessage struct {
-	Type SESSION_MESSAGE_TYPE `json:"type"`
-	Data json.RawMessage      `json:"data"`
+	Type SESSION_MESSAGE_TYPE `json:"type" validate:"required"`
+	Data json.RawMessage      `json:"data" validate:"required"`
 }
 
 type SESSION_MESSAGE_TYPE string

+ 4 - 0
internal/utils/stream/response.go

@@ -172,6 +172,10 @@ func (r *Stream[T]) Size() int {
 
 // WriteError writes an error to the stream
 func (r *Stream[T]) WriteError(err error) {
+	if atomic.LoadInt32(&r.closed) == 1 {
+		return
+	}
+
 	r.l.Lock()
 	defer r.l.Unlock()