watcher.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package plugin_manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path"
  8. "time"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/remote_manager"
  14. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/checksum"
  15. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  16. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/verifier"
  17. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  18. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  19. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  20. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  21. )
  22. func (p *PluginManager) startLocalWatcher(config *app.Config) {
  23. go func() {
  24. log.Info("start to handle new plugins in path: %s", config.PluginStoragePath)
  25. p.handleNewPlugins(config)
  26. for range time.NewTicker(time.Second * 30).C {
  27. p.handleNewPlugins(config)
  28. }
  29. }()
  30. }
  31. func (p *PluginManager) startRemoteWatcher(config *app.Config) {
  32. // launch TCP debugging server if enabled
  33. if config.PluginRemoteInstallingEnabled {
  34. server := remote_manager.NewRemotePluginServer(config, p.mediaManager)
  35. go func() {
  36. err := server.Launch()
  37. if err != nil {
  38. log.Error("start remote plugin server failed: %s", err.Error())
  39. }
  40. }()
  41. go func() {
  42. server.Wrap(func(rpr *remote_manager.RemotePluginRuntime) {
  43. p.lifetime(rpr)
  44. })
  45. }()
  46. }
  47. }
  48. func (p *PluginManager) handleNewPlugins(config *app.Config) {
  49. // load local plugins firstly
  50. for plugin := range p.loadNewPlugins(config.PluginStoragePath) {
  51. var plugin_interface plugin_entities.PluginRuntimeInterface
  52. if config.Platform == app.PLATFORM_AWS_LAMBDA {
  53. plugin_interface = &aws_manager.AWSPluginRuntime{
  54. PluginRuntime: plugin.Runtime,
  55. PositivePluginRuntime: positive_manager.PositivePluginRuntime{
  56. BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaManager),
  57. LocalPackagePath: plugin.Runtime.State.AbsolutePath,
  58. WorkingPath: plugin.Runtime.State.WorkingPath,
  59. Decoder: plugin.Decoder,
  60. },
  61. }
  62. } else if config.Platform == app.PLATFORM_LOCAL {
  63. plugin_interface = &local_manager.LocalPluginRuntime{
  64. PluginRuntime: plugin.Runtime,
  65. PositivePluginRuntime: positive_manager.PositivePluginRuntime{
  66. BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaManager),
  67. LocalPackagePath: plugin.Runtime.State.AbsolutePath,
  68. WorkingPath: plugin.Runtime.State.WorkingPath,
  69. Decoder: plugin.Decoder,
  70. },
  71. }
  72. } else {
  73. log.Error("unsupported platform: %s for plugin: %s", config.Platform, plugin.Runtime.Config.Name)
  74. continue
  75. }
  76. identity, err := plugin_interface.Identity()
  77. if err != nil {
  78. log.Error("get plugin identity error: %v", err)
  79. continue
  80. }
  81. // store the plugin in the storage, avoid duplicate loading
  82. p.runningPluginInStorage.Store(plugin.Runtime.State.AbsolutePath, identity.String())
  83. routine.Submit(func() {
  84. defer func() {
  85. if r := recover(); r != nil {
  86. log.Error("plugin runtime error: %v", r)
  87. }
  88. }()
  89. // delete the plugin from the storage when the plugin is stopped
  90. defer p.runningPluginInStorage.Delete(plugin.Runtime.State.AbsolutePath)
  91. p.lifetime(plugin_interface)
  92. })
  93. }
  94. }
  95. type pluginRuntimeWithDecoder struct {
  96. Runtime plugin_entities.PluginRuntime
  97. Decoder decoder.PluginDecoder
  98. }
  99. // chan should be closed after using that
  100. func (p *PluginManager) loadNewPlugins(root_path string) <-chan *pluginRuntimeWithDecoder {
  101. ch := make(chan *pluginRuntimeWithDecoder)
  102. plugins, err := os.ReadDir(root_path)
  103. if err != nil {
  104. log.Error("no plugin found in path: %s", root_path)
  105. close(ch)
  106. return ch
  107. }
  108. routine.Submit(func() {
  109. for _, plugin := range plugins {
  110. if !plugin.IsDir() {
  111. abs_path := path.Join(root_path, plugin.Name())
  112. if _, ok := p.runningPluginInStorage.Load(abs_path); ok {
  113. // if the plugin is already running, skip it
  114. continue
  115. }
  116. plugin, err := p.loadPlugin(abs_path)
  117. if err != nil {
  118. log.Error("load plugin error: %v", err)
  119. continue
  120. }
  121. ch <- plugin
  122. }
  123. }
  124. close(ch)
  125. })
  126. return ch
  127. }
  128. func (p *PluginManager) loadPlugin(plugin_path string) (*pluginRuntimeWithDecoder, error) {
  129. pack, err := os.Open(plugin_path)
  130. if err != nil {
  131. return nil, errors.Join(err, fmt.Errorf("open plugin package error"))
  132. }
  133. defer pack.Close()
  134. if info, err := pack.Stat(); err != nil {
  135. return nil, errors.Join(err, fmt.Errorf("get plugin package info error"))
  136. } else if info.Size() > p.maxPluginPackageSize {
  137. log.Error("plugin package size is too large: %d", info.Size())
  138. return nil, err
  139. }
  140. plugin_zip, err := io.ReadAll(pack)
  141. if err != nil {
  142. return nil, errors.Join(err, fmt.Errorf("read plugin package error"))
  143. }
  144. decoder, err := decoder.NewZipPluginDecoder(plugin_zip)
  145. if err != nil {
  146. return nil, errors.Join(err, fmt.Errorf("create plugin decoder error"))
  147. }
  148. // get manifest
  149. manifest, err := decoder.Manifest()
  150. if err != nil {
  151. return nil, errors.Join(err, fmt.Errorf("get plugin manifest error"))
  152. }
  153. // check if already exists
  154. if _, exist := p.m.Load(manifest.Identity()); exist {
  155. return nil, errors.Join(fmt.Errorf("plugin already exists: %s", manifest.Identity()), err)
  156. }
  157. checksum, err := checksum.CalculateChecksum(decoder)
  158. if err != nil {
  159. return nil, errors.Join(err, fmt.Errorf("calculate checksum error"))
  160. }
  161. plugin_working_path := path.Join(p.workingDirectory, fmt.Sprintf("%s@%s", manifest.Identity(), checksum))
  162. // check if working directory exists
  163. if _, err := os.Stat(plugin_working_path); err == nil {
  164. return nil, errors.Join(fmt.Errorf("plugin working directory already exists: %s", plugin_working_path), err)
  165. }
  166. // copy to working directory
  167. if err := decoder.Walk(func(filename, dir string) error {
  168. working_path := path.Join(plugin_working_path, dir)
  169. // check if directory exists
  170. if err := os.MkdirAll(working_path, 0755); err != nil {
  171. return err
  172. }
  173. bytes, err := decoder.ReadFile(filename)
  174. if err != nil {
  175. return err
  176. }
  177. filename = path.Join(working_path, filename)
  178. // copy file
  179. if err := os.WriteFile(filename, bytes, 0644); err != nil {
  180. return err
  181. }
  182. return nil
  183. }); err != nil {
  184. return nil, errors.Join(fmt.Errorf("copy plugin to working directory error: %v", err), err)
  185. }
  186. return &pluginRuntimeWithDecoder{
  187. Runtime: plugin_entities.PluginRuntime{
  188. Config: manifest,
  189. State: plugin_entities.PluginRuntimeState{
  190. Status: plugin_entities.PLUGIN_RUNTIME_STATUS_PENDING,
  191. Restarts: 0,
  192. AbsolutePath: plugin_path,
  193. WorkingPath: plugin_working_path,
  194. ActiveAt: nil,
  195. Verified: verifier.VerifyPlugin(decoder) == nil,
  196. },
  197. },
  198. Decoder: decoder,
  199. }, nil
  200. }