environment.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package aws_manager
  2. import (
  3. "fmt"
  4. "net"
  5. "net/http"
  6. "os"
  7. "time"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  10. )
  11. var (
  12. AWS_LAUNCH_LOCK_PREFIX = "aws_launch_lock_"
  13. )
  14. func (r *AWSPluginRuntime) InitEnvironment() error {
  15. if err := r.initEnvironment(); err != nil {
  16. return err
  17. }
  18. // init http client
  19. r.client = &http.Client{
  20. Transport: &http.Transport{
  21. Dial: (&net.Dialer{
  22. Timeout: 5 * time.Second,
  23. KeepAlive: 15 * time.Second,
  24. }).Dial,
  25. IdleConnTimeout: 120 * time.Second,
  26. },
  27. }
  28. return nil
  29. }
  30. func (r *AWSPluginRuntime) Identity() (plugin_entities.PluginUniqueIdentifier, error) {
  31. return plugin_entities.PluginUniqueIdentifier(fmt.Sprintf("%s@%s", r.Config.Identity(), r.Checksum())), nil
  32. }
  33. func (r *AWSPluginRuntime) initEnvironment() error {
  34. r.Log("Starting to initialize environment")
  35. // check if the plugin has already been initialized, at most 300s
  36. if err := cache.Lock(AWS_LAUNCH_LOCK_PREFIX+r.Checksum(), 300*time.Second, 300*time.Second); err != nil {
  37. return err
  38. }
  39. defer cache.Unlock(AWS_LAUNCH_LOCK_PREFIX + r.Checksum())
  40. r.Log("Started to initialize environment")
  41. identity, err := r.Identity()
  42. if err != nil {
  43. return err
  44. }
  45. function, err := fetchLambda(identity.String(), r.Checksum())
  46. if err != nil {
  47. if err != ErrNoLambdaFunction {
  48. return err
  49. }
  50. } else {
  51. // found, return directly
  52. r.lambda_url = function.FunctionURL
  53. r.lambda_name = function.FunctionName
  54. r.Log(fmt.Sprintf("Found existing lambda function: %s", r.lambda_name))
  55. return nil
  56. }
  57. // create it if not found
  58. r.Log("Creating new lambda function")
  59. // create lambda function
  60. packager := NewPackager(r, r.Decoder)
  61. context, err := packager.Pack()
  62. if err != nil {
  63. return err
  64. }
  65. defer os.Remove(context.Name())
  66. defer context.Close()
  67. response, err := launchLambda(identity.String(), r.Checksum(), context)
  68. if err != nil {
  69. return err
  70. }
  71. for response.Next() {
  72. response, err := response.Read()
  73. if err != nil {
  74. return err
  75. }
  76. switch response.Event {
  77. case Error:
  78. return fmt.Errorf("error: %s", response.Message)
  79. case LambdaUrl:
  80. r.lambda_url = response.Message
  81. case Lambda:
  82. r.lambda_name = response.Message
  83. case Info:
  84. r.Log(fmt.Sprintf("installing: %s", response.Message))
  85. }
  86. }
  87. return nil
  88. }