s3_storage.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package s3
  2. import (
  3. "bytes"
  4. "context"
  5. "io"
  6. "time"
  7. "github.com/aws/aws-sdk-go-v2/aws"
  8. "github.com/aws/aws-sdk-go-v2/config"
  9. "github.com/aws/aws-sdk-go-v2/credentials"
  10. "github.com/aws/aws-sdk-go-v2/service/s3"
  11. "github.com/langgenius/dify-plugin-daemon/internal/oss"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  13. )
  14. type S3Storage struct {
  15. bucket string
  16. client *s3.Client
  17. }
  18. func NewS3Storage(ak string, sk string, region string, bucket string) (oss.OSS, error) {
  19. c, err := config.LoadDefaultConfig(
  20. context.TODO(),
  21. config.WithRegion(region),
  22. config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
  23. ak,
  24. sk,
  25. "",
  26. )),
  27. )
  28. if err != nil {
  29. return nil, err
  30. }
  31. client := s3.NewFromConfig(c)
  32. // check bucket
  33. _, err = client.HeadBucket(context.TODO(), &s3.HeadBucketInput{
  34. Bucket: aws.String(bucket),
  35. })
  36. if err != nil {
  37. _, err = client.CreateBucket(context.TODO(), &s3.CreateBucketInput{
  38. Bucket: aws.String(bucket),
  39. })
  40. if err != nil {
  41. return nil, err
  42. }
  43. }
  44. return &S3Storage{bucket: bucket, client: client}, nil
  45. }
  46. func (s *S3Storage) Save(key string, data []byte) error {
  47. _, err := s.client.PutObject(context.TODO(), &s3.PutObjectInput{
  48. Bucket: aws.String(s.bucket),
  49. Key: aws.String(key),
  50. Body: bytes.NewReader(data),
  51. })
  52. return err
  53. }
  54. func (s *S3Storage) Load(key string) ([]byte, error) {
  55. resp, err := s.client.GetObject(context.TODO(), &s3.GetObjectInput{
  56. Bucket: aws.String(s.bucket),
  57. Key: aws.String(key),
  58. })
  59. if err != nil {
  60. return nil, err
  61. }
  62. return io.ReadAll(resp.Body)
  63. }
  64. func (s *S3Storage) Exists(key string) (bool, error) {
  65. _, err := s.client.HeadObject(context.TODO(), &s3.HeadObjectInput{
  66. Bucket: aws.String(s.bucket),
  67. Key: aws.String(key),
  68. })
  69. return err == nil, nil
  70. }
  71. func (s *S3Storage) Delete(key string) error {
  72. _, err := s.client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
  73. Bucket: aws.String(s.bucket),
  74. Key: aws.String(key),
  75. })
  76. return err
  77. }
  78. func (s *S3Storage) List(prefix string) ([]string, error) {
  79. var keys []string
  80. input := &s3.ListObjectsV2Input{
  81. Bucket: aws.String(s.bucket),
  82. Prefix: aws.String(prefix),
  83. }
  84. paginator := s3.NewListObjectsV2Paginator(s.client, input)
  85. for paginator.HasMorePages() {
  86. page, err := paginator.NextPage(context.TODO())
  87. if err != nil {
  88. return nil, err
  89. }
  90. for _, obj := range page.Contents {
  91. keys = append(keys, *obj.Key)
  92. }
  93. }
  94. return keys, nil
  95. }
  96. func (s *S3Storage) State(key string) (oss.OSSState, error) {
  97. resp, err := s.client.HeadObject(context.TODO(), &s3.HeadObjectInput{
  98. Bucket: aws.String(s.bucket),
  99. Key: aws.String(key),
  100. })
  101. if err != nil {
  102. return oss.OSSState{}, err
  103. }
  104. if resp.ContentLength == nil {
  105. resp.ContentLength = parser.ToPtr[int64](0)
  106. }
  107. if resp.LastModified == nil {
  108. resp.LastModified = parser.ToPtr(time.Time{})
  109. }
  110. return oss.OSSState{
  111. Size: *resp.ContentLength,
  112. LastModified: *resp.LastModified,
  113. }, nil
  114. }