watcher.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. package plugin_manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path"
  8. "strings"
  9. "time"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/remote_manager"
  14. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  15. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  16. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  17. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  18. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  19. )
  20. func (p *PluginManager) startLocalWatcher() {
  21. go func() {
  22. log.Info("start to handle new plugins in path: %s", p.pluginStoragePath)
  23. p.handleNewLocalPlugins()
  24. for range time.NewTicker(time.Second * 30).C {
  25. p.handleNewLocalPlugins()
  26. }
  27. }()
  28. }
  29. func (p *PluginManager) startRemoteWatcher(config *app.Config) {
  30. // launch TCP debugging server if enabled
  31. if config.PluginRemoteInstallingEnabled {
  32. server := remote_manager.NewRemotePluginServer(config, p.mediaManager)
  33. go func() {
  34. err := server.Launch()
  35. if err != nil {
  36. log.Error("start remote plugin server failed: %s", err.Error())
  37. }
  38. }()
  39. go func() {
  40. server.Wrap(func(rpr *remote_manager.RemotePluginRuntime) {
  41. routine.Submit(func() {
  42. defer func() {
  43. if err := recover(); err != nil {
  44. log.Error("plugin runtime error: %v", err)
  45. }
  46. }()
  47. p.fullDuplexLifetime(rpr)
  48. })
  49. })
  50. }()
  51. }
  52. }
  53. func (p *PluginManager) handleNewLocalPlugins() {
  54. // load local plugins firstly
  55. plugins, err := os.ReadDir(p.pluginStoragePath)
  56. if err != nil {
  57. log.Error("no plugin found in path: %s", p.pluginStoragePath)
  58. }
  59. for _, plugin := range plugins {
  60. if !plugin.IsDir() {
  61. abs_path := path.Join(p.pluginStoragePath, plugin.Name())
  62. _, err := p.launchLocal(abs_path)
  63. if err != nil {
  64. log.Error("launch local plugin failed: %s", err.Error())
  65. }
  66. }
  67. }
  68. }
  69. func (p *PluginManager) launchLocal(plugin_package_path string) (plugin_entities.PluginFullDuplexLifetime, error) {
  70. plugin, err := p.getLocalPluginRuntime(plugin_package_path)
  71. if err != nil {
  72. return nil, err
  73. }
  74. identity, err := plugin.decoder.UniqueIdentity()
  75. if err != nil {
  76. return nil, err
  77. }
  78. // lock launch process
  79. p.localPluginLaunchingLock.Lock(identity.String())
  80. defer p.localPluginLaunchingLock.Unlock(identity.String())
  81. // check if the plugin is already running
  82. if _, ok := p.m.Load(identity.String()); ok {
  83. lifetime, ok := p.Get(identity).(plugin_entities.PluginFullDuplexLifetime)
  84. if !ok {
  85. return nil, fmt.Errorf("plugin runtime not found")
  86. }
  87. return lifetime, nil
  88. }
  89. // extract plugin
  90. decoder, ok := plugin.decoder.(*decoder.ZipPluginDecoder)
  91. if !ok {
  92. return nil, fmt.Errorf("plugin decoder is not a zip decoder")
  93. }
  94. if err := decoder.ExtractTo(plugin.runtime.State.WorkingPath); err != nil {
  95. return nil, errors.Join(err, fmt.Errorf("extract plugin to working directory error"))
  96. }
  97. success := false
  98. failed := func(message string) error {
  99. if !success {
  100. os.RemoveAll(plugin.runtime.State.WorkingPath)
  101. }
  102. return errors.New(message)
  103. }
  104. // get assets
  105. assets, err := plugin.decoder.Assets()
  106. if err != nil {
  107. return nil, failed(err.Error())
  108. }
  109. local_plugin_runtime := local_manager.NewLocalPluginRuntime()
  110. local_plugin_runtime.PluginRuntime = plugin.runtime
  111. local_plugin_runtime.PositivePluginRuntime = positive_manager.PositivePluginRuntime{
  112. BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaManager),
  113. LocalPackagePath: plugin.runtime.State.AbsolutePath,
  114. WorkingPath: plugin.runtime.State.WorkingPath,
  115. Decoder: plugin.decoder,
  116. }
  117. if err := local_plugin_runtime.RemapAssets(
  118. &local_plugin_runtime.Config,
  119. assets,
  120. ); err != nil {
  121. return nil, failed(errors.Join(err, fmt.Errorf("remap plugin assets error")).Error())
  122. }
  123. success = true
  124. // local plugin
  125. routine.Submit(func() {
  126. defer func() {
  127. if r := recover(); r != nil {
  128. log.Error("plugin runtime panic: %v", r)
  129. }
  130. }()
  131. p.fullDuplexLifetime(local_plugin_runtime)
  132. })
  133. return local_plugin_runtime, nil
  134. }
  135. type pluginRuntimeWithDecoder struct {
  136. runtime plugin_entities.PluginRuntime
  137. decoder decoder.PluginDecoder
  138. }
  139. // extract plugin from package to working directory
  140. func (p *PluginManager) getLocalPluginRuntime(plugin_path string) (*pluginRuntimeWithDecoder, error) {
  141. pack, err := os.Open(plugin_path)
  142. if err != nil {
  143. return nil, errors.Join(err, fmt.Errorf("open plugin package error"))
  144. }
  145. defer pack.Close()
  146. if info, err := pack.Stat(); err != nil {
  147. return nil, errors.Join(err, fmt.Errorf("get plugin package info error"))
  148. } else if info.Size() > p.maxPluginPackageSize {
  149. log.Error("plugin package size is too large: %d", info.Size())
  150. return nil, err
  151. }
  152. plugin_zip, err := io.ReadAll(pack)
  153. if err != nil {
  154. return nil, errors.Join(err, fmt.Errorf("read plugin package error"))
  155. }
  156. decoder, err := decoder.NewZipPluginDecoder(plugin_zip)
  157. if err != nil {
  158. return nil, errors.Join(err, fmt.Errorf("create plugin decoder error"))
  159. }
  160. // get manifest
  161. manifest, err := decoder.Manifest()
  162. if err != nil {
  163. return nil, errors.Join(err, fmt.Errorf("get plugin manifest error"))
  164. }
  165. // check if already exists
  166. if _, exist := p.m.Load(manifest.Identity()); exist {
  167. return nil, errors.Join(fmt.Errorf("plugin already exists: %s", manifest.Identity()), err)
  168. }
  169. checksum, err := decoder.Checksum()
  170. if err != nil {
  171. return nil, errors.Join(err, fmt.Errorf("calculate checksum error"))
  172. }
  173. identity := manifest.Identity()
  174. // replace : with -
  175. identity = strings.ReplaceAll(identity, ":", "-")
  176. plugin_working_path := path.Join(p.workingDirectory, fmt.Sprintf("%s@%s", identity, checksum))
  177. // check if working directory exists
  178. if _, err := os.Stat(plugin_working_path); err == nil {
  179. return nil, errors.Join(fmt.Errorf("plugin working directory already exists: %s", plugin_working_path), err)
  180. }
  181. return &pluginRuntimeWithDecoder{
  182. runtime: plugin_entities.PluginRuntime{
  183. Config: manifest,
  184. State: plugin_entities.PluginRuntimeState{
  185. Status: plugin_entities.PLUGIN_RUNTIME_STATUS_PENDING,
  186. Restarts: 0,
  187. AbsolutePath: plugin_path,
  188. WorkingPath: plugin_working_path,
  189. ActiveAt: nil,
  190. Verified: manifest.Verified,
  191. },
  192. },
  193. decoder: decoder,
  194. }, nil
  195. }