123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- package plugin_manager
- import (
- "time"
- "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/debugging_runtime"
- "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_runtime"
- "github.com/langgenius/dify-plugin-daemon/internal/types/app"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
- "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
- )
- func (p *PluginManager) startLocalWatcher() {
- go func() {
- log.Info("start to handle new plugins in path: %s", p.pluginStoragePath)
- p.handleNewLocalPlugins()
- for range time.NewTicker(time.Second * 30).C {
- p.handleNewLocalPlugins()
- p.removeUninstalledLocalPlugins()
- }
- }()
- }
- func (p *PluginManager) initRemotePluginServer(config *app.Config) {
- if p.remotePluginServer != nil {
- return
- }
- p.remotePluginServer = debugging_runtime.NewRemotePluginServer(config, p.mediaBucket)
- }
- func (p *PluginManager) startRemoteWatcher(config *app.Config) {
- // launch TCP debugging server if enabled
- if config.PluginRemoteInstallingEnabled != nil && *config.PluginRemoteInstallingEnabled {
- p.initRemotePluginServer(config)
- go func() {
- err := p.remotePluginServer.Launch()
- if err != nil {
- log.Error("start remote plugin server failed: %s", err.Error())
- }
- }()
- go func() {
- p.remotePluginServer.Wrap(func(rpr plugin_entities.PluginFullDuplexLifetime) {
- identity, err := rpr.Identity()
- if err != nil {
- log.Error("get remote plugin identity failed: %s", err.Error())
- return
- }
- p.m.Store(identity.String(), rpr)
- routine.Submit(map[string]string{
- "module": "plugin_manager",
- "function": "startRemoteWatcher",
- "plugin_id": identity.String(),
- "type": "remote",
- }, func() {
- defer func() {
- if err := recover(); err != nil {
- log.Error("plugin runtime error: %v", err)
- }
- p.m.Delete(identity.String())
- }()
- p.fullDuplexLifecycle(rpr, nil, nil)
- })
- })
- }()
- }
- }
- func (p *PluginManager) handleNewLocalPlugins() {
- // walk through all plugins
- plugins, err := p.installedBucket.List()
- if err != nil {
- log.Error("list installed plugins failed: %s", err.Error())
- return
- }
- for _, plugin := range plugins {
- _, launchedChan, errChan, err := p.launchLocal(plugin)
- if err != nil {
- log.Error("launch local plugin failed: %s", err.Error())
- }
- // consume error, avoid deadlock
- for err := range errChan {
- log.Error("plugin launch error: %s", err.Error())
- }
- // wait for plugin launched
- <-launchedChan
- }
- }
- // an async function to remove uninstalled local plugins
- func (p *PluginManager) removeUninstalledLocalPlugins() {
- // read all local plugin runtimes
- p.m.Range(func(key string, value plugin_entities.PluginLifetime) bool {
- // try to convert to local runtime
- runtime, ok := value.(*local_runtime.LocalPluginRuntime)
- if !ok {
- return true
- }
- pluginUniqueIdentifier, err := runtime.Identity()
- if err != nil {
- log.Error("get plugin identity failed: %s", err.Error())
- return true
- }
- // check if plugin is deleted, stop it if so
- exists, err := p.installedBucket.Exists(pluginUniqueIdentifier)
- if err != nil {
- log.Error("check if plugin is deleted failed: %s", err.Error())
- return true
- }
- if !exists {
- runtime.Stop()
- }
- return true
- })
- }
|