install_to_serverless.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package plugin_manager
  2. import (
  3. "fmt"
  4. serverless "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless_connector"
  5. "github.com/langgenius/dify-plugin-daemon/internal/db"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  9. "github.com/langgenius/dify-plugin-daemon/pkg/plugin_packager/decoder"
  10. )
  11. // InstallToAWSFromPkg installs a plugin to AWS Lambda
  12. func (p *PluginManager) InstallToAWSFromPkg(
  13. originalPackager []byte,
  14. decoder decoder.PluginDecoder,
  15. source string,
  16. meta map[string]any,
  17. ) (
  18. *stream.Stream[PluginInstallResponse], error,
  19. ) {
  20. checksum, err := decoder.Checksum()
  21. if err != nil {
  22. return nil, err
  23. }
  24. // check valid manifest
  25. _, err = decoder.Manifest()
  26. if err != nil {
  27. return nil, err
  28. }
  29. uniqueIdentity, err := decoder.UniqueIdentity()
  30. if err != nil {
  31. return nil, err
  32. }
  33. response, err := serverless.LaunchPlugin(originalPackager, decoder)
  34. if err != nil {
  35. return nil, err
  36. }
  37. newResponse := stream.NewStream[PluginInstallResponse](128)
  38. routine.Submit(map[string]string{
  39. "module": "plugin_manager",
  40. "function": "InstallToAWSFromPkg",
  41. "checksum": checksum,
  42. "unique_identity": uniqueIdentity.String(),
  43. "source": source,
  44. }, func() {
  45. defer func() {
  46. newResponse.Close()
  47. }()
  48. functionUrl := ""
  49. functionName := ""
  50. response.Async(func(r serverless.LaunchFunctionResponse) {
  51. if r.Event == serverless.Info {
  52. newResponse.Write(PluginInstallResponse{
  53. Event: PluginInstallEventInfo,
  54. Data: "Installing...",
  55. })
  56. } else if r.Event == serverless.Done {
  57. if functionUrl == "" || functionName == "" {
  58. newResponse.Write(PluginInstallResponse{
  59. Event: PluginInstallEventError,
  60. Data: "Internal server error, failed to get lambda url or function name",
  61. })
  62. return
  63. }
  64. // check if the plugin is already installed
  65. _, err := db.GetOne[models.ServerlessRuntime](
  66. db.Equal("checksum", checksum),
  67. db.Equal("type", string(models.SERVERLESS_RUNTIME_TYPE_SERVERLESS)),
  68. )
  69. if err == db.ErrDatabaseNotFound {
  70. // create a new serverless runtime
  71. serverlessModel := &models.ServerlessRuntime{
  72. Checksum: checksum,
  73. Type: models.SERVERLESS_RUNTIME_TYPE_SERVERLESS,
  74. FunctionURL: functionUrl,
  75. FunctionName: functionName,
  76. PluginUniqueIdentifier: uniqueIdentity.String(),
  77. }
  78. err = db.Create(serverlessModel)
  79. if err != nil {
  80. newResponse.Write(PluginInstallResponse{
  81. Event: PluginInstallEventError,
  82. Data: "Failed to create serverless runtime",
  83. })
  84. return
  85. }
  86. } else if err != nil {
  87. newResponse.Write(PluginInstallResponse{
  88. Event: PluginInstallEventError,
  89. Data: "Failed to check if the plugin is already installed",
  90. })
  91. return
  92. }
  93. newResponse.Write(PluginInstallResponse{
  94. Event: PluginInstallEventDone,
  95. Data: "Installed",
  96. })
  97. } else if r.Event == serverless.Error {
  98. newResponse.Write(PluginInstallResponse{
  99. Event: PluginInstallEventError,
  100. Data: "Internal server error",
  101. })
  102. } else if r.Event == serverless.FunctionUrl {
  103. functionUrl = r.Message
  104. } else if r.Event == serverless.Function {
  105. functionName = r.Message
  106. } else {
  107. newResponse.WriteError(fmt.Errorf("unknown event: %s, with message: %s", r.Event, r.Message))
  108. }
  109. })
  110. })
  111. return newResponse, nil
  112. }