install_to_serverless.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package plugin_manager
import (
	"errors"
	"fmt"

	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless"
	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
	"github.com/langgenius/dify-plugin-daemon/internal/db"
	"github.com/langgenius/dify-plugin-daemon/internal/types/models"
	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
)
  11. // InstallToAWSFromPkg installs a plugin to AWS Lambda
  12. func (p *PluginManager) InstallToAWSFromPkg(
  13. decoder decoder.PluginDecoder,
  14. source string,
  15. meta map[string]any,
  16. ) (
  17. *stream.Stream[PluginInstallResponse], error,
  18. ) {
  19. checksum, err := decoder.Checksum()
  20. if err != nil {
  21. return nil, err
  22. }
  23. declaration, err := decoder.Manifest()
  24. if err != nil {
  25. return nil, err
  26. }
  27. uniqueIdentity, err := decoder.UniqueIdentity()
  28. if err != nil {
  29. return nil, err
  30. }
  31. response, err := serverless.UploadPlugin(decoder)
  32. if err != nil {
  33. return nil, err
  34. }
  35. newResponse := stream.NewStream[PluginInstallResponse](128)
  36. routine.Submit(map[string]string{
  37. "module": "plugin_manager",
  38. "function": "InstallToAWSFromPkg",
  39. "checksum": checksum,
  40. "unique_identity": uniqueIdentity.String(),
  41. "source": source,
  42. }, func() {
  43. defer func() {
  44. newResponse.Close()
  45. }()
  46. lambdaUrl := ""
  47. lambdaFunctionName := ""
  48. response.Async(func(r serverless.LaunchAWSLambdaFunctionResponse) {
  49. if r.Event == serverless.Info {
  50. newResponse.Write(PluginInstallResponse{
  51. Event: PluginInstallEventInfo,
  52. Data: "Installing...",
  53. })
  54. } else if r.Event == serverless.Done {
  55. if lambdaUrl == "" || lambdaFunctionName == "" {
  56. newResponse.Write(PluginInstallResponse{
  57. Event: PluginInstallEventError,
  58. Data: "Internal server error, failed to get lambda url or function name",
  59. })
  60. return
  61. }
  62. // check if the plugin is already installed
  63. _, err := db.GetOne[models.ServerlessRuntime](
  64. db.Equal("checksum", checksum),
  65. db.Equal("type", string(models.SERVERLESS_RUNTIME_TYPE_AWS_LAMBDA)),
  66. )
  67. if err == db.ErrDatabaseNotFound {
  68. // create a new serverless runtime
  69. serverlessModel := &models.ServerlessRuntime{
  70. Checksum: checksum,
  71. Type: models.SERVERLESS_RUNTIME_TYPE_AWS_LAMBDA,
  72. FunctionURL: lambdaUrl,
  73. FunctionName: lambdaFunctionName,
  74. PluginUniqueIdentifier: uniqueIdentity.String(),
  75. Declaration: declaration,
  76. }
  77. err = db.Create(serverlessModel)
  78. if err != nil {
  79. newResponse.Write(PluginInstallResponse{
  80. Event: PluginInstallEventError,
  81. Data: "Failed to create serverless runtime",
  82. })
  83. return
  84. }
  85. } else if err != nil {
  86. newResponse.Write(PluginInstallResponse{
  87. Event: PluginInstallEventError,
  88. Data: "Failed to check if the plugin is already installed",
  89. })
  90. return
  91. }
  92. newResponse.Write(PluginInstallResponse{
  93. Event: PluginInstallEventDone,
  94. Data: "Installed",
  95. })
  96. } else if r.Event == serverless.Error {
  97. newResponse.Write(PluginInstallResponse{
  98. Event: PluginInstallEventError,
  99. Data: "Internal server error",
  100. })
  101. } else if r.Event == serverless.LambdaUrl {
  102. lambdaUrl = r.Message
  103. } else if r.Event == serverless.Lambda {
  104. lambdaFunctionName = r.Message
  105. } else {
  106. newResponse.WriteError(fmt.Errorf("unknown event: %s, with message: %s", r.Event, r.Message))
  107. }
  108. })
  109. })
  110. return newResponse, nil
  111. }