serverless.go 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package plugin_manager
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_runtime"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless_runtime"
  7. "github.com/langgenius/dify-plugin-daemon/internal/db"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  11. )
  // PLUGIN_SERVERLESS_CACHE_KEY is the fmt-style template used to build the
  // cache key of a serverless runtime record. Its single %s verb is filled
  // with the string form of the plugin's unique identifier.
  const PLUGIN_SERVERLESS_CACHE_KEY = "serverless:runtime:%s"
  15. func (p *PluginManager) getServerlessRuntimeCacheKey(
  16. identity plugin_entities.PluginUniqueIdentifier,
  17. ) string {
  18. return fmt.Sprintf(PLUGIN_SERVERLESS_CACHE_KEY, identity.String())
  19. }
  20. func (p *PluginManager) getServerlessPluginRuntime(
  21. identity plugin_entities.PluginUniqueIdentifier,
  22. ) (plugin_entities.PluginLifetime, error) {
  23. model, err := p.getServerlessPluginRuntimeModel(identity)
  24. if err != nil {
  25. return nil, err
  26. }
  27. declaration := model.Declaration
  28. // init runtime entity
  29. runtimeEntity := plugin_entities.PluginRuntime{
  30. Config: declaration,
  31. }
  32. runtimeEntity.InitState()
  33. // convert to plugin runtime
  34. pluginRuntime := serverless_runtime.AWSPluginRuntime{
  35. BasicChecksum: basic_runtime.BasicChecksum{
  36. MediaTransport: basic_runtime.NewMediaTransport(p.mediaBucket),
  37. InnerChecksum: model.Checksum,
  38. },
  39. PluginRuntime: runtimeEntity,
  40. LambdaURL: model.FunctionURL,
  41. LambdaName: model.FunctionName,
  42. }
  43. if err := pluginRuntime.InitEnvironment(); err != nil {
  44. return nil, err
  45. }
  46. return &pluginRuntime, nil
  47. }
  48. func (p *PluginManager) getServerlessPluginRuntimeModel(
  49. identity plugin_entities.PluginUniqueIdentifier,
  50. ) (*models.ServerlessRuntime, error) {
  51. // check if plugin is a serverless runtime
  52. runtime, err := cache.Get[models.ServerlessRuntime](
  53. p.getServerlessRuntimeCacheKey(identity),
  54. )
  55. if err != nil && err != cache.ErrNotFound {
  56. return nil, fmt.Errorf("unexpected error occurred during fetch serverless runtime cache: %v", err)
  57. }
  58. if err == cache.ErrNotFound {
  59. runtimeModel, err := db.GetOne[models.ServerlessRuntime](
  60. db.Equal("plugin_unique_identifier", identity.String()),
  61. )
  62. if err == db.ErrDatabaseNotFound {
  63. return nil, fmt.Errorf("plugin serverless runtime not found: %s", identity.String())
  64. }
  65. if err != nil {
  66. return nil, fmt.Errorf("failed to load serverless runtime from db: %v", err)
  67. }
  68. cache.Store(p.getServerlessRuntimeCacheKey(identity), runtimeModel, time.Minute*30)
  69. runtime = &runtimeModel
  70. } else if err != nil {
  71. return nil, fmt.Errorf("failed to load serverless runtime from cache: %v", err)
  72. }
  73. return runtime, nil
  74. }