launcher.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package plugin_manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "os"
  6. "path"
  7. "strings"
  8. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  12. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  14. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  15. )
  16. type pluginRuntimeWithDecoder struct {
  17. runtime plugin_entities.PluginRuntime
  18. decoder decoder.PluginDecoder
  19. }
  20. // extract plugin from package to working directory
  21. func (p *PluginManager) getLocalPluginRuntime(pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier) (
  22. *pluginRuntimeWithDecoder,
  23. error,
  24. ) {
  25. pluginZip, err := p.installedBucket.Get(pluginUniqueIdentifier)
  26. if err != nil {
  27. return nil, errors.Join(err, fmt.Errorf("get plugin package error"))
  28. }
  29. decoder, err := decoder.NewZipPluginDecoder(pluginZip)
  30. if err != nil {
  31. return nil, errors.Join(err, fmt.Errorf("create plugin decoder error"))
  32. }
  33. // get manifest
  34. manifest, err := decoder.Manifest()
  35. if err != nil {
  36. return nil, errors.Join(err, fmt.Errorf("get plugin manifest error"))
  37. }
  38. checksum, err := decoder.Checksum()
  39. if err != nil {
  40. return nil, errors.Join(err, fmt.Errorf("calculate checksum error"))
  41. }
  42. identity := manifest.Identity()
  43. identity = strings.ReplaceAll(identity, ":", "-")
  44. pluginWorkingPath := path.Join(p.workingDirectory, fmt.Sprintf("%s@%s", identity, checksum))
  45. return &pluginRuntimeWithDecoder{
  46. runtime: plugin_entities.PluginRuntime{
  47. Config: manifest,
  48. State: plugin_entities.PluginRuntimeState{
  49. Status: plugin_entities.PLUGIN_RUNTIME_STATUS_PENDING,
  50. Restarts: 0,
  51. ActiveAt: nil,
  52. Verified: manifest.Verified,
  53. WorkingPath: pluginWorkingPath,
  54. },
  55. },
  56. decoder: decoder,
  57. }, nil
  58. }
  59. // launch a local plugin
  60. // returns a full duplex lifetime, a launched channel, an error channel, and an error
  61. // caller should always handle both the channels to avoid deadlock
  62. // 1. for launched channel, launch process will close the channel to notify the caller, just wait for it
  63. // 2. for error channel, it will be closed also, but no more error will be sent, caller should consume all errors
  64. func (p *PluginManager) launchLocal(pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier) (
  65. plugin_entities.PluginFullDuplexLifetime, <-chan bool, <-chan error, error,
  66. ) {
  67. plugin, err := p.getLocalPluginRuntime(pluginUniqueIdentifier)
  68. if err != nil {
  69. return nil, nil, nil, err
  70. }
  71. identity, err := plugin.decoder.UniqueIdentity()
  72. if err != nil {
  73. return nil, nil, nil, err
  74. }
  75. // lock launch process
  76. p.localPluginLaunchingLock.Lock(identity.String())
  77. defer p.localPluginLaunchingLock.Unlock(identity.String())
  78. // check if the plugin is already running
  79. if lifetime, ok := p.m.Load(identity.String()); ok {
  80. lifetime, ok := lifetime.(plugin_entities.PluginFullDuplexLifetime)
  81. if !ok {
  82. return nil, nil, nil, fmt.Errorf("plugin runtime not found")
  83. }
  84. // returns a closed channel to indicate the plugin is already running, no more waiting is needed
  85. c := make(chan bool)
  86. close(c)
  87. errChan := make(chan error)
  88. close(errChan)
  89. return lifetime, c, errChan, nil
  90. }
  91. // extract plugin
  92. decoder, ok := plugin.decoder.(*decoder.ZipPluginDecoder)
  93. if !ok {
  94. return nil, nil, nil, fmt.Errorf("plugin decoder is not a zip decoder")
  95. }
  96. // check if the working directory exists, if not, create it, otherwise, launch it directly
  97. if _, err := os.Stat(plugin.runtime.State.WorkingPath); err != nil {
  98. if err := decoder.ExtractTo(plugin.runtime.State.WorkingPath); err != nil {
  99. return nil, nil, nil, errors.Join(err, fmt.Errorf("extract plugin to working directory error"))
  100. }
  101. }
  102. success := false
  103. failed := func(message string) error {
  104. if !success {
  105. os.RemoveAll(plugin.runtime.State.WorkingPath)
  106. }
  107. return errors.New(message)
  108. }
  109. // get assets
  110. assets, err := plugin.decoder.Assets()
  111. if err != nil {
  112. return nil, nil, nil, failed(err.Error())
  113. }
  114. localPluginRuntime := local_manager.NewLocalPluginRuntime(p.pythonInterpreterPath)
  115. localPluginRuntime.PluginRuntime = plugin.runtime
  116. localPluginRuntime.PositivePluginRuntime = positive_manager.PositivePluginRuntime{
  117. BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaBucket),
  118. WorkingPath: plugin.runtime.State.WorkingPath,
  119. Decoder: plugin.decoder,
  120. }
  121. if err := localPluginRuntime.RemapAssets(
  122. &localPluginRuntime.Config,
  123. assets,
  124. ); err != nil {
  125. return nil, nil, nil, failed(errors.Join(err, fmt.Errorf("remap plugin assets error")).Error())
  126. }
  127. success = true
  128. p.m.Store(identity.String(), localPluginRuntime)
  129. // NOTE: you should always keep the size of the channel to 0
  130. // we use this to synchronize the plugin launch process
  131. launchedChan := make(chan bool)
  132. errChan := make(chan error)
  133. // local plugin
  134. routine.Submit(func() {
  135. defer func() {
  136. if r := recover(); r != nil {
  137. log.Error("plugin runtime panic: %v", r)
  138. }
  139. p.m.Delete(identity.String())
  140. }()
  141. // add max launching lock to prevent too many plugins launching at the same time
  142. p.maxLaunchingLock <- true
  143. routine.Submit(func() {
  144. // wait for plugin launched
  145. <-launchedChan
  146. // release max launching lock
  147. <-p.maxLaunchingLock
  148. })
  149. p.fullDuplexLifecycle(localPluginRuntime, launchedChan, errChan)
  150. })
  151. return localPluginRuntime, launchedChan, errChan, nil
  152. }