serverless.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package plugin_manager
  import (
  	"errors"
  	"fmt"
  	"time"

  	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_runtime"
  	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless_runtime"
  	"github.com/langgenius/dify-plugin-daemon/internal/db"
  	"github.com/langgenius/dify-plugin-daemon/internal/types/models"
  	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache/helper"
  	"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
  )
  // PLUGIN_SERVERLESS_CACHE_KEY is the fmt template used to build the cache key
  // of a serverless runtime record. Its single %s verb is filled with the
  // string form of the plugin's unique identifier.
  const PLUGIN_SERVERLESS_CACHE_KEY = "serverless:runtime:%s"
  16. func (p *PluginManager) getServerlessRuntimeCacheKey(
  17. identity plugin_entities.PluginUniqueIdentifier,
  18. ) string {
  19. return fmt.Sprintf(PLUGIN_SERVERLESS_CACHE_KEY, identity.String())
  20. }
  21. func (p *PluginManager) getServerlessPluginRuntime(
  22. identity plugin_entities.PluginUniqueIdentifier,
  23. ) (plugin_entities.PluginLifetime, error) {
  24. model, err := p.getServerlessPluginRuntimeModel(identity)
  25. if err != nil {
  26. return nil, err
  27. }
  28. // FIXME: get declaration
  29. declaration, err := helper.CombinedGetPluginDeclaration(identity, plugin_entities.PLUGIN_RUNTIME_TYPE_SERVERLESS)
  30. if err != nil {
  31. return nil, err
  32. }
  33. // init runtime entity
  34. runtimeEntity := plugin_entities.PluginRuntime{
  35. Config: *declaration,
  36. }
  37. runtimeEntity.InitState()
  38. // convert to plugin runtime
  39. pluginRuntime := serverless_runtime.AWSPluginRuntime{
  40. BasicChecksum: basic_runtime.BasicChecksum{
  41. MediaTransport: basic_runtime.NewMediaTransport(p.mediaBucket),
  42. InnerChecksum: model.Checksum,
  43. },
  44. PluginRuntime: runtimeEntity,
  45. LambdaURL: model.FunctionURL,
  46. LambdaName: model.FunctionName,
  47. }
  48. if err := pluginRuntime.InitEnvironment(); err != nil {
  49. return nil, err
  50. }
  51. return &pluginRuntime, nil
  52. }
  53. func (p *PluginManager) getServerlessPluginRuntimeModel(
  54. identity plugin_entities.PluginUniqueIdentifier,
  55. ) (*models.ServerlessRuntime, error) {
  56. // check if plugin is a serverless runtime
  57. runtime, err := cache.Get[models.ServerlessRuntime](
  58. p.getServerlessRuntimeCacheKey(identity),
  59. )
  60. if err != nil && err != cache.ErrNotFound {
  61. return nil, fmt.Errorf("unexpected error occurred during fetch serverless runtime cache: %v", err)
  62. }
  63. if err == cache.ErrNotFound {
  64. runtimeModel, err := db.GetOne[models.ServerlessRuntime](
  65. db.Equal("plugin_unique_identifier", identity.String()),
  66. )
  67. if err == db.ErrDatabaseNotFound {
  68. return nil, fmt.Errorf("plugin serverless runtime not found: %s", identity.String())
  69. }
  70. if err != nil {
  71. return nil, fmt.Errorf("failed to load serverless runtime from db: %v", err)
  72. }
  73. cache.Store(p.getServerlessRuntimeCacheKey(identity), runtimeModel, time.Minute*30)
  74. runtime = &runtimeModel
  75. } else if err != nil {
  76. return nil, fmt.Errorf("failed to load serverless runtime from cache: %v", err)
  77. }
  78. return runtime, nil
  79. }