feat: add oss

Yeuoly · 8 months ago
commit 9ad9d7d4de
3 changed files with 223 additions and 0 deletions:
  1. internal/oss/local/local_storage.go  +68 -0
  2. internal/oss/s3/s3_storage.go  +132 -0
  3. internal/oss/type.go  +23 -0

internal/oss/local/local_storage.go  +68 -0

@@ -0,0 +1,68 @@
+package local
+
+import (
+	"os"
+	"path/filepath"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/oss"
+)
+
+type LocalStorage struct {
+	root string
+}
+
+func NewLocalStorage(root string) oss.OSS {
+	return &LocalStorage{root: root}
+}
+
+func (l *LocalStorage) Save(key string, data []byte) error {
+	path := filepath.Join(l.root, key)
+
+	// create parent directories so keys containing sub-paths can be written
+	if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
+		return err
+	}
+
+	return os.WriteFile(path, data, 0o644)
+}
+
+func (l *LocalStorage) Load(key string) ([]byte, error) {
+	path := filepath.Join(l.root, key)
+
+	return os.ReadFile(path)
+}
+
+func (l *LocalStorage) Exists(key string) (bool, error) {
+	path := filepath.Join(l.root, key)
+
+	_, err := os.Stat(path)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return false, nil
+		}
+		return false, err
+	}
+	return true, nil
+}
+
+func (l *LocalStorage) State(key string) (oss.OSSState, error) {
+	path := filepath.Join(l.root, key)
+
+	info, err := os.Stat(path)
+	if err != nil {
+		return oss.OSSState{}, err
+	}
+
+	return oss.OSSState{Size: info.Size(), LastModified: info.ModTime()}, nil
+}
+
+func (l *LocalStorage) List(prefix string) ([]string, error) {
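+	// note: os.ReadDir does not recurse, so only direct children of the prefix are listed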
+	prefix = filepath.Join(l.root, prefix)
+
+	entries, err := os.ReadDir(prefix)
+	if err != nil {
+		return nil, err
+	}
+
+	paths := make([]string, 0, len(entries))
+	for _, entry := range entries {
+		paths = append(paths, filepath.Join(prefix, entry.Name()))
+	}
+
+	return paths, nil
+}
+
+func (l *LocalStorage) Delete(key string) error {
+	path := filepath.Join(l.root, key)
+
+	return os.RemoveAll(path)
+}
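
For a quick sense of how the local backend is used, a minimal round trip might look like the sketch below; the root directory and the key are illustrative placeholders, and the import only resolves from inside this module.

package example

import (
	"fmt"

	"github.com/langgenius/dify-plugin-daemon/internal/oss/local"
)

func roundTrip() error {
	// "/tmp/dify-oss" and the key below are placeholders for illustration only
	storage := local.NewLocalStorage("/tmp/dify-oss")

	if err := storage.Save("plugins/example.txt", []byte("hello")); err != nil {
		return err
	}

	data, err := storage.Load("plugins/example.txt")
	if err != nil {
		return err
	}
	fmt.Println(string(data)) // prints "hello"

	return storage.Delete("plugins/example.txt")
}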

internal/oss/s3/s3_storage.go  +132 -0

@@ -0,0 +1,132 @@
+package s3
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"io"
+	"time"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/config"
+	"github.com/aws/aws-sdk-go-v2/credentials"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/aws/aws-sdk-go-v2/service/s3/types"
+	"github.com/langgenius/dify-plugin-daemon/internal/oss"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
+)
+
+type S3Storage struct {
+	bucket string
+	client *s3.Client
+}
+
+func NewS3Storage(ak string, sk string, region string, bucket string) (oss.OSS, error) {
+	c, err := config.LoadDefaultConfig(
+		context.TODO(),
+		config.WithRegion(region),
+		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
+			ak,
+			sk,
+			"",
+		)),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	client := s3.NewFromConfig(c)
+
+	// check that the bucket is reachable; if the HEAD request fails, fall back to creating it
+	_, err = client.HeadBucket(context.TODO(), &s3.HeadBucketInput{
+		Bucket: aws.String(bucket),
+	})
+	if err != nil {
+		_, err = client.CreateBucket(context.TODO(), &s3.CreateBucketInput{
+			Bucket: aws.String(bucket),
+		})
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return &S3Storage{bucket: bucket, client: client}, nil
+}
+
+func (s *S3Storage) Save(key string, data []byte) error {
+	_, err := s.client.PutObject(context.TODO(), &s3.PutObjectInput{
+		Bucket: aws.String(s.bucket),
+		Key:    aws.String(key),
+		Body:   bytes.NewReader(data),
+	})
+	return err
+}
+
+func (s *S3Storage) Load(key string) ([]byte, error) {
+	resp, err := s.client.GetObject(context.TODO(), &s3.GetObjectInput{
+		Bucket: aws.String(s.bucket),
+		Key:    aws.String(key),
+	})
+	if err != nil {
+		return nil, err
+	}
+	// close the response body to avoid leaking the underlying connection
+	defer resp.Body.Close()
+
+	return io.ReadAll(resp.Body)
+}
+
+func (s *S3Storage) Exists(key string) (bool, error) {
+	_, err := s.client.HeadObject(context.TODO(), &s3.HeadObjectInput{
+		Bucket: aws.String(s.bucket),
+		Key:    aws.String(key),
+	})
+	var notFound *types.NotFound
+	if errors.As(err, &notFound) {
+		return false, nil // the object does not exist
+	}
+	if err != nil {
+		return false, err // a real request failure, not a missing object
+	}
+	return true, nil
+}
+
+func (s *S3Storage) Delete(key string) error {
+	_, err := s.client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
+		Bucket: aws.String(s.bucket),
+		Key:    aws.String(key),
+	})
+	return err
+}
+
+func (s *S3Storage) List(prefix string) ([]string, error) {
+	var keys []string
+	input := &s3.ListObjectsV2Input{
+		Bucket: aws.String(s.bucket),
+		Prefix: aws.String(prefix),
+	}
+
+	paginator := s3.NewListObjectsV2Paginator(s.client, input)
+	for paginator.HasMorePages() {
+		page, err := paginator.NextPage(context.TODO())
+		if err != nil {
+			return nil, err
+		}
+		for _, obj := range page.Contents {
+			keys = append(keys, *obj.Key)
+		}
+	}
+
+	return keys, nil
+}
+
+func (s *S3Storage) State(key string) (oss.OSSState, error) {
+	resp, err := s.client.HeadObject(context.TODO(), &s3.HeadObjectInput{
+		Bucket: aws.String(s.bucket),
+		Key:    aws.String(key),
+	})
+	if err != nil {
+		return oss.OSSState{}, err
+	}
+
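+	// guard against responses that omit these fields so the dereferences below are safe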
+	if resp.ContentLength == nil {
+		resp.ContentLength = parser.ToPtr[int64](0)
+	}
+	if resp.LastModified == nil {
+		resp.LastModified = parser.ToPtr(time.Time{})
+	}
+
+	return oss.OSSState{
+		Size:         *resp.ContentLength,
+		LastModified: *resp.LastModified,
+	}, nil
+}
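
For context, wiring up the S3 backend from elsewhere in the daemon might look roughly like the sketch below; the access key, secret key, region, and bucket name are placeholders, not values from this commit.

package example

import (
	"fmt"

	"github.com/langgenius/dify-plugin-daemon/internal/oss/s3"
)

func listPluginObjects() error {
	// all four arguments are illustrative placeholders
	storage, err := s3.NewS3Storage("AKIA_EXAMPLE", "SECRET_EXAMPLE", "us-east-1", "dify-plugins")
	if err != nil {
		return err
	}

	keys, err := storage.List("plugins/")
	if err != nil {
		return err
	}
	for _, key := range keys {
		fmt.Println(key)
	}
	return nil
}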

internal/oss/type.go  +23 -0

@@ -0,0 +1,23 @@
+package oss
+
+import "time"
+
+type OSSState struct {
+	Size         int64
+	LastModified time.Time
+}
+
+type OSS interface {
+	// Save saves data into path key
+	Save(key string, data []byte) error
+	// Load loads data from path key
+	Load(key string) ([]byte, error)
+	// Exists checks if the data exists in the path key
+	Exists(key string) (bool, error)
+	// State gets the state of the data in the path key
+	State(key string) (OSSState, error)
+	// List lists everything under the given prefix; the local backend returns
+	// filesystem paths joined with its root, while object-store backends return object keys
+	List(prefix string) ([]string, error)
+	// Delete deletes the data in the path key
+	Delete(key string) error
+}
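
Because both backends satisfy this interface, callers can be written against oss.OSS and swapped between local disk and S3 without code changes. A minimal selection sketch, assuming a hypothetical config struct that is not part of this commit:

package example

import (
	"github.com/langgenius/dify-plugin-daemon/internal/oss"
	"github.com/langgenius/dify-plugin-daemon/internal/oss/local"
	"github.com/langgenius/dify-plugin-daemon/internal/oss/s3"
)

// StorageConfig is a hypothetical settings struct used only for illustration.
type StorageConfig struct {
	UseS3     bool
	Root      string
	AccessKey string
	SecretKey string
	Region    string
	Bucket    string
}

// newStorage picks a backend based on the config; both branches return an oss.OSS.
func newStorage(cfg StorageConfig) (oss.OSS, error) {
	if cfg.UseS3 {
		return s3.NewS3Storage(cfg.AccessKey, cfg.SecretKey, cfg.Region, cfg.Bucket)
	}
	return local.NewLocalStorage(cfg.Root), nil
}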