123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- package plugin_manager
- import (
- "errors"
- "fmt"
- "os"
- "path"
- "strings"
- "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_runtime"
- "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_runtime"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
- "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
- "github.com/langgenius/dify-plugin-daemon/pkg/plugin_packager/decoder"
- )
- type pluginRuntimeWithDecoder struct {
- runtime plugin_entities.PluginRuntime
- decoder decoder.PluginDecoder
- }
- // extract plugin from package to working directory
- func (p *PluginManager) getLocalPluginRuntime(pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier) (
- *pluginRuntimeWithDecoder,
- error,
- ) {
- pluginZip, err := p.installedBucket.Get(pluginUniqueIdentifier)
- if err != nil {
- return nil, errors.Join(err, fmt.Errorf("get plugin package error"))
- }
- decoder, err := decoder.NewZipPluginDecoder(pluginZip)
- if err != nil {
- return nil, errors.Join(err, fmt.Errorf("create plugin decoder error"))
- }
- // get manifest
- manifest, err := decoder.Manifest()
- if err != nil {
- return nil, errors.Join(err, fmt.Errorf("get plugin manifest error"))
- }
- checksum, err := decoder.Checksum()
- if err != nil {
- return nil, errors.Join(err, fmt.Errorf("calculate checksum error"))
- }
- identity := manifest.Identity()
- identity = strings.ReplaceAll(identity, ":", "-")
- pluginWorkingPath := path.Join(p.workingDirectory, fmt.Sprintf("%s@%s", identity, checksum))
- return &pluginRuntimeWithDecoder{
- runtime: plugin_entities.PluginRuntime{
- Config: manifest,
- State: plugin_entities.PluginRuntimeState{
- Status: plugin_entities.PLUGIN_RUNTIME_STATUS_PENDING,
- Restarts: 0,
- ActiveAt: nil,
- Verified: manifest.Verified,
- WorkingPath: pluginWorkingPath,
- },
- },
- decoder: decoder,
- }, nil
- }
- // launch a local plugin
- // returns a full duplex lifetime, a launched channel, an error channel, and an error
- // caller should always handle both the channels to avoid deadlock
- // 1. for launched channel, launch process will close the channel to notify the caller, just wait for it
- // 2. for error channel, it will be closed also, but no more error will be sent, caller should consume all errors
- func (p *PluginManager) launchLocal(pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier) (
- plugin_entities.PluginFullDuplexLifetime, <-chan bool, <-chan error, error,
- ) {
- plugin, err := p.getLocalPluginRuntime(pluginUniqueIdentifier)
- if err != nil {
- return nil, nil, nil, err
- }
- identity, err := plugin.decoder.UniqueIdentity()
- if err != nil {
- return nil, nil, nil, err
- }
- // lock launch process
- p.localPluginLaunchingLock.Lock(identity.String())
- defer p.localPluginLaunchingLock.Unlock(identity.String())
- // check if the plugin is already running
- if lifetime, ok := p.m.Load(identity.String()); ok {
- lifetime, ok := lifetime.(plugin_entities.PluginFullDuplexLifetime)
- if !ok {
- return nil, nil, nil, fmt.Errorf("plugin runtime not found")
- }
- // returns a closed channel to indicate the plugin is already running, no more waiting is needed
- c := make(chan bool)
- close(c)
- errChan := make(chan error)
- close(errChan)
- return lifetime, c, errChan, nil
- }
- // extract plugin
- decoder, ok := plugin.decoder.(*decoder.ZipPluginDecoder)
- if !ok {
- return nil, nil, nil, fmt.Errorf("plugin decoder is not a zip decoder")
- }
- // check if the working directory exists, if not, create it, otherwise, launch it directly
- if _, err := os.Stat(plugin.runtime.State.WorkingPath); err != nil {
- if err := decoder.ExtractTo(plugin.runtime.State.WorkingPath); err != nil {
- return nil, nil, nil, errors.Join(err, fmt.Errorf("extract plugin to working directory error"))
- }
- }
- success := false
- failed := func(message string) error {
- if !success {
- os.RemoveAll(plugin.runtime.State.WorkingPath)
- }
- return errors.New(message)
- }
- // get assets
- assets, err := plugin.decoder.Assets()
- if err != nil {
- return nil, nil, nil, failed(err.Error())
- }
- localPluginRuntime := local_runtime.NewLocalPluginRuntime(local_runtime.LocalPluginRuntimeConfig{
- PythonInterpreterPath: p.pythonInterpreterPath,
- PythonEnvInitTimeout: p.pythonEnvInitTimeout,
- HttpProxy: p.HttpProxy,
- HttpsProxy: p.HttpsProxy,
- PipMirrorUrl: p.pipMirrorUrl,
- PipPreferBinary: p.pipPreferBinary,
- PipExtraArgs: p.pipExtraArgs,
- })
- localPluginRuntime.PluginRuntime = plugin.runtime
- localPluginRuntime.BasicChecksum = basic_runtime.BasicChecksum{
- MediaTransport: basic_runtime.NewMediaTransport(p.mediaBucket),
- WorkingPath: plugin.runtime.State.WorkingPath,
- Decoder: plugin.decoder,
- }
- if err := localPluginRuntime.RemapAssets(
- &localPluginRuntime.Config,
- assets,
- ); err != nil {
- return nil, nil, nil, failed(errors.Join(err, fmt.Errorf("remap plugin assets error")).Error())
- }
- success = true
- p.m.Store(identity.String(), localPluginRuntime)
- // NOTE: you should always keep the size of the channel to 0
- // we use this to synchronize the plugin launch process
- launchedChan := make(chan bool)
- errChan := make(chan error)
- // local plugin
- routine.Submit(map[string]string{
- "module": "plugin_manager",
- "function": "LaunchLocal",
- }, func() {
- defer func() {
- if r := recover(); r != nil {
- log.Error("plugin runtime panic: %v", r)
- }
- p.m.Delete(identity.String())
- }()
- // add max launching lock to prevent too many plugins launching at the same time
- p.maxLaunchingLock <- true
- routine.Submit(map[string]string{
- "module": "plugin_manager",
- "function": "LaunchLocal",
- }, func() {
- // wait for plugin launched
- <-launchedChan
- // release max launching lock
- <-p.maxLaunchingLock
- })
- p.fullDuplexLifecycle(localPluginRuntime, launchedChan, errChan)
- })
- return localPluginRuntime, launchedChan, errChan, nil
- }
|