launcher.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. package plugin_manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "os"
  6. "path"
  7. "strings"
  8. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_runtime"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_runtime"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  12. "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
  13. "github.com/langgenius/dify-plugin-daemon/pkg/plugin_packager/decoder"
  14. )
  15. type pluginRuntimeWithDecoder struct {
  16. runtime plugin_entities.PluginRuntime
  17. decoder decoder.PluginDecoder
  18. }
  19. // extract plugin from package to working directory
  20. func (p *PluginManager) getLocalPluginRuntime(pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier) (
  21. *pluginRuntimeWithDecoder,
  22. error,
  23. ) {
  24. pluginZip, err := p.installedBucket.Get(pluginUniqueIdentifier)
  25. if err != nil {
  26. return nil, errors.Join(err, fmt.Errorf("get plugin package error"))
  27. }
  28. decoder, err := decoder.NewZipPluginDecoder(pluginZip)
  29. if err != nil {
  30. return nil, errors.Join(err, fmt.Errorf("create plugin decoder error"))
  31. }
  32. // get manifest
  33. manifest, err := decoder.Manifest()
  34. if err != nil {
  35. return nil, errors.Join(err, fmt.Errorf("get plugin manifest error"))
  36. }
  37. checksum, err := decoder.Checksum()
  38. if err != nil {
  39. return nil, errors.Join(err, fmt.Errorf("calculate checksum error"))
  40. }
  41. identity := manifest.Identity()
  42. identity = strings.ReplaceAll(identity, ":", "-")
  43. pluginWorkingPath := path.Join(p.workingDirectory, fmt.Sprintf("%s@%s", identity, checksum))
  44. return &pluginRuntimeWithDecoder{
  45. runtime: plugin_entities.PluginRuntime{
  46. Config: manifest,
  47. State: plugin_entities.PluginRuntimeState{
  48. Status: plugin_entities.PLUGIN_RUNTIME_STATUS_PENDING,
  49. Restarts: 0,
  50. ActiveAt: nil,
  51. Verified: manifest.Verified,
  52. WorkingPath: pluginWorkingPath,
  53. },
  54. },
  55. decoder: decoder,
  56. }, nil
  57. }
  58. // launch a local plugin
  59. // returns a full duplex lifetime, a launched channel, an error channel, and an error
  60. // caller should always handle both the channels to avoid deadlock
  61. // 1. for launched channel, launch process will close the channel to notify the caller, just wait for it
  62. // 2. for error channel, it will be closed also, but no more error will be sent, caller should consume all errors
  63. func (p *PluginManager) launchLocal(pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier) (
  64. plugin_entities.PluginFullDuplexLifetime, <-chan bool, <-chan error, error,
  65. ) {
  66. plugin, err := p.getLocalPluginRuntime(pluginUniqueIdentifier)
  67. if err != nil {
  68. return nil, nil, nil, err
  69. }
  70. identity, err := plugin.decoder.UniqueIdentity()
  71. if err != nil {
  72. return nil, nil, nil, err
  73. }
  74. // lock launch process
  75. p.localPluginLaunchingLock.Lock(identity.String())
  76. defer p.localPluginLaunchingLock.Unlock(identity.String())
  77. // check if the plugin is already running
  78. if lifetime, ok := p.m.Load(identity.String()); ok {
  79. lifetime, ok := lifetime.(plugin_entities.PluginFullDuplexLifetime)
  80. if !ok {
  81. return nil, nil, nil, fmt.Errorf("plugin runtime not found")
  82. }
  83. // returns a closed channel to indicate the plugin is already running, no more waiting is needed
  84. c := make(chan bool)
  85. close(c)
  86. errChan := make(chan error)
  87. close(errChan)
  88. return lifetime, c, errChan, nil
  89. }
  90. // extract plugin
  91. decoder, ok := plugin.decoder.(*decoder.ZipPluginDecoder)
  92. if !ok {
  93. return nil, nil, nil, fmt.Errorf("plugin decoder is not a zip decoder")
  94. }
  95. // check if the working directory exists, if not, create it, otherwise, launch it directly
  96. if _, err := os.Stat(plugin.runtime.State.WorkingPath); err != nil {
  97. if err := decoder.ExtractTo(plugin.runtime.State.WorkingPath); err != nil {
  98. return nil, nil, nil, errors.Join(err, fmt.Errorf("extract plugin to working directory error"))
  99. }
  100. }
  101. success := false
  102. failed := func(message string) error {
  103. if !success {
  104. os.RemoveAll(plugin.runtime.State.WorkingPath)
  105. }
  106. return errors.New(message)
  107. }
  108. // get assets
  109. assets, err := plugin.decoder.Assets()
  110. if err != nil {
  111. return nil, nil, nil, failed(err.Error())
  112. }
  113. localPluginRuntime := local_runtime.NewLocalPluginRuntime(local_runtime.LocalPluginRuntimeConfig{
  114. PythonInterpreterPath: p.pythonInterpreterPath,
  115. PythonEnvInitTimeout: p.pythonEnvInitTimeout,
  116. HttpProxy: p.HttpProxy,
  117. HttpsProxy: p.HttpsProxy,
  118. PipMirrorUrl: p.pipMirrorUrl,
  119. PipPreferBinary: p.pipPreferBinary,
  120. PipExtraArgs: p.pipExtraArgs,
  121. })
  122. localPluginRuntime.PluginRuntime = plugin.runtime
  123. localPluginRuntime.BasicChecksum = basic_runtime.BasicChecksum{
  124. MediaTransport: basic_runtime.NewMediaTransport(p.mediaBucket),
  125. WorkingPath: plugin.runtime.State.WorkingPath,
  126. Decoder: plugin.decoder,
  127. }
  128. if err := localPluginRuntime.RemapAssets(
  129. &localPluginRuntime.Config,
  130. assets,
  131. ); err != nil {
  132. return nil, nil, nil, failed(errors.Join(err, fmt.Errorf("remap plugin assets error")).Error())
  133. }
  134. success = true
  135. p.m.Store(identity.String(), localPluginRuntime)
  136. // NOTE: you should always keep the size of the channel to 0
  137. // we use this to synchronize the plugin launch process
  138. launchedChan := make(chan bool)
  139. errChan := make(chan error)
  140. // local plugin
  141. routine.Submit(map[string]string{
  142. "module": "plugin_manager",
  143. "function": "LaunchLocal",
  144. }, func() {
  145. defer func() {
  146. if r := recover(); r != nil {
  147. log.Error("plugin runtime panic: %v", r)
  148. }
  149. p.m.Delete(identity.String())
  150. }()
  151. // add max launching lock to prevent too many plugins launching at the same time
  152. p.maxLaunchingLock <- true
  153. routine.Submit(map[string]string{
  154. "module": "plugin_manager",
  155. "function": "LaunchLocal",
  156. }, func() {
  157. // wait for plugin launched
  158. <-launchedChan
  159. // release max launching lock
  160. <-p.maxLaunchingLock
  161. })
  162. p.fullDuplexLifecycle(localPluginRuntime, launchedChan, errChan)
  163. })
  164. return localPluginRuntime, launchedChan, errChan, nil
  165. }