watcher.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. package plugin_manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "os"
  6. "path"
  7. "strings"
  8. "time"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/remote_manager"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  14. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  15. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  16. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  17. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  18. )
  19. func (p *PluginManager) startLocalWatcher() {
  20. go func() {
  21. log.Info("start to handle new plugins in path: %s", p.pluginStoragePath)
  22. p.handleNewLocalPlugins()
  23. for range time.NewTicker(time.Second * 30).C {
  24. p.handleNewLocalPlugins()
  25. p.removeUninstalledLocalPlugins()
  26. }
  27. }()
  28. }
  29. func (p *PluginManager) initRemotePluginServer(config *app.Config) {
  30. if p.remotePluginServer != nil {
  31. return
  32. }
  33. p.remotePluginServer = remote_manager.NewRemotePluginServer(config, p.mediaBucket)
  34. }
  35. func (p *PluginManager) startRemoteWatcher(config *app.Config) {
  36. // launch TCP debugging server if enabled
  37. if config.PluginRemoteInstallingEnabled {
  38. p.initRemotePluginServer(config)
  39. go func() {
  40. err := p.remotePluginServer.Launch()
  41. if err != nil {
  42. log.Error("start remote plugin server failed: %s", err.Error())
  43. }
  44. }()
  45. go func() {
  46. p.remotePluginServer.Wrap(func(rpr plugin_entities.PluginFullDuplexLifetime) {
  47. identity, err := rpr.Identity()
  48. if err != nil {
  49. log.Error("get remote plugin identity failed: %s", err.Error())
  50. return
  51. }
  52. p.m.Store(identity.String(), rpr)
  53. routine.Submit(func() {
  54. defer func() {
  55. if err := recover(); err != nil {
  56. log.Error("plugin runtime error: %v", err)
  57. }
  58. p.m.Delete(identity.String())
  59. }()
  60. p.fullDuplexLifecycle(rpr, nil)
  61. })
  62. })
  63. }()
  64. }
  65. }
  66. func (p *PluginManager) handleNewLocalPlugins() {
  67. // walk through all plugins
  68. plugins, err := p.installedBucket.List()
  69. if err != nil {
  70. log.Error("list installed plugins failed: %s", err.Error())
  71. return
  72. }
  73. for _, plugin := range plugins {
  74. _, _, err := p.launchLocal(plugin)
  75. if err != nil {
  76. log.Error("launch local plugin failed: %s", err.Error())
  77. }
  78. }
  79. }
  80. // an async function to remove uninstalled local plugins
  81. func (p *PluginManager) removeUninstalledLocalPlugins() {
  82. // read all local plugin runtimes
  83. p.m.Range(func(key string, value plugin_entities.PluginLifetime) bool {
  84. // try to convert to local runtime
  85. runtime, ok := value.(*local_manager.LocalPluginRuntime)
  86. if !ok {
  87. return true
  88. }
  89. plugin_unique_identifier, err := runtime.Identity()
  90. if err != nil {
  91. log.Error("get plugin identity failed: %s", err.Error())
  92. return true
  93. }
  94. // check if plugin is deleted, stop it if so
  95. exists, err := p.installedBucket.Exists(plugin_unique_identifier)
  96. if err != nil {
  97. log.Error("check if plugin is deleted failed: %s", err.Error())
  98. return true
  99. }
  100. if !exists {
  101. runtime.Stop()
  102. }
  103. return true
  104. })
  105. }
  106. func (p *PluginManager) launchLocal(plugin_unique_identifier plugin_entities.PluginUniqueIdentifier) (
  107. plugin_entities.PluginFullDuplexLifetime, <-chan error, error,
  108. ) {
  109. plugin, err := p.getLocalPluginRuntime(plugin_unique_identifier)
  110. if err != nil {
  111. return nil, nil, err
  112. }
  113. identity, err := plugin.decoder.UniqueIdentity()
  114. if err != nil {
  115. return nil, nil, err
  116. }
  117. // lock launch process
  118. p.localPluginLaunchingLock.Lock(identity.String())
  119. defer p.localPluginLaunchingLock.Unlock(identity.String())
  120. // check if the plugin is already running
  121. if lifetime, ok := p.m.Load(identity.String()); ok {
  122. lifetime, ok := lifetime.(plugin_entities.PluginFullDuplexLifetime)
  123. if !ok {
  124. return nil, nil, fmt.Errorf("plugin runtime not found")
  125. }
  126. // returns a closed channel to indicate the plugin is already running, no more waiting is needed
  127. c := make(chan error)
  128. close(c)
  129. return lifetime, c, nil
  130. }
  131. // extract plugin
  132. decoder, ok := plugin.decoder.(*decoder.ZipPluginDecoder)
  133. if !ok {
  134. return nil, nil, fmt.Errorf("plugin decoder is not a zip decoder")
  135. }
  136. // check if the working directory exists, if not, create it, otherwise, launch it directly
  137. if _, err := os.Stat(plugin.runtime.State.WorkingPath); err != nil {
  138. if err := decoder.ExtractTo(plugin.runtime.State.WorkingPath); err != nil {
  139. return nil, nil, errors.Join(err, fmt.Errorf("extract plugin to working directory error"))
  140. }
  141. }
  142. success := false
  143. failed := func(message string) error {
  144. if !success {
  145. os.RemoveAll(plugin.runtime.State.WorkingPath)
  146. }
  147. return errors.New(message)
  148. }
  149. // get assets
  150. assets, err := plugin.decoder.Assets()
  151. if err != nil {
  152. return nil, nil, failed(err.Error())
  153. }
  154. local_plugin_runtime := local_manager.NewLocalPluginRuntime(p.pythonInterpreterPath)
  155. local_plugin_runtime.PluginRuntime = plugin.runtime
  156. local_plugin_runtime.PositivePluginRuntime = positive_manager.PositivePluginRuntime{
  157. BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaBucket),
  158. WorkingPath: plugin.runtime.State.WorkingPath,
  159. Decoder: plugin.decoder,
  160. }
  161. if err := local_plugin_runtime.RemapAssets(
  162. &local_plugin_runtime.Config,
  163. assets,
  164. ); err != nil {
  165. return nil, nil, failed(errors.Join(err, fmt.Errorf("remap plugin assets error")).Error())
  166. }
  167. success = true
  168. p.m.Store(identity.String(), local_plugin_runtime)
  169. launched_chan := make(chan error)
  170. // local plugin
  171. routine.Submit(func() {
  172. defer func() {
  173. if r := recover(); r != nil {
  174. log.Error("plugin runtime panic: %v", r)
  175. }
  176. p.m.Delete(identity.String())
  177. }()
  178. // add max launching lock to prevent too many plugins launching at the same time
  179. p.maxLaunchingLock <- true
  180. routine.Submit(func() {
  181. // wait for plugin launched
  182. <-launched_chan
  183. // release max launching lock
  184. <-p.maxLaunchingLock
  185. })
  186. p.fullDuplexLifecycle(local_plugin_runtime, launched_chan)
  187. })
  188. return local_plugin_runtime, launched_chan, nil
  189. }
  190. type pluginRuntimeWithDecoder struct {
  191. runtime plugin_entities.PluginRuntime
  192. decoder decoder.PluginDecoder
  193. }
  194. // extract plugin from package to working directory
  195. func (p *PluginManager) getLocalPluginRuntime(plugin_unique_identifier plugin_entities.PluginUniqueIdentifier) (
  196. *pluginRuntimeWithDecoder,
  197. error,
  198. ) {
  199. plugin_zip, err := p.installedBucket.Get(plugin_unique_identifier)
  200. if err != nil {
  201. return nil, errors.Join(err, fmt.Errorf("get plugin package error"))
  202. }
  203. decoder, err := decoder.NewZipPluginDecoder(plugin_zip)
  204. if err != nil {
  205. return nil, errors.Join(err, fmt.Errorf("create plugin decoder error"))
  206. }
  207. // get manifest
  208. manifest, err := decoder.Manifest()
  209. if err != nil {
  210. return nil, errors.Join(err, fmt.Errorf("get plugin manifest error"))
  211. }
  212. checksum, err := decoder.Checksum()
  213. if err != nil {
  214. return nil, errors.Join(err, fmt.Errorf("calculate checksum error"))
  215. }
  216. identity := manifest.Identity()
  217. identity = strings.ReplaceAll(identity, ":", "-")
  218. plugin_working_path := path.Join(p.workingDirectory, fmt.Sprintf("%s@%s", identity, checksum))
  219. return &pluginRuntimeWithDecoder{
  220. runtime: plugin_entities.PluginRuntime{
  221. Config: manifest,
  222. State: plugin_entities.PluginRuntimeState{
  223. Status: plugin_entities.PLUGIN_RUNTIME_STATUS_PENDING,
  224. Restarts: 0,
  225. ActiveAt: nil,
  226. Verified: manifest.Verified,
  227. WorkingPath: plugin_working_path,
  228. },
  229. },
  230. decoder: decoder,
  231. }, nil
  232. }