watcher.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package plugin_manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path"
  8. "time"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/remote_manager"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/checksum"
  14. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  15. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/verifier"
  16. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  17. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  18. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  19. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  20. )
  21. func (p *PluginManager) startLocalWatcher(config *app.Config) {
  22. go func() {
  23. log.Info("start to handle new plugins in path: %s", config.PluginStoragePath)
  24. p.handleNewLocalPlugins(config)
  25. for range time.NewTicker(time.Second * 30).C {
  26. p.handleNewLocalPlugins(config)
  27. }
  28. }()
  29. }
  30. func (p *PluginManager) startRemoteWatcher(config *app.Config) {
  31. // launch TCP debugging server if enabled
  32. if config.PluginRemoteInstallingEnabled {
  33. server := remote_manager.NewRemotePluginServer(config, p.mediaManager)
  34. go func() {
  35. err := server.Launch()
  36. if err != nil {
  37. log.Error("start remote plugin server failed: %s", err.Error())
  38. }
  39. }()
  40. go func() {
  41. server.Wrap(func(rpr *remote_manager.RemotePluginRuntime) {
  42. p.fullDuplexLifetime(rpr)
  43. })
  44. }()
  45. }
  46. }
  47. func (p *PluginManager) handleNewLocalPlugins(config *app.Config) {
  48. // load local plugins firstly
  49. for plugin := range p.loadNewLocalPlugins(config.PluginStoragePath) {
  50. // get assets
  51. assets, err := plugin.Decoder.Assets()
  52. if err != nil {
  53. log.Error("get plugin assets error: %v", err)
  54. continue
  55. }
  56. local_plugin_runtime := &local_manager.LocalPluginRuntime{
  57. PluginRuntime: plugin.Runtime,
  58. PositivePluginRuntime: positive_manager.PositivePluginRuntime{
  59. BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaManager),
  60. LocalPackagePath: plugin.Runtime.State.AbsolutePath,
  61. WorkingPath: plugin.Runtime.State.WorkingPath,
  62. Decoder: plugin.Decoder,
  63. },
  64. }
  65. if err := local_plugin_runtime.RemapAssets(
  66. &local_plugin_runtime.Config,
  67. assets,
  68. ); err != nil {
  69. log.Error("remap plugin assets error: %v", err)
  70. continue
  71. }
  72. identity, err := local_plugin_runtime.Identity()
  73. if err != nil {
  74. log.Error("get plugin identity error: %v", err)
  75. continue
  76. }
  77. // store the plugin in the storage, avoid duplicate loading
  78. p.runningPluginInStorage.Store(plugin.Runtime.State.AbsolutePath, identity.String())
  79. // local plugin
  80. routine.Submit(func() {
  81. defer func() {
  82. if r := recover(); r != nil {
  83. log.Error("plugin runtime error: %v", r)
  84. }
  85. }()
  86. // delete the plugin from the storage when the plugin is stopped
  87. defer p.runningPluginInStorage.Delete(plugin.Runtime.State.AbsolutePath)
  88. p.fullDuplexLifetime(local_plugin_runtime)
  89. })
  90. }
  91. }
  92. type pluginRuntimeWithDecoder struct {
  93. Runtime plugin_entities.PluginRuntime
  94. Decoder decoder.PluginDecoder
  95. }
  96. // chan should be closed after using that
  97. func (p *PluginManager) loadNewLocalPlugins(root_path string) <-chan *pluginRuntimeWithDecoder {
  98. ch := make(chan *pluginRuntimeWithDecoder)
  99. plugins, err := os.ReadDir(root_path)
  100. if err != nil {
  101. log.Error("no plugin found in path: %s", root_path)
  102. close(ch)
  103. return ch
  104. }
  105. routine.Submit(func() {
  106. for _, plugin := range plugins {
  107. if !plugin.IsDir() {
  108. abs_path := path.Join(root_path, plugin.Name())
  109. if _, ok := p.runningPluginInStorage.Load(abs_path); ok {
  110. // if the plugin is already running, skip it
  111. continue
  112. }
  113. plugin, err := p.loadPlugin(abs_path)
  114. if err != nil {
  115. log.Error("load plugin error: %v", err)
  116. continue
  117. }
  118. ch <- plugin
  119. }
  120. }
  121. close(ch)
  122. })
  123. return ch
  124. }
  125. func (p *PluginManager) loadPlugin(plugin_path string) (*pluginRuntimeWithDecoder, error) {
  126. pack, err := os.Open(plugin_path)
  127. if err != nil {
  128. return nil, errors.Join(err, fmt.Errorf("open plugin package error"))
  129. }
  130. defer pack.Close()
  131. if info, err := pack.Stat(); err != nil {
  132. return nil, errors.Join(err, fmt.Errorf("get plugin package info error"))
  133. } else if info.Size() > p.maxPluginPackageSize {
  134. log.Error("plugin package size is too large: %d", info.Size())
  135. return nil, err
  136. }
  137. plugin_zip, err := io.ReadAll(pack)
  138. if err != nil {
  139. return nil, errors.Join(err, fmt.Errorf("read plugin package error"))
  140. }
  141. decoder, err := decoder.NewZipPluginDecoder(plugin_zip)
  142. if err != nil {
  143. return nil, errors.Join(err, fmt.Errorf("create plugin decoder error"))
  144. }
  145. // get manifest
  146. manifest, err := decoder.Manifest()
  147. if err != nil {
  148. return nil, errors.Join(err, fmt.Errorf("get plugin manifest error"))
  149. }
  150. // check if already exists
  151. if _, exist := p.m.Load(manifest.Identity()); exist {
  152. return nil, errors.Join(fmt.Errorf("plugin already exists: %s", manifest.Identity()), err)
  153. }
  154. checksum, err := checksum.CalculateChecksum(decoder)
  155. if err != nil {
  156. return nil, errors.Join(err, fmt.Errorf("calculate checksum error"))
  157. }
  158. plugin_working_path := path.Join(p.workingDirectory, fmt.Sprintf("%s@%s", manifest.Identity(), checksum))
  159. // check if working directory exists
  160. if _, err := os.Stat(plugin_working_path); err == nil {
  161. return nil, errors.Join(fmt.Errorf("plugin working directory already exists: %s", plugin_working_path), err)
  162. }
  163. // copy to working directory
  164. if err := decoder.Walk(func(filename, dir string) error {
  165. working_path := path.Join(plugin_working_path, dir)
  166. // check if directory exists
  167. if err := os.MkdirAll(working_path, 0755); err != nil {
  168. return err
  169. }
  170. bytes, err := decoder.ReadFile(filename)
  171. if err != nil {
  172. return err
  173. }
  174. filename = path.Join(working_path, filename)
  175. // copy file
  176. if err := os.WriteFile(filename, bytes, 0644); err != nil {
  177. return err
  178. }
  179. return nil
  180. }); err != nil {
  181. return nil, errors.Join(fmt.Errorf("copy plugin to working directory error: %v", err), err)
  182. }
  183. return &pluginRuntimeWithDecoder{
  184. Runtime: plugin_entities.PluginRuntime{
  185. Config: manifest,
  186. State: plugin_entities.PluginRuntimeState{
  187. Status: plugin_entities.PLUGIN_RUNTIME_STATUS_PENDING,
  188. Restarts: 0,
  189. AbsolutePath: plugin_path,
  190. WorkingPath: plugin_working_path,
  191. ActiveAt: nil,
  192. Verified: verifier.VerifyPlugin(decoder) == nil,
  193. },
  194. },
  195. Decoder: decoder,
  196. }, nil
  197. }