manager.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. package plugin_manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  8. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation/real"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_manager"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/remote_manager"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  13. "github.com/langgenius/dify-plugin-daemon/internal/db"
  14. "github.com/langgenius/dify-plugin-daemon/internal/oss"
  15. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  16. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  17. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  18. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  19. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache/helper"
  20. "github.com/langgenius/dify-plugin-daemon/internal/utils/lock"
  21. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  22. "github.com/langgenius/dify-plugin-daemon/internal/utils/mapping"
  23. )
  24. type PluginManager struct {
  25. m mapping.Map[string, plugin_entities.PluginLifetime]
  26. // max size of a plugin package
  27. maxPluginPackageSize int64
  28. // where the plugin finally running
  29. workingDirectory string
  30. // where the plugin finally installed but not running
  31. pluginStoragePath string
  32. // mediaBucket is used to manage media files like plugin icons, images, etc.
  33. mediaBucket *media_manager.MediaBucket
  34. // packageBucket is used to manage plugin packages, all the packages uploaded by users will be saved here
  35. packageBucket *media_manager.PackageBucket
  36. // installedBucket is used to manage installed plugins, all the installed plugins will be saved here
  37. installedBucket *media_manager.InstalledBucket
  38. // register plugin
  39. pluginRegisters []func(lifetime plugin_entities.PluginLifetime) error
  40. // localPluginLaunchingLock is a lock to launch local plugins
  41. localPluginLaunchingLock *lock.GranularityLock
  42. // backwardsInvocation is a handle to invoke dify
  43. backwardsInvocation dify_invocation.BackwardsInvocation
  44. // python interpreter path
  45. pythonInterpreterPath string
  46. // remote plugin server
  47. remotePluginServer remote_manager.RemotePluginServerInterface
  48. // max launching lock to prevent too many plugins launching at the same time
  49. maxLaunchingLock chan bool
  50. }
  51. var (
  52. manager *PluginManager
  53. )
  54. func InitGlobalManager(oss oss.OSS, configuration *app.Config) *PluginManager {
  55. manager = &PluginManager{
  56. maxPluginPackageSize: configuration.MaxPluginPackageSize,
  57. pluginStoragePath: configuration.PluginInstalledPath,
  58. workingDirectory: configuration.PluginWorkingPath,
  59. mediaBucket: media_manager.NewAssetsBucket(
  60. oss,
  61. configuration.PluginMediaCachePath,
  62. configuration.PluginMediaCacheSize,
  63. ),
  64. packageBucket: media_manager.NewPackageBucket(
  65. oss,
  66. configuration.PluginPackageCachePath,
  67. ),
  68. installedBucket: media_manager.NewInstalledBucket(
  69. oss,
  70. configuration.PluginInstalledPath,
  71. ),
  72. localPluginLaunchingLock: lock.NewGranularityLock(),
  73. maxLaunchingLock: make(chan bool, 2), // by default, we allow 2 plugins launching at the same time
  74. pythonInterpreterPath: configuration.PythonInterpreterPath,
  75. }
  76. // mkdir
  77. os.MkdirAll(configuration.PluginWorkingPath, 0755)
  78. os.MkdirAll(configuration.PluginInstalledPath, 0755)
  79. os.MkdirAll(configuration.PluginMediaCachePath, 0755)
  80. os.MkdirAll(configuration.PluginPackageCachePath, 0755)
  81. os.MkdirAll(filepath.Dir(configuration.ProcessCachingPath), 0755)
  82. return manager
  83. }
  84. func Manager() *PluginManager {
  85. return manager
  86. }
  87. func (p *PluginManager) Get(
  88. identity plugin_entities.PluginUniqueIdentifier,
  89. ) (plugin_entities.PluginLifetime, error) {
  90. if v, ok := p.m.Load(identity.String()); ok {
  91. return v, nil
  92. }
  93. // check if plugin is a serverless runtime
  94. plugin_session_interface, err := p.getServerlessPluginRuntime(identity)
  95. if err != nil {
  96. return nil, err
  97. }
  98. return plugin_session_interface, nil
  99. }
  100. func (p *PluginManager) GetAsset(id string) ([]byte, error) {
  101. return p.mediaBucket.Get(id)
  102. }
  103. func (p *PluginManager) Launch(configuration *app.Config) {
  104. log.Info("start plugin manager daemon...")
  105. // init redis client
  106. if err := cache.InitRedisClient(
  107. fmt.Sprintf("%s:%d", configuration.RedisHost, configuration.RedisPort),
  108. configuration.RedisPass,
  109. ); err != nil {
  110. log.Panic("init redis client failed: %s", err.Error())
  111. }
  112. invocation, err := real.NewDifyInvocationDaemon(
  113. configuration.DifyInnerApiURL, configuration.DifyInnerApiKey,
  114. )
  115. if err != nil {
  116. log.Panic("init dify invocation daemon failed: %s", err.Error())
  117. }
  118. p.backwardsInvocation = invocation
  119. // start local watcher
  120. if configuration.Platform == app.PLATFORM_LOCAL {
  121. p.startLocalWatcher()
  122. }
  123. // launch serverless connector
  124. if configuration.Platform == app.PLATFORM_AWS_LAMBDA {
  125. serverless.Init(configuration)
  126. }
  127. // start remote watcher
  128. p.startRemoteWatcher(configuration)
  129. }
  130. func (p *PluginManager) BackwardsInvocation() dify_invocation.BackwardsInvocation {
  131. return p.backwardsInvocation
  132. }
  133. func (p *PluginManager) SavePackage(plugin_unique_identifier plugin_entities.PluginUniqueIdentifier, pkg []byte) (
  134. *plugin_entities.PluginDeclaration, error,
  135. ) {
  136. // save to storage
  137. err := p.packageBucket.Save(plugin_unique_identifier.String(), pkg)
  138. if err != nil {
  139. return nil, err
  140. }
  141. // try to decode the package
  142. package_decoder, err := decoder.NewZipPluginDecoder(pkg)
  143. if err != nil {
  144. return nil, err
  145. }
  146. // get the declaration
  147. declaration, err := package_decoder.Manifest()
  148. if err != nil {
  149. return nil, err
  150. }
  151. // get the assets
  152. assets, err := package_decoder.Assets()
  153. if err != nil {
  154. return nil, err
  155. }
  156. // remap the assets
  157. _, err = p.mediaBucket.RemapAssets(&declaration, assets)
  158. if err != nil {
  159. return nil, err
  160. }
  161. unique_identifier, err := package_decoder.UniqueIdentity()
  162. if err != nil {
  163. return nil, err
  164. }
  165. // create plugin if not exists
  166. if _, err := db.GetOne[models.PluginDeclaration](
  167. db.Equal("plugin_unique_identifier", unique_identifier.String()),
  168. ); err == db.ErrDatabaseNotFound {
  169. err = db.Create(&models.PluginDeclaration{
  170. PluginUniqueIdentifier: unique_identifier.String(),
  171. PluginID: unique_identifier.PluginID(),
  172. Declaration: declaration,
  173. })
  174. if err != nil {
  175. return nil, err
  176. }
  177. } else if err != nil {
  178. return nil, err
  179. }
  180. return &declaration, nil
  181. }
  182. func (p *PluginManager) GetPackage(
  183. plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  184. ) ([]byte, error) {
  185. file, err := p.packageBucket.Get(plugin_unique_identifier.String())
  186. if err != nil {
  187. if os.IsNotExist(err) {
  188. return nil, errors.New("plugin package not found, please upload it firstly")
  189. }
  190. return nil, err
  191. }
  192. return file, nil
  193. }
  194. func (p *PluginManager) GetDeclaration(
  195. plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  196. ) (
  197. *plugin_entities.PluginDeclaration, error,
  198. ) {
  199. return helper.CombinedGetPluginDeclaration(plugin_unique_identifier)
  200. }