s3.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package aws
  2. import (
  3. "context"
  4. "io"
  5. "github.com/aws/aws-sdk-go-v2/config"
  6. "github.com/aws/aws-sdk-go-v2/credentials"
  7. "github.com/aws/aws-sdk-go-v2/service/s3"
  8. "github.com/aws/aws-sdk-go-v2/service/s3/types"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/mapping"
  12. )
  13. var (
  14. s3Client *s3.Client
  15. s3Bucket *string
  16. )
  17. func InitS3(app *app.Config) {
  18. // Check if required AWS S3 configuration is provided
  19. if app.AWSS3Region == nil || app.AWSS3AccessKey == nil || app.AWSS3SecretKey == nil || app.AWSS3Bucket == nil {
  20. log.Panic("AWSS3Region, AWSS3AccessKey, AWSS3SecretKey, and AWSS3Bucket must be set")
  21. }
  22. // Load AWS configuration with provided credentials
  23. cfg, err := config.LoadDefaultConfig(
  24. context.TODO(),
  25. config.WithRegion(*app.AWSS3Region),
  26. config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
  27. *app.AWSS3AccessKey,
  28. *app.AWSS3SecretKey,
  29. "",
  30. )),
  31. )
  32. // Handle error if AWS config loading fails
  33. if err != nil {
  34. log.Panic("Failed to load AWS S3 config: %v", err)
  35. }
  36. log.Info("AWS S3 config loaded")
  37. // Create S3 client
  38. s3Client = s3.NewFromConfig(cfg)
  39. // Store S3 bucket name
  40. s3Bucket = app.AWSS3Bucket
  41. log.Info("AWS S3 client initialized successfully")
  42. }
  43. func StreamUploadToS3(ctx context.Context, key string, reader io.Reader) error {
  44. _, err := s3Client.PutObject(ctx, &s3.PutObjectInput{
  45. Bucket: s3Bucket,
  46. Key: &key,
  47. Body: reader,
  48. })
  49. return err
  50. }
  51. func StreamDownloadFromS3(ctx context.Context, key string) (io.ReadCloser, error) {
  52. resp, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
  53. Bucket: s3Bucket,
  54. Key: &key,
  55. })
  56. if err != nil {
  57. return nil, err
  58. }
  59. return resp.Body, nil
  60. }
  61. func DeleteFromS3(ctx context.Context, key string) error {
  62. _, err := s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
  63. Bucket: s3Bucket,
  64. Key: &key,
  65. })
  66. return err
  67. }
  68. func ListFromS3(ctx context.Context, prefix string) ([]string, error) {
  69. resp, err := s3Client.ListObjects(ctx, &s3.ListObjectsInput{
  70. Bucket: s3Bucket,
  71. Prefix: &prefix,
  72. })
  73. if err != nil {
  74. return nil, err
  75. }
  76. return mapping.MapArray(resp.Contents, func(obj types.Object) string {
  77. if obj.Key != nil {
  78. return *obj.Key
  79. }
  80. return ""
  81. }), nil
  82. }
  83. func HeadObject(ctx context.Context, key string) (*s3.HeadObjectOutput, error) {
  84. return s3Client.HeadObject(ctx, &s3.HeadObjectInput{
  85. Bucket: s3Bucket,
  86. Key: &key,
  87. })
  88. }