install.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package plugin_manager
  2. import (
  3. "fmt"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  6. "github.com/langgenius/dify-plugin-daemon/internal/db"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/models/curd"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  12. )
  13. type PluginInstallEvent string
  14. const (
  15. PluginInstallEventInfo PluginInstallEvent = "info"
  16. PluginInstallEventDone PluginInstallEvent = "done"
  17. PluginInstallEventError PluginInstallEvent = "error"
  18. )
  19. type PluginInstallResponse struct {
  20. Event PluginInstallEvent `json:"event"`
  21. Data string `json:"data"`
  22. }
  23. // InstallToAWSFromPkg installs a plugin to AWS Lambda
  24. func (p *PluginManager) InstallToAWSFromPkg(
  25. tenant_id string, decoder decoder.PluginDecoder,
  26. source string,
  27. meta map[string]any,
  28. ) (
  29. *stream.Stream[PluginInstallResponse], error,
  30. ) {
  31. checksum, err := decoder.Checksum()
  32. if err != nil {
  33. return nil, err
  34. }
  35. declaration, err := decoder.Manifest()
  36. if err != nil {
  37. return nil, err
  38. }
  39. unique_identity, err := decoder.UniqueIdentity()
  40. if err != nil {
  41. return nil, err
  42. }
  43. response, err := serverless.UploadPlugin(decoder)
  44. if err != nil {
  45. return nil, err
  46. }
  47. new_response := stream.NewStream[PluginInstallResponse](128)
  48. routine.Submit(func() {
  49. defer func() {
  50. new_response.Close()
  51. }()
  52. lambda_url := ""
  53. lambda_function_name := ""
  54. response.Async(func(r serverless.LaunchAWSLambdaFunctionResponse) {
  55. if r.Event == serverless.Info {
  56. new_response.Write(PluginInstallResponse{
  57. Event: PluginInstallEventInfo,
  58. Data: "Installing...",
  59. })
  60. } else if r.Event == serverless.Done {
  61. if lambda_url == "" || lambda_function_name == "" {
  62. new_response.Write(PluginInstallResponse{
  63. Event: PluginInstallEventError,
  64. Data: "Internal server error, failed to get lambda url or function name",
  65. })
  66. return
  67. }
  68. // check if the plugin is already installed
  69. _, err := db.GetOne[models.ServerlessRuntime](
  70. db.Equal("checksum", checksum),
  71. db.Equal("type", string(models.SERVERLESS_RUNTIME_TYPE_AWS_LAMBDA)),
  72. )
  73. if err == db.ErrDatabaseNotFound {
  74. // create a new serverless runtime
  75. serverless_model := &models.ServerlessRuntime{
  76. Checksum: checksum,
  77. Type: models.SERVERLESS_RUNTIME_TYPE_AWS_LAMBDA,
  78. FunctionURL: lambda_url,
  79. FunctionName: lambda_function_name,
  80. PluginUniqueIdentifier: unique_identity.String(),
  81. Declaration: declaration,
  82. }
  83. err = db.Create(serverless_model)
  84. if err != nil {
  85. new_response.Write(PluginInstallResponse{
  86. Event: PluginInstallEventError,
  87. Data: "Failed to create serverless runtime",
  88. })
  89. return
  90. }
  91. } else if err != nil {
  92. new_response.Write(PluginInstallResponse{
  93. Event: PluginInstallEventError,
  94. Data: "Failed to check if the plugin is already installed",
  95. })
  96. return
  97. }
  98. _, _, err = curd.InstallPlugin(
  99. tenant_id,
  100. unique_identity,
  101. plugin_entities.PLUGIN_RUNTIME_TYPE_AWS,
  102. &declaration,
  103. source,
  104. meta,
  105. )
  106. if err != nil {
  107. new_response.Write(PluginInstallResponse{
  108. Event: PluginInstallEventError,
  109. Data: "Failed to create plugin",
  110. })
  111. return
  112. }
  113. new_response.Write(PluginInstallResponse{
  114. Event: PluginInstallEventDone,
  115. Data: "Installed",
  116. })
  117. } else if r.Event == serverless.Error {
  118. new_response.Write(PluginInstallResponse{
  119. Event: PluginInstallEventError,
  120. Data: "Internal server error",
  121. })
  122. } else if r.Event == serverless.LambdaUrl {
  123. lambda_url = r.Message
  124. } else if r.Event == serverless.Lambda {
  125. lambda_function_name = r.Message
  126. } else {
  127. new_response.WriteError(fmt.Errorf("unknown event: %s, with message: %s", r.Event, r.Message))
  128. }
  129. })
  130. })
  131. return new_response, nil
  132. }
  133. // InstallToLocal installs a plugin to local
  134. func (p *PluginManager) InstallToLocal(
  135. tenant_id string, decoder decoder.PluginDecoder,
  136. source string,
  137. meta map[string]any,
  138. ) (
  139. *stream.Stream[PluginInstallResponse], error,
  140. ) {
  141. return nil, nil
  142. }