ソースを参照

feat: pack docker build context

Yeuoly 11 ヶ月 前
コミット
b3210b8955

+ 3 - 0
internal/core/plugin_manager/aws_manager/dockerfile/build_test.go

@@ -11,6 +11,9 @@ import (
 func preparePluginDeclaration() *plugin_entities.PluginDeclaration {
 	return &plugin_entities.PluginDeclaration{
 		Meta: plugin_entities.PluginMeta{
+			Arch: []constants.Arch{
+				constants.AMD64,
+			},
 			Runner: plugin_entities.PluginRunner{
 				Language:   constants.Python,
 				Version:    "3.12",

+ 147 - 4
internal/core/plugin_manager/aws_manager/packager.go

@@ -1,22 +1,165 @@
 package aws_manager
 
 import (
+	"archive/tar"
+	"compress/gzip"
+	"errors"
+	"fmt"
+	"io"
+	"io/fs"
+	"os"
+	"path"
+	"strings"
+	"time"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager/dockerfile"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/tmpfile"
 )
 
 type Packager struct {
-	runtime entities.PluginRuntime
+	runtime entities.PluginRuntimeInterface
 	decoder decoder.PluginDecoder
 }
 
-func NewPackager(runtime entities.PluginRuntime, decoder decoder.PluginDecoder) *Packager {
+func NewPackager(runtime entities.PluginRuntimeInterface, decoder decoder.PluginDecoder) *Packager {
 	return &Packager{
 		runtime: runtime,
 		decoder: decoder,
 	}
 }
 
+type dockerFileInfo struct {
+	fs.FileInfo
+
+	size int64
+}
+
+func (d *dockerFileInfo) Size() int64 {
+	return d.size
+}
+
+func (d *dockerFileInfo) Name() string {
+	return "Dockerfile"
+}
+
+func (d *dockerFileInfo) Mode() os.FileMode {
+	return 0644
+}
+
+func (d *dockerFileInfo) ModTime() time.Time {
+	return time.Now()
+}
+
+func (d *dockerFileInfo) IsDir() bool {
+	return false
+}
+
+func (d *dockerFileInfo) Sys() any {
+	return nil
+}
+
 // Pack takes a plugin and packs it into a tar file with dockerfile inside
-// for the
-func (p *Packager) Pack()
+// returns a *os.File with the tar file
+func (p *Packager) Pack() (*os.File, error) {
+	// walk through the plugin directory and add it to a tar file
+	// create a tmpfile
+	tmpfile, cleanup, err := tmpfile.CreateTempFile("plugin-aws-tar-*")
+	if err != nil {
+		return nil, err
+	}
+	success := false
+
+	defer func() {
+		if !success {
+			cleanup()
+		}
+	}()
+
+	gzip_writer, err := gzip.NewWriterLevel(tmpfile, gzip.BestCompression)
+	if err != nil {
+		return nil, err
+	}
+	defer gzip_writer.Close()
+
+	tar_writer := tar.NewWriter(gzip_writer)
+	defer tar_writer.Close()
+
+	if err := p.decoder.Walk(func(filename, dir string) error {
+		if strings.ToLower(filename) == "dockerfile" {
+			return errors.New("dockerfile is not allowed to be in the plugin directory")
+		}
+
+		full_filename := path.Join(dir, filename)
+
+		state, err := p.decoder.Stat(filename)
+		if err != nil {
+			return err
+		}
+
+		if state.Size() > 1024*1024*10 {
+			// 10MB, 1 single file is too large
+			return fmt.Errorf("file size is too large: %s, max 10MB", full_filename)
+		}
+
+		tar_header, err := tar.FileInfoHeader(state, full_filename)
+		if err != nil {
+			return err
+		}
+		tar_header.Name = filename
+
+		// write tar header
+		if err := tar_writer.WriteHeader(tar_header); err != nil {
+			return err
+		}
+
+		// write file content
+		file_reader, err := p.decoder.FileReader(full_filename)
+		if err != nil {
+			return err
+		}
+		if _, err := io.Copy(tar_writer, file_reader); err != nil {
+			file_reader.Close()
+			return err
+		}
+		// release resources
+		file_reader.Close()
+
+		return nil
+	}); err != nil {
+		return nil, err
+	}
+
+	// add dockerfile
+	dockerfile, err := dockerfile.GenerateDockerfile(p.runtime.Configuration())
+	if err != nil {
+		return nil, err
+	}
+
+	tar_header, err := tar.FileInfoHeader(&dockerFileInfo{
+		size: int64(len(dockerfile)),
+	}, "Dockerfile")
+	if err != nil {
+		return nil, err
+	}
+
+	// create a fake dockerfile stat
+	if err := tar_writer.WriteHeader(tar_header); err != nil {
+		return nil, err
+	}
+
+	if _, err := tar_writer.Write([]byte(dockerfile)); err != nil {
+		return nil, err
+	}
+
+	// close writers to flush data
+	tar_writer.Close()
+	gzip_writer.Close()
+
+	tmpfile.Seek(0, io.SeekStart)
+
+	success = true
+
+	return tmpfile, nil
+}

+ 174 - 0
internal/core/plugin_manager/aws_manager/packager_test.go

@@ -0,0 +1,174 @@
+package aws_manager
+
+import (
+	"archive/tar"
+	"compress/gzip"
+	"embed"
+	"io"
+	"io/fs"
+	"os"
+	"path"
+	"path/filepath"
+	"testing"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/constants"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
+)
+
+type TPluginRuntime struct {
+	entities.PluginRuntime
+}
+
+func (r *TPluginRuntime) InitEnvironment() error {
+	return nil
+}
+
+func (r *TPluginRuntime) Checksum() string {
+	return "test_checksum"
+}
+
+func (r *TPluginRuntime) Identity() (string, error) {
+	return "", nil
+}
+
+func (r *TPluginRuntime) StartPlugin() error {
+	return nil
+}
+
+func (r *TPluginRuntime) Type() entities.PluginRuntimeType {
+	return entities.PLUGIN_RUNTIME_TYPE_LOCAL
+}
+
+func (r *TPluginRuntime) Wait() (<-chan bool, error) {
+	return nil, nil
+}
+
+func (r *TPluginRuntime) Listen(string) *entities.IOListener[[]byte] {
+	return nil
+}
+
+func (r *TPluginRuntime) Write(string, []byte) {
+}
+
+//go:embed packager_test_plugin/*
+var test_plugin embed.FS
+
+func TestPackager_Pack(t *testing.T) {
+	// create a temp dir
+	tmpDir, err := os.MkdirTemp("", "test_plugin")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(tmpDir)
+
+	// copy the test_plugin to the temp dir
+	if err := fs.WalkDir(test_plugin, ".", func(path string, d fs.DirEntry, err error) error {
+		if err != nil {
+			return err
+		}
+
+		if d.IsDir() {
+			// create the dir
+			os.MkdirAll(filepath.Join(tmpDir, path), 0755)
+		} else {
+			// copy the file
+			origin_file, err := test_plugin.Open(path)
+			if err != nil {
+				return err
+			}
+			defer origin_file.Close()
+			if err := os.WriteFile(filepath.Join(tmpDir, path), []byte{}, 0644); err != nil {
+				return err
+			}
+		}
+
+		return nil
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	decoder, err := decoder.NewFSPluginDecoder(path.Join(tmpDir, "packager_test_plugin"))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	packager := NewPackager(&TPluginRuntime{
+		PluginRuntime: entities.PluginRuntime{
+			Config: plugin_entities.PluginDeclaration{
+				Meta: plugin_entities.PluginMeta{
+					Runner: plugin_entities.PluginRunner{
+						Language:   constants.Python,
+						Version:    "3.12",
+						Entrypoint: "main",
+					},
+					Arch: []constants.Arch{
+						constants.AMD64,
+					},
+				},
+			},
+		},
+	}, decoder)
+
+	f, err := packager.Pack()
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		os.Remove(f.Name())
+	}()
+
+	// read tar file and check if there is a dockerfile
+	// Open the tar file
+	tar_gz_file, err := os.Open(f.Name())
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer tar_gz_file.Close()
+
+	// Create a new gzip reader
+	gzip_reader, err := gzip.NewReader(tar_gz_file)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer gzip_reader.Close()
+
+	// Create a new tar reader
+	tar_reader := tar.NewReader(gzip_reader)
+
+	dockerfile_found := false
+	requirements_found := false
+	main_py_found := false
+
+	// Iterate through the files in the tar.gz archive
+	for {
+		header, err := tar_reader.Next()
+		if err == io.EOF {
+			break // End of archive
+		}
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		switch header.Name {
+		case "Dockerfile":
+			dockerfile_found = true
+		case "requirements.txt":
+			requirements_found = true
+		case "main.py":
+			main_py_found = true
+		}
+	}
+
+	// Check if all required files are present
+	if !dockerfile_found {
+		t.Error("Dockerfile not found in the packed archive")
+	}
+	if !requirements_found {
+		t.Error("requirements.txt not found in the packed archive")
+	}
+	if !main_py_found {
+		t.Error("main.py not found in the packed archive")
+	}
+}

+ 5 - 0
internal/core/plugin_manager/aws_manager/packager_test_plugin/main.py

@@ -0,0 +1,5 @@
+def main():
+    print("Hello, World!")
+
+if __name__ == "__main__":
+    main()

+ 0 - 0
internal/core/plugin_manager/aws_manager/packager_test_plugin/manifest.yaml


+ 0 - 0
internal/core/plugin_manager/aws_manager/packager_test_plugin/requirements.txt


+ 5 - 2
internal/core/plugin_manager/remote_manager/codec.go

@@ -32,10 +32,13 @@ func (w *codec) getLines(data []byte) [][]byte {
 	w.buf.Write(data)
 
 	// read line by line, split by \n, remaining data will be kept in buffer
-	lines := bytes.Split(w.buf.Bytes(), []byte("\n"))
+	buf := make([]byte, w.buf.Len())
+	w.buf.Read(buf)
 	w.buf.Reset()
 
-	// if last line is not complete, keep it in buffer
+	lines := bytes.Split(buf, []byte("\n"))
+
+	// if last line is not completed, keep it in buffer
 	if len(lines[len(lines)-1]) != 0 {
 		w.buf.Write(lines[len(lines)-1])
 		lines = lines[:len(lines)-1]

+ 16 - 1
internal/core/plugin_manager/remote_manager/codec_test.go

@@ -1,6 +1,8 @@
 package remote_manager
 
-import "testing"
+import (
+	"testing"
+)
 
 func TestCodec(t *testing.T) {
 	codec := &codec{}
@@ -19,3 +21,16 @@ func TestCodec(t *testing.T) {
 		t.Error("getLines failed")
 	}
 }
+
+func TestCodec2(t *testing.T) {
+	codec := &codec{}
+
+	msg := "9c3df1b4-6daf-4cb4-bcaa-3f05a2dbc3a1\n{\"version\":\"1.0.0\",\"type\":\"plugin\",\"author\":\"Yeuoly\",\"name\":\"ci_test\",\"created_at\":\"2024-08-14T19:48:04.867581+08:00\",\"resource\":{\"memory\":1,\"storage\":1,\"permission\":null},\"plugins\":[\"test\"],\"execution\":{\"install\":\"echo 'hello'\",\"launch\":\"echo 'hello'\"},\"meta\":{\"version\":\"0.0.1\",\"arch\":[\"amd64\"],\"runner\":{\"language\":\"python\",\"version\":\"3.12\",\"entrypoint\":\"main\"}}}"
+
+	lines := codec.getLines([]byte(msg))
+	if len(lines) != 1 {
+		if string(lines[0]) != msg[:len(lines[0])] {
+			t.Error("getLines failed")
+		}
+	}
+}

+ 17 - 2
internal/core/plugin_manager/remote_manager/server_test.go

@@ -10,6 +10,7 @@ import (
 
 	"github.com/langgenius/dify-plugin-daemon/internal/db"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/constants"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/network"
@@ -151,6 +152,17 @@ func TestAcceptConnection(t *testing.T) {
 			Install: "echo 'hello'",
 			Launch:  "echo 'hello'",
 		},
+		Meta: plugin_entities.PluginMeta{
+			Version: "0.0.1",
+			Arch: []constants.Arch{
+				constants.AMD64,
+			},
+			Runner: plugin_entities.PluginRunner{
+				Language:   constants.Python,
+				Version:    "3.12",
+				Entrypoint: "main",
+			},
+		},
 	})
 	conn.Write([]byte(key))
 	conn.Write([]byte("\n"))
@@ -158,14 +170,17 @@ func TestAcceptConnection(t *testing.T) {
 	conn.Write([]byte("\n"))
 	closed_chan := make(chan bool)
 
+	msg := ""
+
 	go func() {
 		// block here to accept messages until the connection is closed
 		buffer := make([]byte, 1024)
 		for {
-			_, err := conn.Read(buffer)
+			n, err := conn.Read(buffer)
 			if err != nil {
 				break
 			}
+			msg += string(buffer[:n])
 		}
 		close(closed_chan)
 	}()
@@ -178,7 +193,7 @@ func TestAcceptConnection(t *testing.T) {
 	case <-closed_chan:
 		// success
 		if !got_connection {
-			t.Errorf("failed to accept connection")
+			t.Errorf("failed to accept connection: %s", msg)
 			return
 		}
 		if connection_err != nil {

+ 25 - 0
internal/core/plugin_packager/decoder/decoder.go

@@ -1,10 +1,35 @@
 package decoder
 
+import (
+	"io"
+	"io/fs"
+)
+
+// PluginDecoder is an interface for decoding and interacting with plugin files
 type PluginDecoder interface {
+	// Open initializes the decoder and prepares it for use
 	Open() error
+
+	// Walk traverses the plugin files and calls the provided function for each file
+	// The function is called with the filename and directory of each file
 	Walk(fn func(filename string, dir string) error) error
+
+	// ReadFile reads the entire contents of a file and returns it as a byte slice
 	ReadFile(filename string) ([]byte, error)
+
+	// Close releases any resources used by the decoder
 	Close() error
+
+	// Stat returns file info for the specified filename
+	Stat(filename string) (fs.FileInfo, error)
+
+	// FileReader returns an io.ReadCloser for reading the contents of a file
+	// Remember to close the reader when done using it
+	FileReader(filename string) (io.ReadCloser, error)
+
+	// Signature returns the signature of the plugin, if available
 	Signature() (string, error)
+
+	// CreateTime returns the creation time of the plugin as a Unix timestamp
 	CreateTime() (int64, error)
 }

+ 19 - 0
internal/core/plugin_packager/decoder/fs.go

@@ -2,9 +2,11 @@ package decoder
 
 import (
 	"errors"
+	"io"
 	"io/fs"
 	"os"
 	"path/filepath"
+	"strings"
 )
 
 var (
@@ -51,6 +53,15 @@ func (d *FSPluginDecoder) Open() error {
 
 func (d *FSPluginDecoder) Walk(fn func(filename string, dir string) error) error {
 	return filepath.Walk(d.root, func(path string, info fs.FileInfo, err error) error {
+		// trim the first directory path
+		path = strings.TrimPrefix(path, d.root)
+		// trim / from the beginning
+		path = strings.TrimPrefix(path, "/")
+
+		if path == "" {
+			return nil
+		}
+
 		if err != nil {
 			return err
 		}
@@ -63,10 +74,18 @@ func (d *FSPluginDecoder) Close() error {
 	return nil
 }
 
+func (d *FSPluginDecoder) Stat(filename string) (fs.FileInfo, error) {
+	return os.Stat(filepath.Join(d.root, filename))
+}
+
 func (d *FSPluginDecoder) ReadFile(filename string) ([]byte, error) {
 	return os.ReadFile(filepath.Join(d.root, filename))
 }
 
+func (d *FSPluginDecoder) FileReader(filename string) (io.ReadCloser, error) {
+	return os.Open(filepath.Join(d.root, filename))
+}
+
 func (d *FSPluginDecoder) Signature() (string, error) {
 	return "", nil
 }

+ 16 - 0
internal/core/plugin_packager/decoder/zip.go

@@ -3,6 +3,8 @@ package decoder
 import (
 	"archive/zip"
 	"bytes"
+	"io"
+	"io/fs"
 	"path"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
@@ -34,6 +36,16 @@ func NewZipPluginDecoder(binary []byte) (*ZipPluginDecoder, error) {
 	return decoder, nil
 }
 
+func (z *ZipPluginDecoder) Stat(filename string) (fs.FileInfo, error) {
+	f, err := z.reader.Open(filename)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	return f.Stat()
+}
+
 func (z *ZipPluginDecoder) Open() error {
 	if z.reader == nil {
 		return z.err
@@ -82,6 +94,10 @@ func (z *ZipPluginDecoder) ReadFile(filename string) ([]byte, error) {
 	return data.Bytes(), nil
 }
 
+func (z *ZipPluginDecoder) FileReader(filename string) (io.ReadCloser, error) {
+	return z.reader.Open(filename)
+}
+
 func (z *ZipPluginDecoder) decode() error {
 	if z.reader == nil {
 		return z.err

+ 1 - 1
internal/core/plugin_packager/packager/packager.go

@@ -28,7 +28,7 @@ func (p *Packager) Pack() ([]byte, error) {
 	zip_buffer := new(bytes.Buffer)
 	zip_writer := zip.NewWriter(zip_buffer)
 
-	p.decoder.Walk(func(filename, dir string) error {
+	err = p.decoder.Walk(func(filename, dir string) error {
 		file, err := p.decoder.ReadFile(filename)
 		if err != nil {
 			return err

+ 1 - 0
internal/core/plugin_packager/packager_test.go

@@ -16,6 +16,7 @@ var manifest []byte
 
 func TestPackagerAndVerifier(t *testing.T) {
 	// create a temp directory
+	os.RemoveAll("temp")
 	if err := os.Mkdir("temp", 0755); err != nil {
 		t.Errorf("failed to create temp directory: %s", err.Error())
 		return

+ 13 - 0
internal/utils/tmpfile/tmpfile.go

@@ -0,0 +1,13 @@
+package tmpfile
+
+import "os"
+
+// CreateTempFile creates a temp file with the given prefix
+// and returns the path to the temp file and a function to clean up the temp file
+func CreateTempFile(prefix string) (*os.File, func(), error) {
+	file, err := os.CreateTemp(os.TempDir(), prefix)
+	if err != nil {
+		return nil, nil, err
+	}
+	return file, func() { os.Remove(file.Name()) }, nil
+}