install_to_serverless.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package plugin_manager
  2. import (
  3. "fmt"
  4. serverless "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless_connector"
  5. "github.com/langgenius/dify-plugin-daemon/internal/db"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  9. "github.com/langgenius/dify-plugin-daemon/pkg/plugin_packager/decoder"
  10. )
  11. // InstallToAWSFromPkg installs a plugin to AWS Lambda
  12. func (p *PluginManager) InstallToAWSFromPkg(
  13. originalPackager []byte,
  14. decoder decoder.PluginDecoder,
  15. source string,
  16. meta map[string]any,
  17. ) (
  18. *stream.Stream[PluginInstallResponse], error,
  19. ) {
  20. checksum, err := decoder.Checksum()
  21. if err != nil {
  22. return nil, err
  23. }
  24. declaration, err := decoder.Manifest()
  25. if err != nil {
  26. return nil, err
  27. }
  28. uniqueIdentity, err := decoder.UniqueIdentity()
  29. if err != nil {
  30. return nil, err
  31. }
  32. response, err := serverless.LaunchPlugin(originalPackager, decoder)
  33. if err != nil {
  34. return nil, err
  35. }
  36. newResponse := stream.NewStream[PluginInstallResponse](128)
  37. routine.Submit(map[string]string{
  38. "module": "plugin_manager",
  39. "function": "InstallToAWSFromPkg",
  40. "checksum": checksum,
  41. "unique_identity": uniqueIdentity.String(),
  42. "source": source,
  43. }, func() {
  44. defer func() {
  45. newResponse.Close()
  46. }()
  47. functionUrl := ""
  48. functionName := ""
  49. response.Async(func(r serverless.LaunchFunctionResponse) {
  50. if r.Event == serverless.Info {
  51. newResponse.Write(PluginInstallResponse{
  52. Event: PluginInstallEventInfo,
  53. Data: "Installing...",
  54. })
  55. } else if r.Event == serverless.Done {
  56. if functionUrl == "" || functionName == "" {
  57. newResponse.Write(PluginInstallResponse{
  58. Event: PluginInstallEventError,
  59. Data: "Internal server error, failed to get lambda url or function name",
  60. })
  61. return
  62. }
  63. // check if the plugin is already installed
  64. _, err := db.GetOne[models.ServerlessRuntime](
  65. db.Equal("checksum", checksum),
  66. db.Equal("type", string(models.SERVERLESS_RUNTIME_TYPE_SERVERLESS)),
  67. )
  68. if err == db.ErrDatabaseNotFound {
  69. // create a new serverless runtime
  70. serverlessModel := &models.ServerlessRuntime{
  71. Checksum: checksum,
  72. Type: models.SERVERLESS_RUNTIME_TYPE_SERVERLESS,
  73. FunctionURL: functionUrl,
  74. FunctionName: functionName,
  75. PluginUniqueIdentifier: uniqueIdentity.String(),
  76. Declaration: declaration,
  77. }
  78. err = db.Create(serverlessModel)
  79. if err != nil {
  80. newResponse.Write(PluginInstallResponse{
  81. Event: PluginInstallEventError,
  82. Data: "Failed to create serverless runtime",
  83. })
  84. return
  85. }
  86. } else if err != nil {
  87. newResponse.Write(PluginInstallResponse{
  88. Event: PluginInstallEventError,
  89. Data: "Failed to check if the plugin is already installed",
  90. })
  91. return
  92. }
  93. newResponse.Write(PluginInstallResponse{
  94. Event: PluginInstallEventDone,
  95. Data: "Installed",
  96. })
  97. } else if r.Event == serverless.Error {
  98. newResponse.Write(PluginInstallResponse{
  99. Event: PluginInstallEventError,
  100. Data: "Internal server error",
  101. })
  102. } else if r.Event == serverless.FunctionUrl {
  103. functionUrl = r.Message
  104. } else if r.Event == serverless.Function {
  105. functionName = r.Message
  106. } else {
  107. newResponse.WriteError(fmt.Errorf("unknown event: %s, with message: %s", r.Event, r.Message))
  108. }
  109. })
  110. })
  111. return newResponse, nil
  112. }