| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 | package plugin_managerimport (	"time"	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/debugging_runtime"	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_runtime"	"github.com/langgenius/dify-plugin-daemon/internal/types/app"	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"	"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities")func (p *PluginManager) startLocalWatcher() {	go func() {		log.Info("start to handle new plugins in path: %s", p.pluginStoragePath)		p.handleNewLocalPlugins()		for range time.NewTicker(time.Second * 30).C {			p.handleNewLocalPlugins()			p.removeUninstalledLocalPlugins()		}	}()}func (p *PluginManager) initRemotePluginServer(config *app.Config) {	if p.remotePluginServer != nil {		return	}	p.remotePluginServer = debugging_runtime.NewRemotePluginServer(config, p.mediaBucket)}func (p *PluginManager) startRemoteWatcher(config *app.Config) {	// launch TCP debugging server if enabled	if config.PluginRemoteInstallingEnabled != nil && *config.PluginRemoteInstallingEnabled {		p.initRemotePluginServer(config)		go func() {			err := p.remotePluginServer.Launch()			if err != nil {				log.Error("start remote plugin server failed: %s", err.Error())			}		}()		go func() {			p.remotePluginServer.Wrap(func(rpr plugin_entities.PluginFullDuplexLifetime) {				identity, err := rpr.Identity()				if err != nil {					log.Error("get remote plugin identity failed: %s", err.Error())					return				}				p.m.Store(identity.String(), rpr)				routine.Submit(map[string]string{					"module":    "plugin_manager",					"function":  "startRemoteWatcher",					"plugin_id": identity.String(),					"type":      "remote",				}, func() {					defer func() {						if err := recover(); err != nil {							log.Error("plugin runtime error: %v", err)						}						p.m.Delete(identity.String())					}()					p.fullDuplexLifecycle(rpr, nil, nil)				})			})		}()	}}func (p *PluginManager) handleNewLocalPlugins() {	// walk through all plugins	plugins, err := p.installedBucket.List()	if err != nil {		log.Error("list installed plugins failed: %s", err.Error())		return	}	for _, plugin := range plugins {		_, launchedChan, errChan, err := p.launchLocal(plugin)		if err != nil {			log.Error("launch local plugin failed: %s", err.Error())		}		// consume error, avoid deadlock		for err := range errChan {			log.Error("plugin launch error: %s", err.Error())		}		// wait for plugin launched		<-launchedChan	}}// an async function to remove uninstalled local pluginsfunc (p *PluginManager) removeUninstalledLocalPlugins() {	// read all local plugin runtimes	p.m.Range(func(key string, value plugin_entities.PluginLifetime) bool {		// try to convert to local runtime		runtime, ok := value.(*local_runtime.LocalPluginRuntime)		if !ok {			return true		}		pluginUniqueIdentifier, err := runtime.Identity()		if err != nil {			log.Error("get plugin identity failed: %s", err.Error())			return true		}		// check if plugin is deleted, stop it if so		exists, err := p.installedBucket.Exists(pluginUniqueIdentifier)		if err != nil {			log.Error("check if plugin is deleted failed: %s", err.Error())			return true		}		if !exists {			runtime.Stop()		}		return true	})}
 |