environment.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package aws_manager
import (
	"errors"
	"fmt"
	"net"
	"net/http"
	"os"
	"time"

	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
)
  10. var (
  11. AWS_LAUNCH_LOCK_PREFIX = "aws_launch_lock_"
  12. )
  13. func (r *AWSPluginRuntime) InitEnvironment() error {
  14. if err := r.initEnvironment(); err != nil {
  15. return err
  16. }
  17. // init http client
  18. r.client = &http.Client{
  19. Transport: &http.Transport{
  20. Dial: (&net.Dialer{
  21. Timeout: 5 * time.Second,
  22. KeepAlive: 15 * time.Second,
  23. }).Dial,
  24. IdleConnTimeout: 120 * time.Second,
  25. },
  26. }
  27. return nil
  28. }
  29. func (r *AWSPluginRuntime) Identity() (string, error) {
  30. return fmt.Sprintf("%s@%s", r.Config.Identity(), r.Checksum()), nil
  31. }
  32. func (r *AWSPluginRuntime) initEnvironment() error {
  33. r.Log("Starting to initialize environment")
  34. // check if the plugin has already been initialized, at most 300s
  35. if err := cache.Lock(AWS_LAUNCH_LOCK_PREFIX+r.Checksum(), 300*time.Second, 300*time.Second); err != nil {
  36. return err
  37. }
  38. defer cache.Unlock(AWS_LAUNCH_LOCK_PREFIX + r.Checksum())
  39. r.Log("Started to initialize environment")
  40. identity, err := r.Identity()
  41. if err != nil {
  42. return err
  43. }
  44. function, err := fetchLambda(identity, r.Checksum())
  45. if err != nil {
  46. if err != ErrNoLambdaFunction {
  47. return err
  48. }
  49. } else {
  50. // found, return directly
  51. r.lambda_url = function.FunctionURL
  52. r.lambda_name = function.FunctionName
  53. r.Log(fmt.Sprintf("Found existing lambda function: %s", r.lambda_name))
  54. return nil
  55. }
  56. // create it if not found
  57. r.Log("Creating new lambda function")
  58. // create lambda function
  59. packager := NewPackager(r, r.Decoder)
  60. context, err := packager.Pack()
  61. if err != nil {
  62. return err
  63. }
  64. defer os.Remove(context.Name())
  65. defer context.Close()
  66. response, err := launchLambda(identity, r.Checksum(), context)
  67. if err != nil {
  68. return err
  69. }
  70. for response.Next() {
  71. response, err := response.Read()
  72. if err != nil {
  73. return err
  74. }
  75. switch response.Event {
  76. case Error:
  77. return fmt.Errorf("error: %s", response.Message)
  78. case LambdaUrl:
  79. r.lambda_url = response.Message
  80. case Lambda:
  81. r.lambda_name = response.Message
  82. case Info:
  83. r.Log(fmt.Sprintf("installing: %s", response.Message))
  84. }
  85. }
  86. return nil
  87. }