serverless.go 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package plugin_manager
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
  8. "github.com/langgenius/dify-plugin-daemon/internal/db"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  12. )
  13. const (
  14. PLUGIN_SERVERLESS_CACHE_KEY = "serverless:runtime:%s"
  15. )
  16. func (p *PluginManager) getServerlessRuntimeCacheKey(
  17. identity plugin_entities.PluginUniqueIdentifier,
  18. ) string {
  19. return fmt.Sprintf(PLUGIN_SERVERLESS_CACHE_KEY, identity.String())
  20. }
  21. func (p *PluginManager) getServerlessPluginRuntime(
  22. identity plugin_entities.PluginUniqueIdentifier,
  23. ) (plugin_entities.PluginLifetime, error) {
  24. model, err := p.getServerlessPluginRuntimeModel(identity)
  25. if err != nil {
  26. return nil, err
  27. }
  28. declaration := model.Declaration
  29. // init runtime entity
  30. runtime_entity := plugin_entities.PluginRuntime{
  31. Config: declaration,
  32. }
  33. runtime_entity.InitState()
  34. // convert to plugin runtime
  35. plugin_runtime := aws_manager.AWSPluginRuntime{
  36. PositivePluginRuntime: positive_manager.PositivePluginRuntime{
  37. BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaManager),
  38. InnerChecksum: model.Checksum,
  39. },
  40. PluginRuntime: runtime_entity,
  41. LambdaURL: model.FunctionURL,
  42. LambdaName: model.FunctionName,
  43. }
  44. if err := plugin_runtime.InitEnvironment(); err != nil {
  45. return nil, err
  46. }
  47. return &plugin_runtime, nil
  48. }
  49. func (p *PluginManager) getServerlessPluginRuntimeModel(
  50. identity plugin_entities.PluginUniqueIdentifier,
  51. ) (*models.ServerlessRuntime, error) {
  52. // check if plugin is a serverless runtime
  53. runtime, err := cache.Get[models.ServerlessRuntime](
  54. p.getServerlessRuntimeCacheKey(identity),
  55. )
  56. if err != nil && err != cache.ErrNotFound {
  57. return nil, fmt.Errorf("unexpected error occurred during fetch serverless runtime cache: %v", err)
  58. }
  59. if err == cache.ErrNotFound {
  60. runtime_model, err := db.GetOne[models.ServerlessRuntime](
  61. db.Equal("plugin_unique_identifier", identity.String()),
  62. )
  63. if err == db.ErrDatabaseNotFound {
  64. return nil, fmt.Errorf("plugin not found: %s", identity.String())
  65. }
  66. if err != nil {
  67. return nil, fmt.Errorf("failed to load serverless runtime from db: %v", err)
  68. }
  69. cache.Store(p.getServerlessRuntimeCacheKey(identity), runtime_model, time.Minute*30)
  70. runtime = &runtime_model
  71. } else if err != nil {
  72. return nil, fmt.Errorf("failed to load serverless runtime from cache: %v", err)
  73. }
  74. return runtime, nil
  75. }