environment.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package aws_manager
import (
	"errors"
	"fmt"
	"net"
	"net/http"
	"os"
	"time"

	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
)
  11. var (
  12. AWS_LAUNCH_LOCK_PREFIX = "aws_launch_lock_"
  13. )
  14. func (r *AWSPluginRuntime) InitEnvironment() error {
  15. // init http client
  16. r.client = &http.Client{
  17. Transport: &http.Transport{
  18. Dial: (&net.Dialer{
  19. Timeout: 5 * time.Second,
  20. KeepAlive: 15 * time.Second,
  21. }).Dial,
  22. IdleConnTimeout: 120 * time.Second,
  23. },
  24. }
  25. return nil
  26. }
  27. func (r *AWSPluginRuntime) Identity() (plugin_entities.PluginUniqueIdentifier, error) {
  28. return plugin_entities.PluginUniqueIdentifier(fmt.Sprintf("%s@%s", r.Config.Identity(), r.Checksum())), nil
  29. }
  30. // UploadPlugin uploads the plugin to the AWS Lambda
  31. func (r *AWSPluginRuntime) UploadPlugin() error {
  32. r.Log("Starting to initialize environment")
  33. // check if the plugin has already been initialized, at most 300s
  34. if err := cache.Lock(AWS_LAUNCH_LOCK_PREFIX+r.Checksum(), 300*time.Second, 300*time.Second); err != nil {
  35. return err
  36. }
  37. defer cache.Unlock(AWS_LAUNCH_LOCK_PREFIX + r.Checksum())
  38. r.Log("Started to initialize environment")
  39. identity, err := r.Identity()
  40. if err != nil {
  41. return err
  42. }
  43. function, err := fetchLambda(identity.String(), r.Checksum())
  44. if err != nil {
  45. if err != ErrNoLambdaFunction {
  46. return err
  47. }
  48. } else {
  49. // found, return directly
  50. r.LambdaURL = function.FunctionURL
  51. r.LambdaName = function.FunctionName
  52. r.Log(fmt.Sprintf("Found existing lambda function: %s", r.LambdaName))
  53. return nil
  54. }
  55. // create it if not found
  56. r.Log("Creating new lambda function")
  57. // create lambda function
  58. packager := NewPackager(r, r.Decoder)
  59. context, err := packager.Pack()
  60. if err != nil {
  61. return err
  62. }
  63. defer os.Remove(context.Name())
  64. defer context.Close()
  65. response, err := launchLambda(identity.String(), r.Checksum(), context)
  66. if err != nil {
  67. return err
  68. }
  69. for response.Next() {
  70. response, err := response.Read()
  71. if err != nil {
  72. return err
  73. }
  74. switch response.Event {
  75. case Error:
  76. return fmt.Errorf("error: %s", response.Message)
  77. case LambdaUrl:
  78. r.LambdaURL = response.Message
  79. case Lambda:
  80. r.LambdaName = response.Message
  81. case Info:
  82. r.Log(fmt.Sprintf("installing: %s", response.Message))
  83. }
  84. }
  85. return nil
  86. }