watcher.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. package plugin_manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "io/fs"
  7. "os"
  8. "path"
  9. "path/filepath"
  10. "strings"
  11. "time"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
  14. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
  15. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/remote_manager"
  16. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  17. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  18. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  19. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  20. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  21. )
  22. func (p *PluginManager) startLocalWatcher() {
  23. go func() {
  24. log.Info("start to handle new plugins in path: %s", p.pluginStoragePath)
  25. p.handleNewLocalPlugins()
  26. for range time.NewTicker(time.Second * 30).C {
  27. p.handleNewLocalPlugins()
  28. p.removeUninstalledLocalPlugins()
  29. }
  30. }()
  31. }
  32. func (p *PluginManager) initRemotePluginServer(config *app.Config) {
  33. if p.remotePluginServer != nil {
  34. return
  35. }
  36. p.remotePluginServer = remote_manager.NewRemotePluginServer(config, p.mediaManager)
  37. }
  38. func (p *PluginManager) startRemoteWatcher(config *app.Config) {
  39. // launch TCP debugging server if enabled
  40. if config.PluginRemoteInstallingEnabled {
  41. p.initRemotePluginServer(config)
  42. go func() {
  43. err := p.remotePluginServer.Launch()
  44. if err != nil {
  45. log.Error("start remote plugin server failed: %s", err.Error())
  46. }
  47. }()
  48. go func() {
  49. p.remotePluginServer.Wrap(func(rpr plugin_entities.PluginFullDuplexLifetime) {
  50. identity, err := rpr.Identity()
  51. if err != nil {
  52. log.Error("get remote plugin identity failed: %s", err.Error())
  53. return
  54. }
  55. p.m.Store(identity.String(), rpr)
  56. routine.Submit(func() {
  57. defer func() {
  58. if err := recover(); err != nil {
  59. log.Error("plugin runtime error: %v", err)
  60. }
  61. p.m.Delete(identity.String())
  62. }()
  63. p.fullDuplexLifecycle(rpr, nil)
  64. })
  65. })
  66. }()
  67. }
  68. }
  69. func (p *PluginManager) handleNewLocalPlugins() {
  70. // walk through all plugins
  71. err := filepath.WalkDir(p.pluginStoragePath, func(path string, d fs.DirEntry, err error) error {
  72. if err != nil {
  73. return err
  74. }
  75. if !d.IsDir() {
  76. _, _, err := p.launchLocal(path)
  77. if err != nil {
  78. log.Error("launch local plugin failed: %s", err.Error())
  79. }
  80. }
  81. return nil
  82. })
  83. if err != nil {
  84. log.Error("walk through plugins failed: %s", err.Error())
  85. }
  86. }
  87. // an async function to remove uninstalled local plugins
  88. func (p *PluginManager) removeUninstalledLocalPlugins() {
  89. // read all local plugin runtimes
  90. p.m.Range(func(key string, value plugin_entities.PluginLifetime) bool {
  91. // try to convert to local runtime
  92. runtime, ok := value.(*local_manager.LocalPluginRuntime)
  93. if !ok {
  94. return true
  95. }
  96. // check if plugin is deleted, stop it if so
  97. plugin_path := filepath.Join(p.pluginStoragePath, key)
  98. if _, err := os.Stat(plugin_path); os.IsNotExist(err) {
  99. runtime.Stop()
  100. }
  101. return true
  102. })
  103. }
  104. func (p *PluginManager) launchLocal(plugin_package_path string) (
  105. plugin_entities.PluginFullDuplexLifetime, <-chan error, error,
  106. ) {
  107. plugin, err := p.getLocalPluginRuntime(plugin_package_path)
  108. if err != nil {
  109. return nil, nil, err
  110. }
  111. identity, err := plugin.decoder.UniqueIdentity()
  112. if err != nil {
  113. return nil, nil, err
  114. }
  115. // lock launch process
  116. p.localPluginLaunchingLock.Lock(identity.String())
  117. defer p.localPluginLaunchingLock.Unlock(identity.String())
  118. // check if the plugin is already running
  119. if lifetime, ok := p.m.Load(identity.String()); ok {
  120. lifetime, ok := lifetime.(plugin_entities.PluginFullDuplexLifetime)
  121. if !ok {
  122. return nil, nil, fmt.Errorf("plugin runtime not found")
  123. }
  124. // returns a closed channel to indicate the plugin is already running, no more waiting is needed
  125. c := make(chan error)
  126. close(c)
  127. return lifetime, c, nil
  128. }
  129. // extract plugin
  130. decoder, ok := plugin.decoder.(*decoder.ZipPluginDecoder)
  131. if !ok {
  132. return nil, nil, fmt.Errorf("plugin decoder is not a zip decoder")
  133. }
  134. // check if the working directory exists, if not, create it, otherwise, launch it directly
  135. if _, err := os.Stat(plugin.runtime.State.WorkingPath); err != nil {
  136. if err := decoder.ExtractTo(plugin.runtime.State.WorkingPath); err != nil {
  137. return nil, nil, errors.Join(err, fmt.Errorf("extract plugin to working directory error"))
  138. }
  139. }
  140. success := false
  141. failed := func(message string) error {
  142. if !success {
  143. os.RemoveAll(plugin.runtime.State.WorkingPath)
  144. }
  145. return errors.New(message)
  146. }
  147. // get assets
  148. assets, err := plugin.decoder.Assets()
  149. if err != nil {
  150. return nil, nil, failed(err.Error())
  151. }
  152. local_plugin_runtime := local_manager.NewLocalPluginRuntime(p.pythonInterpreterPath)
  153. local_plugin_runtime.PluginRuntime = plugin.runtime
  154. local_plugin_runtime.PositivePluginRuntime = positive_manager.PositivePluginRuntime{
  155. BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaManager),
  156. LocalPackagePath: plugin.runtime.State.AbsolutePath,
  157. WorkingPath: plugin.runtime.State.WorkingPath,
  158. Decoder: plugin.decoder,
  159. }
  160. if err := local_plugin_runtime.RemapAssets(
  161. &local_plugin_runtime.Config,
  162. assets,
  163. ); err != nil {
  164. return nil, nil, failed(errors.Join(err, fmt.Errorf("remap plugin assets error")).Error())
  165. }
  166. success = true
  167. p.m.Store(identity.String(), local_plugin_runtime)
  168. launched_chan := make(chan error)
  169. // local plugin
  170. routine.Submit(func() {
  171. defer func() {
  172. if r := recover(); r != nil {
  173. log.Error("plugin runtime panic: %v", r)
  174. }
  175. p.m.Delete(identity.String())
  176. }()
  177. // add max launching lock to prevent too many plugins launching at the same time
  178. p.maxLaunchingLock <- true
  179. routine.Submit(func() {
  180. // wait for plugin launched
  181. <-launched_chan
  182. // release max launching lock
  183. <-p.maxLaunchingLock
  184. })
  185. p.fullDuplexLifecycle(local_plugin_runtime, launched_chan)
  186. })
  187. return local_plugin_runtime, launched_chan, nil
  188. }
  189. type pluginRuntimeWithDecoder struct {
  190. runtime plugin_entities.PluginRuntime
  191. decoder decoder.PluginDecoder
  192. }
  193. // extract plugin from package to working directory
  194. func (p *PluginManager) getLocalPluginRuntime(plugin_path string) (
  195. *pluginRuntimeWithDecoder,
  196. error,
  197. ) {
  198. pack, err := os.Open(plugin_path)
  199. if err != nil {
  200. return nil, errors.Join(err, fmt.Errorf("open plugin package error"))
  201. }
  202. defer pack.Close()
  203. if info, err := pack.Stat(); err != nil {
  204. return nil, errors.Join(err, fmt.Errorf("get plugin package info error"))
  205. } else if info.Size() > p.maxPluginPackageSize {
  206. log.Error("plugin package size is too large: %d", info.Size())
  207. return nil, errors.Join(err, fmt.Errorf("plugin package size is too large"))
  208. }
  209. plugin_zip, err := io.ReadAll(pack)
  210. if err != nil {
  211. return nil, errors.Join(err, fmt.Errorf("read plugin package error"))
  212. }
  213. decoder, err := decoder.NewZipPluginDecoder(plugin_zip)
  214. if err != nil {
  215. return nil, errors.Join(err, fmt.Errorf("create plugin decoder error"))
  216. }
  217. // get manifest
  218. manifest, err := decoder.Manifest()
  219. if err != nil {
  220. return nil, errors.Join(err, fmt.Errorf("get plugin manifest error"))
  221. }
  222. checksum, err := decoder.Checksum()
  223. if err != nil {
  224. return nil, errors.Join(err, fmt.Errorf("calculate checksum error"))
  225. }
  226. identity := manifest.Identity()
  227. identity = strings.ReplaceAll(identity, ":", "-")
  228. plugin_working_path := path.Join(p.workingDirectory, fmt.Sprintf("%s@%s", identity, checksum))
  229. return &pluginRuntimeWithDecoder{
  230. runtime: plugin_entities.PluginRuntime{
  231. Config: manifest,
  232. State: plugin_entities.PluginRuntimeState{
  233. Status: plugin_entities.PLUGIN_RUNTIME_STATUS_PENDING,
  234. Restarts: 0,
  235. AbsolutePath: plugin_path,
  236. WorkingPath: plugin_working_path,
  237. ActiveAt: nil,
  238. Verified: manifest.Verified,
  239. },
  240. },
  241. decoder: decoder,
  242. }, nil
  243. }