watcher.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. package plugin_manager
  2. import (
  3. "fmt"
  4. "io"
  5. "os"
  6. "path"
  7. "time"
  8. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/remote_manager"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/verifier"
  14. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  15. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  16. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  17. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  18. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  19. )
  20. func (p *PluginManager) startLocalWatcher(config *app.Config) {
  21. go func() {
  22. log.Info("start to handle new plugins in path: %s", config.PluginStoragePath)
  23. p.handleNewPlugins(config)
  24. for range time.NewTicker(time.Second * 30).C {
  25. p.handleNewPlugins(config)
  26. }
  27. }()
  28. }
  29. func (p *PluginManager) startRemoteWatcher(config *app.Config) {
  30. // launch TCP debugging server if enabled
  31. if config.PluginRemoteInstallingEnabled {
  32. server := remote_manager.NewRemotePluginServer(config)
  33. go func() {
  34. err := server.Launch()
  35. if err != nil {
  36. log.Error("start remote plugin server failed: %s", err.Error())
  37. }
  38. }()
  39. go func() {
  40. server.Wrap(func(rpr *remote_manager.RemotePluginRuntime) {
  41. p.lifetime(config, rpr)
  42. })
  43. }()
  44. }
  45. }
  46. func (p *PluginManager) handleNewPlugins(config *app.Config) {
  47. // load local plugins firstly
  48. for plugin := range p.loadNewPlugins(config.PluginStoragePath) {
  49. var plugin_interface entities.PluginRuntimeInterface
  50. if config.Platform == app.PLATFORM_AWS_LAMBDA {
  51. plugin_interface = &aws_manager.AWSPluginRuntime{
  52. PluginRuntime: plugin.Runtime,
  53. PositivePluginRuntime: positive_manager.PositivePluginRuntime{
  54. LocalPackagePath: plugin.Runtime.State.AbsolutePath,
  55. WorkingPath: plugin.Runtime.State.WorkingPath,
  56. Decoder: plugin.Decoder,
  57. },
  58. }
  59. } else if config.Platform == app.PLATFORM_LOCAL {
  60. plugin_interface = &local_manager.LocalPluginRuntime{
  61. PluginRuntime: plugin.Runtime,
  62. PositivePluginRuntime: positive_manager.PositivePluginRuntime{
  63. LocalPackagePath: plugin.Runtime.State.AbsolutePath,
  64. WorkingPath: plugin.Runtime.State.WorkingPath,
  65. Decoder: plugin.Decoder,
  66. },
  67. }
  68. } else {
  69. log.Error("unsupported platform: %s for plugin: %s", config.Platform, plugin.Runtime.Config.Name)
  70. continue
  71. }
  72. routine.Submit(func() {
  73. p.lifetime(config, plugin_interface)
  74. })
  75. }
  76. }
  77. type pluginRuntimeWithDecoder struct {
  78. Runtime entities.PluginRuntime
  79. Decoder decoder.PluginDecoder
  80. }
  81. // chan should be closed after using that
  82. func (p *PluginManager) loadNewPlugins(root_path string) <-chan *pluginRuntimeWithDecoder {
  83. ch := make(chan *pluginRuntimeWithDecoder)
  84. plugins, err := os.ReadDir(root_path)
  85. if err != nil {
  86. log.Error("no plugin found in path: %s", root_path)
  87. close(ch)
  88. return ch
  89. }
  90. routine.Submit(func() {
  91. for _, plugin := range plugins {
  92. if !plugin.IsDir() {
  93. plugin, err := p.loadPlugin(path.Join(root_path, plugin.Name()))
  94. if err != nil {
  95. log.Error("load plugin error: %v", err)
  96. continue
  97. }
  98. ch <- plugin
  99. }
  100. }
  101. close(ch)
  102. })
  103. return ch
  104. }
  105. func (p *PluginManager) loadPlugin(plugin_path string) (*pluginRuntimeWithDecoder, error) {
  106. pack, err := os.Open(plugin_path)
  107. if err != nil {
  108. log.Error("open plugin package error: %v", err)
  109. return nil, err
  110. }
  111. defer pack.Close()
  112. if info, err := pack.Stat(); err != nil {
  113. log.Error("get plugin package info error: %v", err)
  114. return nil, err
  115. } else if info.Size() > p.maxPluginPackageSize {
  116. log.Error("plugin package size is too large: %d", info.Size())
  117. return nil, err
  118. }
  119. plugin_zip, err := io.ReadAll(pack)
  120. if err != nil {
  121. log.Error("read plugin package error: %v", err)
  122. return nil, err
  123. }
  124. decoder, err := decoder.NewZipPluginDecoder(plugin_zip)
  125. if err != nil {
  126. log.Error("create plugin decoder error: %v", err)
  127. return nil, err
  128. }
  129. // get manifest
  130. manifest, err := decoder.Manifest()
  131. if err != nil {
  132. log.Error("get plugin manifest error: %v", err)
  133. return nil, err
  134. }
  135. // check if already exists
  136. if _, exist := p.m.Load(manifest.Identity()); exist {
  137. log.Warn("plugin already exists: %s", manifest.Identity())
  138. return nil, fmt.Errorf("plugin already exists: %s", manifest.Identity())
  139. }
  140. plugin_working_path := path.Join(p.workingDirectory, manifest.Identity())
  141. // check if working directory exists
  142. if _, err := os.Stat(plugin_working_path); err == nil {
  143. log.Warn("plugin working directory already exists: %s", plugin_working_path)
  144. return nil, fmt.Errorf("plugin working directory already exists: %s", plugin_working_path)
  145. }
  146. // copy to working directory
  147. if err := decoder.Walk(func(filename, dir string) error {
  148. working_path := path.Join(plugin_working_path, dir)
  149. // check if directory exists
  150. if err := os.MkdirAll(working_path, 0755); err != nil {
  151. return err
  152. }
  153. bytes, err := decoder.ReadFile(filename)
  154. if err != nil {
  155. return err
  156. }
  157. filename = path.Join(working_path, filename)
  158. // copy file
  159. if err := os.WriteFile(filename, bytes, 0644); err != nil {
  160. return err
  161. }
  162. return nil
  163. }); err != nil {
  164. log.Error("copy plugin to working directory error: %v", err)
  165. return nil, err
  166. }
  167. return &pluginRuntimeWithDecoder{
  168. Runtime: entities.PluginRuntime{
  169. Config: manifest,
  170. State: entities.PluginRuntimeState{
  171. Status: entities.PLUGIN_RUNTIME_STATUS_PENDING,
  172. Restarts: 0,
  173. AbsolutePath: plugin_path,
  174. WorkingPath: plugin_working_path,
  175. ActiveAt: nil,
  176. Verified: verifier.VerifyPlugin(decoder) == nil,
  177. },
  178. },
  179. Decoder: decoder,
  180. }, nil
  181. }
  182. func parsePluginConfig(configuration_path string) (*plugin_entities.PluginDeclaration, error) {
  183. text, err := os.ReadFile(configuration_path)
  184. if err != nil {
  185. return nil, err
  186. }
  187. result, err := plugin_entities.UnmarshalPluginDeclarationFromYaml(text)
  188. if err != nil {
  189. return nil, err
  190. }
  191. return result, nil
  192. }