123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336 |
- package remote_manager
- import (
- "encoding/hex"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
- "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_manager"
- "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
- "github.com/panjf2000/gnet/v2"
- )
- var (
- // mode is only used for testing
- _mode pluginRuntimeMode
- )
- type DifyServer struct {
- gnet.BuiltinEventEngine
- engine gnet.Engine
- mediaManager *media_manager.MediaManager
- // listening address
- addr string
- port uint16
- // enabled multicore
- multicore bool
- // event loop count
- num_loops int
- // read new connections
- response *stream.Stream[*RemotePluginRuntime]
- plugins map[int]*RemotePluginRuntime
- plugins_lock *sync.RWMutex
- shutdown_chan chan bool
- max_conn int32
- current_conn int32
- }
- func (s *DifyServer) OnBoot(c gnet.Engine) (action gnet.Action) {
- s.engine = c
- return gnet.None
- }
- func (s *DifyServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
- // new plugin connected
- c.SetContext(&codec{})
- runtime := &RemotePluginRuntime{
- BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(
- s.mediaManager,
- ),
- conn: c,
- response: stream.NewStream[[]byte](512),
- callbacks: make(map[string][]func([]byte)),
- callbacks_lock: &sync.RWMutex{},
- shutdown_chan: make(chan bool),
- wait_launched_chan: make(chan error),
- alive: true,
- }
- // store plugin runtime
- s.plugins_lock.Lock()
- s.plugins[c.Fd()] = runtime
- s.plugins_lock.Unlock()
- // start a timer to check if handshake is completed in 10 seconds
- time.AfterFunc(time.Second*10, func() {
- if !runtime.handshake {
- // close connection
- c.Close()
- }
- })
- // verified
- verified := true
- if verified {
- return nil, gnet.None
- }
- return nil, gnet.Close
- }
- func (s *DifyServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
- // plugin disconnected
- s.plugins_lock.Lock()
- plugin := s.plugins[c.Fd()]
- delete(s.plugins, c.Fd())
- s.plugins_lock.Unlock()
- if plugin == nil {
- return gnet.None
- }
- // close plugin
- plugin.onDisconnected()
- // uninstall plugin
- if plugin.assets_transferred {
- if _mode != _PLUGIN_RUNTIME_MODE_CI {
- if err := plugin.Unregister(); err != nil {
- log.Error("unregister plugin failed, error: %v", err)
- }
- // decrease current connection
- atomic.AddInt32(&s.current_conn, -1)
- }
- }
- // send stopped event
- plugin.wait_chan_lock.Lock()
- for _, c := range plugin.wait_stopped_chan {
- select {
- case c <- true:
- default:
- }
- }
- plugin.wait_chan_lock.Unlock()
- // recycle launched chan, avoid memory leak
- plugin.wait_launched_chan_once.Do(func() {
- close(plugin.wait_launched_chan)
- })
- return gnet.None
- }
- func (s *DifyServer) OnShutdown(c gnet.Engine) {
- close(s.shutdown_chan)
- }
- func (s *DifyServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
- codec := c.Context().(*codec)
- messages, err := codec.Decode(c)
- if err != nil {
- return gnet.Close
- }
- // get plugin runtime
- s.plugins_lock.RLock()
- runtime, ok := s.plugins[c.Fd()]
- s.plugins_lock.RUnlock()
- if !ok {
- return gnet.Close
- }
- // handle messages
- for _, message := range messages {
- if len(message) == 0 {
- continue
- }
- s.onMessage(runtime, message)
- }
- return gnet.None
- }
- func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
- // handle message
- if runtime.handshake_failed {
- // do nothing if handshake has failed
- return
- }
- close_conn := func(message []byte) {
- if atomic.CompareAndSwapInt32(&runtime.closed, 0, 1) {
- runtime.conn.Write(message)
- runtime.conn.Close()
- }
- }
- if !runtime.handshake {
- key := string(message)
- info, err := GetConnectionInfo(key)
- if err == cache.ErrNotFound {
- // close connection if handshake failed
- close_conn([]byte("handshake failed, invalid key\n"))
- runtime.handshake_failed = true
- return
- } else if err != nil {
- // close connection if handshake failed
- close_conn([]byte("internal error\n"))
- return
- }
- runtime.tenant_id = info.TenantId
- // handshake completed
- runtime.handshake = true
- } else if !runtime.registration_transferred {
- // process handle shake if not completed
- declaration, err := parser.UnmarshalJsonBytes[plugin_entities.PluginDeclaration](message)
- if err != nil {
- // close connection if handshake failed
- close_conn([]byte("handshake failed, invalid plugin declaration\n"))
- return
- }
- runtime.Config = declaration
- // registration transferred
- runtime.registration_transferred = true
- } else if !runtime.tools_registration_transferred {
- tools, err := parser.UnmarshalJsonBytes2Slice[plugin_entities.ToolProviderDeclaration](message)
- if err != nil {
- log.Error("tools register failed, error: %v", err)
- close_conn([]byte("tools register failed, invalid tools declaration\n"))
- return
- }
- runtime.tools_registration_transferred = true
- if len(tools) > 0 {
- declaration := runtime.Config
- declaration.Tool = &tools[0]
- runtime.Config = declaration
- }
- } else if !runtime.models_registration_transferred {
- models, err := parser.UnmarshalJsonBytes2Slice[plugin_entities.ModelProviderDeclaration](message)
- if err != nil {
- log.Error("models register failed, error: %v", err)
- close_conn([]byte("models register failed, invalid models declaration\n"))
- return
- }
- runtime.models_registration_transferred = true
- if len(models) > 0 {
- declaration := runtime.Config
- declaration.Model = &models[0]
- runtime.Config = declaration
- }
- } else if !runtime.endpoints_registration_transferred {
- endpoints, err := parser.UnmarshalJsonBytes2Slice[plugin_entities.EndpointProviderDeclaration](message)
- if err != nil {
- log.Error("endpoints register failed, error: %v", err)
- close_conn([]byte("endpoints register failed, invalid endpoints declaration\n"))
- return
- }
- runtime.endpoints_registration_transferred = true
- if len(endpoints) > 0 {
- declaration := runtime.Config
- declaration.Endpoint = &endpoints[0]
- runtime.Config = declaration
- }
- } else if !runtime.assets_transferred {
- assets, err := parser.UnmarshalJsonBytes2Slice[plugin_entities.RemoteAssetPayload](message)
- if err != nil {
- log.Error("assets register failed, error: %v", err)
- close_conn([]byte(fmt.Sprintf("assets register failed, invalid assets declaration: %v\n", err)))
- return
- }
- files := make(map[string][]byte)
- for _, asset := range assets {
- files[asset.Filename], err = hex.DecodeString(asset.Data)
- if err != nil {
- log.Error("assets decode failed, error: %v", err)
- close_conn([]byte(fmt.Sprintf("assets decode failed, invalid assets data, cannot decode file: %v\n", err)))
- return
- }
- }
- // remap assets
- if err := runtime.RemapAssets(&runtime.Config, files); err != nil {
- log.Error("assets remap failed, error: %v", err)
- close_conn([]byte(fmt.Sprintf("assets remap failed, invalid assets data, cannot remap: %v\n", err)))
- return
- }
- atomic.AddInt32(&s.current_conn, 1)
- if atomic.LoadInt32(&s.current_conn) > int32(s.max_conn) {
- close_conn([]byte("server is busy now, please try again later\n"))
- return
- }
- // fill in default values
- runtime.Config.FillInDefaultValues()
- // mark assets transferred
- runtime.assets_transferred = true
- runtime.checksum = runtime.calculateChecksum()
- runtime.InitState()
- runtime.SetActiveAt(time.Now())
- // trigger registration event
- if err := runtime.Register(); err != nil {
- log.Error("register failed, error: %v", err)
- close_conn([]byte("register failed, cannot register\n"))
- return
- }
- // send started event
- runtime.wait_chan_lock.Lock()
- for _, c := range runtime.wait_started_chan {
- select {
- case c <- true:
- default:
- }
- }
- runtime.wait_chan_lock.Unlock()
- // notify launched
- runtime.wait_launched_chan_once.Do(func() {
- close(runtime.wait_launched_chan)
- })
- // publish runtime to watcher
- s.response.Write(runtime)
- } else {
- // continue handle messages if handshake completed
- runtime.response.Write(message)
- }
- }
|