watcher.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. package plugin_manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "io/fs"
  7. "os"
  8. "path"
  9. "path/filepath"
  10. "strings"
  11. "time"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
  14. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
  15. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/remote_manager"
  16. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  17. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  18. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  19. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  20. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  21. )
  22. func (p *PluginManager) startLocalWatcher() {
  23. go func() {
  24. // delete all plugins in working directory
  25. os.RemoveAll(p.workingDirectory)
  26. log.Info("start to handle new plugins in path: %s", p.pluginStoragePath)
  27. p.handleNewLocalPlugins()
  28. for range time.NewTicker(time.Second * 30).C {
  29. p.handleNewLocalPlugins()
  30. }
  31. }()
  32. }
  33. func (p *PluginManager) startRemoteWatcher(config *app.Config) {
  34. // launch TCP debugging server if enabled
  35. if config.PluginRemoteInstallingEnabled {
  36. server := remote_manager.NewRemotePluginServer(config, p.mediaManager)
  37. go func() {
  38. err := server.Launch()
  39. if err != nil {
  40. log.Error("start remote plugin server failed: %s", err.Error())
  41. }
  42. }()
  43. go func() {
  44. server.Wrap(func(rpr *remote_manager.RemotePluginRuntime) {
  45. routine.Submit(func() {
  46. defer func() {
  47. if err := recover(); err != nil {
  48. log.Error("plugin runtime error: %v", err)
  49. }
  50. }()
  51. p.fullDuplexLifetime(rpr, nil)
  52. })
  53. })
  54. }()
  55. }
  56. }
  57. func (p *PluginManager) handleNewLocalPlugins() {
  58. // walk through all plugins
  59. err := filepath.WalkDir(p.pluginStoragePath, func(path string, d fs.DirEntry, err error) error {
  60. if err != nil {
  61. return err
  62. }
  63. if !d.IsDir() {
  64. _, _, err := p.launchLocal(path)
  65. if err != nil {
  66. log.Error("launch local plugin failed: %s", err.Error())
  67. }
  68. }
  69. return nil
  70. })
  71. if err != nil {
  72. log.Error("walk through plugins failed: %s", err.Error())
  73. }
  74. }
  75. func (p *PluginManager) launchLocal(plugin_package_path string) (
  76. plugin_entities.PluginFullDuplexLifetime, <-chan error, error,
  77. ) {
  78. plugin, err := p.getLocalPluginRuntime(plugin_package_path)
  79. if err != nil {
  80. return nil, nil, err
  81. }
  82. identity, err := plugin.decoder.UniqueIdentity()
  83. if err != nil {
  84. return nil, nil, err
  85. }
  86. // lock launch process
  87. p.localPluginLaunchingLock.Lock(identity.String())
  88. defer p.localPluginLaunchingLock.Unlock(identity.String())
  89. // check if the plugin is already running
  90. if lifetime, ok := p.m.Load(identity.String()); ok {
  91. lifetime, ok := lifetime.(plugin_entities.PluginFullDuplexLifetime)
  92. if !ok {
  93. return nil, nil, fmt.Errorf("plugin runtime not found")
  94. }
  95. // returns a closed channel to indicate the plugin is already running, no more waiting is needed
  96. c := make(chan error)
  97. close(c)
  98. return lifetime, c, nil
  99. }
  100. // extract plugin
  101. decoder, ok := plugin.decoder.(*decoder.ZipPluginDecoder)
  102. if !ok {
  103. return nil, nil, fmt.Errorf("plugin decoder is not a zip decoder")
  104. }
  105. if err := decoder.ExtractTo(plugin.runtime.State.WorkingPath); err != nil {
  106. return nil, nil, errors.Join(err, fmt.Errorf("extract plugin to working directory error"))
  107. }
  108. success := false
  109. failed := func(message string) error {
  110. if !success {
  111. os.RemoveAll(plugin.runtime.State.WorkingPath)
  112. }
  113. return errors.New(message)
  114. }
  115. // get assets
  116. assets, err := plugin.decoder.Assets()
  117. if err != nil {
  118. return nil, nil, failed(err.Error())
  119. }
  120. local_plugin_runtime := local_manager.NewLocalPluginRuntime(p.pythonInterpreterPath)
  121. local_plugin_runtime.PluginRuntime = plugin.runtime
  122. local_plugin_runtime.PositivePluginRuntime = positive_manager.PositivePluginRuntime{
  123. BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaManager),
  124. LocalPackagePath: plugin.runtime.State.AbsolutePath,
  125. WorkingPath: plugin.runtime.State.WorkingPath,
  126. Decoder: plugin.decoder,
  127. }
  128. if err := local_plugin_runtime.RemapAssets(
  129. &local_plugin_runtime.Config,
  130. assets,
  131. ); err != nil {
  132. return nil, nil, failed(errors.Join(err, fmt.Errorf("remap plugin assets error")).Error())
  133. }
  134. success = true
  135. p.m.Store(identity.String(), local_plugin_runtime)
  136. launched_chan := make(chan error)
  137. // local plugin
  138. routine.Submit(func() {
  139. defer func() {
  140. if r := recover(); r != nil {
  141. log.Error("plugin runtime panic: %v", r)
  142. }
  143. p.m.Delete(identity.String())
  144. }()
  145. p.fullDuplexLifetime(local_plugin_runtime, launched_chan)
  146. })
  147. return local_plugin_runtime, launched_chan, nil
  148. }
  149. type pluginRuntimeWithDecoder struct {
  150. runtime plugin_entities.PluginRuntime
  151. decoder decoder.PluginDecoder
  152. }
  153. // extract plugin from package to working directory
  154. func (p *PluginManager) getLocalPluginRuntime(plugin_path string) (
  155. *pluginRuntimeWithDecoder,
  156. error,
  157. ) {
  158. pack, err := os.Open(plugin_path)
  159. if err != nil {
  160. return nil, errors.Join(err, fmt.Errorf("open plugin package error"))
  161. }
  162. defer pack.Close()
  163. if info, err := pack.Stat(); err != nil {
  164. return nil, errors.Join(err, fmt.Errorf("get plugin package info error"))
  165. } else if info.Size() > p.maxPluginPackageSize {
  166. log.Error("plugin package size is too large: %d", info.Size())
  167. return nil, errors.Join(err, fmt.Errorf("plugin package size is too large"))
  168. }
  169. plugin_zip, err := io.ReadAll(pack)
  170. if err != nil {
  171. return nil, errors.Join(err, fmt.Errorf("read plugin package error"))
  172. }
  173. decoder, err := decoder.NewZipPluginDecoder(plugin_zip)
  174. if err != nil {
  175. return nil, errors.Join(err, fmt.Errorf("create plugin decoder error"))
  176. }
  177. // get manifest
  178. manifest, err := decoder.Manifest()
  179. if err != nil {
  180. return nil, errors.Join(err, fmt.Errorf("get plugin manifest error"))
  181. }
  182. checksum, err := decoder.Checksum()
  183. if err != nil {
  184. return nil, errors.Join(err, fmt.Errorf("calculate checksum error"))
  185. }
  186. identity := manifest.Identity()
  187. identity = strings.ReplaceAll(identity, ":", "-")
  188. plugin_working_path := path.Join(p.workingDirectory, fmt.Sprintf("%s@%s", identity, checksum))
  189. return &pluginRuntimeWithDecoder{
  190. runtime: plugin_entities.PluginRuntime{
  191. Config: manifest,
  192. State: plugin_entities.PluginRuntimeState{
  193. Status: plugin_entities.PLUGIN_RUNTIME_STATUS_PENDING,
  194. Restarts: 0,
  195. AbsolutePath: plugin_path,
  196. WorkingPath: plugin_working_path,
  197. ActiveAt: nil,
  198. Verified: manifest.Verified,
  199. },
  200. },
  201. decoder: decoder,
  202. }, nil
  203. }