watcher.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. package plugin_manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path"
  8. "strings"
  9. "time"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/remote_manager"
  14. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  15. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  16. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  17. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  18. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  19. )
  20. func (p *PluginManager) startLocalWatcher(config *app.Config) {
  21. go func() {
  22. log.Info("start to handle new plugins in path: %s", config.PluginStoragePath)
  23. p.handleNewLocalPlugins(config)
  24. for range time.NewTicker(time.Second * 30).C {
  25. p.handleNewLocalPlugins(config)
  26. }
  27. }()
  28. }
  29. func (p *PluginManager) startRemoteWatcher(config *app.Config) {
  30. // launch TCP debugging server if enabled
  31. if config.PluginRemoteInstallingEnabled {
  32. server := remote_manager.NewRemotePluginServer(config, p.mediaManager)
  33. go func() {
  34. err := server.Launch()
  35. if err != nil {
  36. log.Error("start remote plugin server failed: %s", err.Error())
  37. }
  38. }()
  39. go func() {
  40. server.Wrap(func(rpr *remote_manager.RemotePluginRuntime) {
  41. routine.Submit(func() {
  42. defer func() {
  43. if err := recover(); err != nil {
  44. log.Error("plugin runtime error: %v", err)
  45. }
  46. }()
  47. p.fullDuplexLifetime(rpr)
  48. })
  49. })
  50. }()
  51. }
  52. }
  53. func (p *PluginManager) handleNewLocalPlugins(config *app.Config) {
  54. // load local plugins firstly
  55. for plugin := range p.loadNewLocalPlugins(config.PluginStoragePath) {
  56. // get assets
  57. assets, err := plugin.Decoder.Assets()
  58. if err != nil {
  59. log.Error("get plugin assets error: %v", err)
  60. continue
  61. }
  62. local_plugin_runtime := local_manager.NewLocalPluginRuntime()
  63. local_plugin_runtime.PluginRuntime = plugin.Runtime
  64. local_plugin_runtime.PositivePluginRuntime = positive_manager.PositivePluginRuntime{
  65. BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaManager),
  66. LocalPackagePath: plugin.Runtime.State.AbsolutePath,
  67. WorkingPath: plugin.Runtime.State.WorkingPath,
  68. Decoder: plugin.Decoder,
  69. }
  70. if err := local_plugin_runtime.RemapAssets(
  71. &local_plugin_runtime.Config,
  72. assets,
  73. ); err != nil {
  74. log.Error("remap plugin assets error: %v", err)
  75. continue
  76. }
  77. identity, err := local_plugin_runtime.Identity()
  78. if err != nil {
  79. log.Error("get plugin identity error: %v", err)
  80. continue
  81. }
  82. // store the plugin in the storage, avoid duplicate loading
  83. p.runningPluginInStorage.Store(plugin.Runtime.State.AbsolutePath, identity.String())
  84. // local plugin
  85. routine.Submit(func() {
  86. defer func() {
  87. if r := recover(); r != nil {
  88. log.Error("plugin runtime error: %v", r)
  89. }
  90. }()
  91. // delete the plugin from the storage when the plugin is stopped
  92. defer p.runningPluginInStorage.Delete(plugin.Runtime.State.AbsolutePath)
  93. p.fullDuplexLifetime(local_plugin_runtime)
  94. })
  95. }
  96. }
  97. type pluginRuntimeWithDecoder struct {
  98. Runtime plugin_entities.PluginRuntime
  99. Decoder decoder.PluginDecoder
  100. }
  101. // chan should be closed after using that
  102. func (p *PluginManager) loadNewLocalPlugins(root_path string) <-chan *pluginRuntimeWithDecoder {
  103. ch := make(chan *pluginRuntimeWithDecoder)
  104. plugins, err := os.ReadDir(root_path)
  105. if err != nil {
  106. log.Error("no plugin found in path: %s", root_path)
  107. close(ch)
  108. return ch
  109. }
  110. routine.Submit(func() {
  111. for _, plugin := range plugins {
  112. if !plugin.IsDir() {
  113. abs_path := path.Join(root_path, plugin.Name())
  114. if _, ok := p.runningPluginInStorage.Load(abs_path); ok {
  115. // if the plugin is already running, skip it
  116. continue
  117. }
  118. plugin, err := p.loadPlugin(abs_path)
  119. if err != nil {
  120. log.Error("load plugin error: %v", err)
  121. continue
  122. }
  123. ch <- plugin
  124. }
  125. }
  126. close(ch)
  127. })
  128. return ch
  129. }
  130. func (p *PluginManager) loadPlugin(plugin_path string) (*pluginRuntimeWithDecoder, error) {
  131. pack, err := os.Open(plugin_path)
  132. if err != nil {
  133. return nil, errors.Join(err, fmt.Errorf("open plugin package error"))
  134. }
  135. defer pack.Close()
  136. if info, err := pack.Stat(); err != nil {
  137. return nil, errors.Join(err, fmt.Errorf("get plugin package info error"))
  138. } else if info.Size() > p.maxPluginPackageSize {
  139. log.Error("plugin package size is too large: %d", info.Size())
  140. return nil, err
  141. }
  142. plugin_zip, err := io.ReadAll(pack)
  143. if err != nil {
  144. return nil, errors.Join(err, fmt.Errorf("read plugin package error"))
  145. }
  146. decoder, err := decoder.NewZipPluginDecoder(plugin_zip)
  147. if err != nil {
  148. return nil, errors.Join(err, fmt.Errorf("create plugin decoder error"))
  149. }
  150. // get manifest
  151. manifest, err := decoder.Manifest()
  152. if err != nil {
  153. return nil, errors.Join(err, fmt.Errorf("get plugin manifest error"))
  154. }
  155. // check if already exists
  156. if _, exist := p.m.Load(manifest.Identity()); exist {
  157. return nil, errors.Join(fmt.Errorf("plugin already exists: %s", manifest.Identity()), err)
  158. }
  159. checksum, err := decoder.Checksum()
  160. if err != nil {
  161. return nil, errors.Join(err, fmt.Errorf("calculate checksum error"))
  162. }
  163. identity := manifest.Identity()
  164. // replace : with -
  165. identity = strings.ReplaceAll(identity, ":", "-")
  166. plugin_working_path := path.Join(p.workingDirectory, fmt.Sprintf("%s@%s", identity, checksum))
  167. // check if working directory exists
  168. if _, err := os.Stat(plugin_working_path); err == nil {
  169. return nil, errors.Join(fmt.Errorf("plugin working directory already exists: %s", plugin_working_path), err)
  170. }
  171. // extract to working directory
  172. if err := decoder.ExtractTo(plugin_working_path); err != nil {
  173. return nil, errors.Join(fmt.Errorf("extract plugin to working directory error: %v", err), err)
  174. }
  175. return &pluginRuntimeWithDecoder{
  176. Runtime: plugin_entities.PluginRuntime{
  177. Config: manifest,
  178. State: plugin_entities.PluginRuntimeState{
  179. Status: plugin_entities.PLUGIN_RUNTIME_STATUS_PENDING,
  180. Restarts: 0,
  181. AbsolutePath: plugin_path,
  182. WorkingPath: plugin_working_path,
  183. ActiveAt: nil,
  184. Verified: manifest.Verified,
  185. },
  186. },
  187. Decoder: decoder,
  188. }, nil
  189. }