Forráskód Böngészése

feat: add aws lambda io layer

Yeuoly 11 hónapja%!(EXTRA string=óta)
szülő
commit
464e0d9e33

+ 21 - 0
internal/core/plugin_manager/aws_manager/environment.go

@@ -2,6 +2,8 @@ package aws_manager
 
 import (
 	"fmt"
+	"net"
+	"net/http"
 	"os"
 	"time"
 
@@ -13,6 +15,25 @@ var (
 )
 
 func (r *AWSPluginRuntime) InitEnvironment() error {
+	if err := r.initEnvironment(); err != nil {
+		return err
+	}
+
+	// init http client
+	r.client = &http.Client{
+		Transport: &http.Transport{
+			Dial: (&net.Dialer{
+				Timeout:   5 * time.Second,
+				KeepAlive: 15 * time.Second,
+			}).Dial,
+			IdleConnTimeout: 120 * time.Second,
+		},
+	}
+
+	return nil
+}
+
+func (r *AWSPluginRuntime) initEnvironment() error {
 	r.Log("Starting to initialize environment")
 	// check if the plugin has already been initialized, at most 300s
 	if err := cache.Lock(AWS_LAUNCH_LOCK_PREFIX+r.Checksum(), 300*time.Second, 300*time.Second); err != nil {

+ 92 - 1
internal/core/plugin_manager/aws_manager/io.go

@@ -1,12 +1,103 @@
 package aws_manager
 
-import "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+import (
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"time"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
+)
+
+// consume data from data stream
+func (r *AWSPluginRuntime) consume() {
+	for {
+		select {
+		case data := <-r.data_stream:
+			fmt.Println(data)
+		}
+	}
+}
 
 func (r *AWSPluginRuntime) Listen(session_id string) *entities.BytesIOListener {
 	l := entities.NewIOListener[[]byte]()
+	l.OnClose(func() {
+		// close the pipe writer
+		writer, exists := r.session_pool.Load(session_id)
+		if exists {
+			writer.Close()
+		}
+	})
 	return l
 }
 
 func (r *AWSPluginRuntime) Write(session_id string, data []byte) {
+	// check if session exists
+	var pw *io.PipeWriter
+	var exists bool
+
+	if pw, exists = r.session_pool.Load(session_id); !exists {
+		url, err := url.JoinPath(r.lambda_url, "invoke")
+		if err != nil {
+			r.Error(fmt.Sprintf("Error creating request: %v", err))
+			return
+		}
+
+		// create a new http request here
+		npr, npw := io.Pipe()
+		r.session_pool.Store(session_id, npw)
+		pw = npw
+
+		// create a new http request
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		defer cancel()
+		req, err := http.NewRequestWithContext(ctx, "POST", url, npr)
+		if err != nil {
+			r.Error(fmt.Sprintf("Error creating request: %v", err))
+			return
+		}
+
+		req.Header.Set("Content-Type", "application/octet-stream")
+		req.Header.Set("Accept", "application/octet-stream")
+
+		routine.Submit(func() {
+			response, err := http.DefaultClient.Do(req)
+			if err != nil {
+				r.Error(fmt.Sprintf("Error sending request to aws lambda: %v", err))
+				return
+			}
+
+			// write to data stream
+			for {
+				buf := make([]byte, 1024)
+				n, err := response.Body.Read(buf)
+				if err != nil {
+					if err == io.EOF {
+						break
+					} else {
+						r.Error(fmt.Sprintf("Error reading response from aws lambda: %v", err))
+						break
+					}
+				}
+				// write to data stream
+				select {
+				case r.data_stream <- buf[:n]:
+				default:
+					r.Error("Data stream is full")
+				}
+			}
+
+			// remove the session from the pool
+			r.session_pool.Delete(session_id)
+		})
+	}
 
+	if pw != nil {
+		if _, err := pw.Write(data); err != nil {
+			r.Error(fmt.Sprintf("Error writing to pipe writer: %v", err))
+		}
+	}
 }

+ 11 - 0
internal/core/plugin_manager/aws_manager/type.go

@@ -1,8 +1,12 @@
 package aws_manager
 
 import (
+	"io"
+	"net/http"
+
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/mapping"
 )
 
 type AWSPluginRuntime struct {
@@ -12,4 +16,11 @@ type AWSPluginRuntime struct {
 	// access url for the lambda function
 	lambda_url  string
 	lambda_name string
+
+	client *http.Client
+
+	session_pool mapping.Map[string, *io.PipeWriter]
+
+	// data stream take responsibility of listen to response from lambda and redirect to runtime listener
+	data_stream chan []byte
 }