watcher.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. package plugin_manager
  2. import (
  3. "fmt"
  4. "io"
  5. "os"
  6. "path"
  7. "time"
  8. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/remote_manager"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/verifier"
  14. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  15. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  16. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  17. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  18. )
  19. func (p *PluginManager) startLocalWatcher(config *app.Config) {
  20. go func() {
  21. log.Info("start to handle new plugins in path: %s", config.PluginStoragePath)
  22. p.handleNewPlugins(config)
  23. for range time.NewTicker(time.Second * 30).C {
  24. p.handleNewPlugins(config)
  25. }
  26. }()
  27. }
  28. func (p *PluginManager) startRemoteWatcher(config *app.Config) {
  29. // launch TCP debugging server if enabled
  30. if config.PluginRemoteInstallingEnabled {
  31. server := remote_manager.NewRemotePluginServer(config)
  32. go func() {
  33. err := server.Launch()
  34. if err != nil {
  35. log.Error("start remote plugin server failed: %s", err.Error())
  36. }
  37. }()
  38. go func() {
  39. server.Wrap(func(rpr *remote_manager.RemotePluginRuntime) {
  40. p.lifetime(rpr)
  41. })
  42. }()
  43. }
  44. }
  45. func (p *PluginManager) handleNewPlugins(config *app.Config) {
  46. // load local plugins firstly
  47. for plugin := range p.loadNewPlugins(config.PluginStoragePath) {
  48. var plugin_interface plugin_entities.PluginRuntimeInterface
  49. if config.Platform == app.PLATFORM_AWS_LAMBDA {
  50. plugin_interface = &aws_manager.AWSPluginRuntime{
  51. PluginRuntime: plugin.Runtime,
  52. PositivePluginRuntime: positive_manager.PositivePluginRuntime{
  53. LocalPackagePath: plugin.Runtime.State.AbsolutePath,
  54. WorkingPath: plugin.Runtime.State.WorkingPath,
  55. Decoder: plugin.Decoder,
  56. },
  57. }
  58. } else if config.Platform == app.PLATFORM_LOCAL {
  59. plugin_interface = &local_manager.LocalPluginRuntime{
  60. PluginRuntime: plugin.Runtime,
  61. PositivePluginRuntime: positive_manager.PositivePluginRuntime{
  62. LocalPackagePath: plugin.Runtime.State.AbsolutePath,
  63. WorkingPath: plugin.Runtime.State.WorkingPath,
  64. Decoder: plugin.Decoder,
  65. },
  66. }
  67. } else {
  68. log.Error("unsupported platform: %s for plugin: %s", config.Platform, plugin.Runtime.Config.Name)
  69. continue
  70. }
  71. routine.Submit(func() {
  72. p.lifetime(plugin_interface)
  73. })
  74. }
  75. }
  76. type pluginRuntimeWithDecoder struct {
  77. Runtime plugin_entities.PluginRuntime
  78. Decoder decoder.PluginDecoder
  79. }
  80. // chan should be closed after using that
  81. func (p *PluginManager) loadNewPlugins(root_path string) <-chan *pluginRuntimeWithDecoder {
  82. ch := make(chan *pluginRuntimeWithDecoder)
  83. plugins, err := os.ReadDir(root_path)
  84. if err != nil {
  85. log.Error("no plugin found in path: %s", root_path)
  86. close(ch)
  87. return ch
  88. }
  89. routine.Submit(func() {
  90. for _, plugin := range plugins {
  91. if !plugin.IsDir() {
  92. plugin, err := p.loadPlugin(path.Join(root_path, plugin.Name()))
  93. if err != nil {
  94. log.Error("load plugin error: %v", err)
  95. continue
  96. }
  97. ch <- plugin
  98. }
  99. }
  100. close(ch)
  101. })
  102. return ch
  103. }
  104. func (p *PluginManager) loadPlugin(plugin_path string) (*pluginRuntimeWithDecoder, error) {
  105. pack, err := os.Open(plugin_path)
  106. if err != nil {
  107. log.Error("open plugin package error: %v", err)
  108. return nil, err
  109. }
  110. defer pack.Close()
  111. if info, err := pack.Stat(); err != nil {
  112. log.Error("get plugin package info error: %v", err)
  113. return nil, err
  114. } else if info.Size() > p.maxPluginPackageSize {
  115. log.Error("plugin package size is too large: %d", info.Size())
  116. return nil, err
  117. }
  118. plugin_zip, err := io.ReadAll(pack)
  119. if err != nil {
  120. log.Error("read plugin package error: %v", err)
  121. return nil, err
  122. }
  123. decoder, err := decoder.NewZipPluginDecoder(plugin_zip)
  124. if err != nil {
  125. log.Error("create plugin decoder error: %v", err)
  126. return nil, err
  127. }
  128. // get manifest
  129. manifest, err := decoder.Manifest()
  130. if err != nil {
  131. log.Error("get plugin manifest error: %v", err)
  132. return nil, err
  133. }
  134. // check if already exists
  135. if _, exist := p.m.Load(manifest.Identity()); exist {
  136. log.Warn("plugin already exists: %s", manifest.Identity())
  137. return nil, fmt.Errorf("plugin already exists: %s", manifest.Identity())
  138. }
  139. plugin_working_path := path.Join(p.workingDirectory, manifest.Identity())
  140. // check if working directory exists
  141. if _, err := os.Stat(plugin_working_path); err == nil {
  142. log.Warn("plugin working directory already exists: %s", plugin_working_path)
  143. return nil, fmt.Errorf("plugin working directory already exists: %s", plugin_working_path)
  144. }
  145. // copy to working directory
  146. if err := decoder.Walk(func(filename, dir string) error {
  147. working_path := path.Join(plugin_working_path, dir)
  148. // check if directory exists
  149. if err := os.MkdirAll(working_path, 0755); err != nil {
  150. return err
  151. }
  152. bytes, err := decoder.ReadFile(filename)
  153. if err != nil {
  154. return err
  155. }
  156. filename = path.Join(working_path, filename)
  157. // copy file
  158. if err := os.WriteFile(filename, bytes, 0644); err != nil {
  159. return err
  160. }
  161. return nil
  162. }); err != nil {
  163. log.Error("copy plugin to working directory error: %v", err)
  164. return nil, err
  165. }
  166. return &pluginRuntimeWithDecoder{
  167. Runtime: plugin_entities.PluginRuntime{
  168. Config: manifest,
  169. State: plugin_entities.PluginRuntimeState{
  170. Status: plugin_entities.PLUGIN_RUNTIME_STATUS_PENDING,
  171. Restarts: 0,
  172. AbsolutePath: plugin_path,
  173. WorkingPath: plugin_working_path,
  174. ActiveAt: nil,
  175. Verified: verifier.VerifyPlugin(decoder) == nil,
  176. },
  177. },
  178. Decoder: decoder,
  179. }, nil
  180. }