瀏覽代碼

feat: support aws installer

Yeuoly 10 月之前
父節點
當前提交
b397e73e95

+ 5 - 5
internal/core/dify_invocation/http_request.go

@@ -19,7 +19,7 @@ func Request[T any](method string, path string, options ...http_requests.HttpOpt
 	return http_requests.RequestAndParse[T](client, difyPath(path), method, options...)
 }
 
-func StreamResponse[T any](method string, path string, options ...http_requests.HttpOptions) (*stream.StreamResponse[T], error) {
+func StreamResponse[T any](method string, path string, options ...http_requests.HttpOptions) (*stream.Stream[T], error) {
 	options = append(
 		options, http_requests.HttpHeader(map[string]string{
 			"X-Inner-Api-Key": PLUGIN_INNER_API_KEY,
@@ -31,7 +31,7 @@ func StreamResponse[T any](method string, path string, options ...http_requests.
 	return http_requests.RequestAndParseStream[T](client, difyPath(path), method, options...)
 }
 
-func InvokeLLM(payload *InvokeLLMRequest) (*stream.StreamResponse[model_entities.LLMResultChunk], error) {
+func InvokeLLM(payload *InvokeLLMRequest) (*stream.Stream[model_entities.LLMResultChunk], error) {
 	return StreamResponse[model_entities.LLMResultChunk]("POST", "invoke/llm", http_requests.HttpPayloadJson(payload))
 }
 
@@ -43,7 +43,7 @@ func InvokeRerank(payload *InvokeRerankRequest) (*model_entities.RerankResult, e
 	return Request[model_entities.RerankResult]("POST", "invoke/rerank", http_requests.HttpPayloadJson(payload))
 }
 
-func InvokeTTS(payload *InvokeTTSRequest) (*stream.StreamResponse[model_entities.TTSResult], error) {
+func InvokeTTS(payload *InvokeTTSRequest) (*stream.Stream[model_entities.TTSResult], error) {
 	return StreamResponse[model_entities.TTSResult]("POST", "invoke/tts", http_requests.HttpPayloadJson(payload))
 }
 
@@ -55,11 +55,11 @@ func InvokeModeration(payload *InvokeModerationRequest) (*model_entities.Moderat
 	return Request[model_entities.ModerationResult]("POST", "invoke/moderation", http_requests.HttpPayloadJson(payload))
 }
 
-func InvokeTool(payload *InvokeToolRequest) (*stream.StreamResponse[tool_entities.ToolResponseChunk], error) {
+func InvokeTool(payload *InvokeToolRequest) (*stream.Stream[tool_entities.ToolResponseChunk], error) {
 	return StreamResponse[tool_entities.ToolResponseChunk]("POST", "invoke/tool", http_requests.HttpPayloadJson(payload))
 }
 
-func InvokeApp(payload *InvokeAppRequest) (*stream.StreamResponse[map[string]any], error) {
+func InvokeApp(payload *InvokeAppRequest) (*stream.Stream[map[string]any], error) {
 	return StreamResponse[map[string]any]("POST", "invoke/app", http_requests.HttpPayloadJson(payload))
 }
 

+ 1 - 1
internal/core/plugin_daemon/endpoint_service.go

@@ -15,7 +15,7 @@ func InvokeEndpoint(
 	session *session_manager.Session,
 	request *requests.RequestInvokeEndpoint,
 ) (
-	int, *http.Header, *stream.StreamResponse[[]byte], error,
+	int, *http.Header, *stream.Stream[[]byte], error,
 ) {
 	resp, err := genericInvokePlugin[requests.RequestInvokeEndpoint, endpoint_entities.EndpointResponseChunk](
 		session,

+ 1 - 1
internal/core/plugin_daemon/generic.go

@@ -17,7 +17,7 @@ func genericInvokePlugin[Req any, Rsp any](
 	session *session_manager.Session,
 	request *Req,
 	response_buffer_size int,
-) (*stream.StreamResponse[Rsp], error) {
+) (*stream.Stream[Rsp], error) {
 	runtime := plugin_manager.GetGlobalPluginManager().Get(session.PluginUniqueIdentifier)
 	if runtime == nil {
 		return nil, errors.New("plugin not found")

+ 8 - 8
internal/core/plugin_daemon/model_service.go

@@ -11,7 +11,7 @@ func InvokeLLM(
 	session *session_manager.Session,
 	request *requests.RequestInvokeLLM,
 ) (
-	*stream.StreamResponse[model_entities.LLMResultChunk], error,
+	*stream.Stream[model_entities.LLMResultChunk], error,
 ) {
 	return genericInvokePlugin[requests.RequestInvokeLLM, model_entities.LLMResultChunk](
 		session,
@@ -24,7 +24,7 @@ func InvokeTextEmbedding(
 	session *session_manager.Session,
 	request *requests.RequestInvokeTextEmbedding,
 ) (
-	*stream.StreamResponse[model_entities.TextEmbeddingResult], error,
+	*stream.Stream[model_entities.TextEmbeddingResult], error,
 ) {
 	return genericInvokePlugin[requests.RequestInvokeTextEmbedding, model_entities.TextEmbeddingResult](
 		session,
@@ -37,7 +37,7 @@ func InvokeRerank(
 	session *session_manager.Session,
 	request *requests.RequestInvokeRerank,
 ) (
-	*stream.StreamResponse[model_entities.RerankResult], error,
+	*stream.Stream[model_entities.RerankResult], error,
 ) {
 	return genericInvokePlugin[requests.RequestInvokeRerank, model_entities.RerankResult](
 		session,
@@ -50,7 +50,7 @@ func InvokeTTS(
 	session *session_manager.Session,
 	request *requests.RequestInvokeTTS,
 ) (
-	*stream.StreamResponse[model_entities.TTSResult], error,
+	*stream.Stream[model_entities.TTSResult], error,
 ) {
 	return genericInvokePlugin[requests.RequestInvokeTTS, model_entities.TTSResult](
 		session,
@@ -63,7 +63,7 @@ func InvokeSpeech2Text(
 	session *session_manager.Session,
 	request *requests.RequestInvokeSpeech2Text,
 ) (
-	*stream.StreamResponse[model_entities.Speech2TextResult], error,
+	*stream.Stream[model_entities.Speech2TextResult], error,
 ) {
 	return genericInvokePlugin[requests.RequestInvokeSpeech2Text, model_entities.Speech2TextResult](
 		session,
@@ -76,7 +76,7 @@ func InvokeModeration(
 	session *session_manager.Session,
 	request *requests.RequestInvokeModeration,
 ) (
-	*stream.StreamResponse[model_entities.ModerationResult], error,
+	*stream.Stream[model_entities.ModerationResult], error,
 ) {
 	return genericInvokePlugin[requests.RequestInvokeModeration, model_entities.ModerationResult](
 		session,
@@ -89,7 +89,7 @@ func ValidateProviderCredentials(
 	session *session_manager.Session,
 	request *requests.RequestValidateProviderCredentials,
 ) (
-	*stream.StreamResponse[model_entities.ValidateCredentialsResult], error,
+	*stream.Stream[model_entities.ValidateCredentialsResult], error,
 ) {
 	return genericInvokePlugin[requests.RequestValidateProviderCredentials, model_entities.ValidateCredentialsResult](
 		session,
@@ -102,7 +102,7 @@ func ValidateModelCredentials(
 	session *session_manager.Session,
 	request *requests.RequestValidateModelCredentials,
 ) (
-	*stream.StreamResponse[model_entities.ValidateCredentialsResult], error,
+	*stream.Stream[model_entities.ValidateCredentialsResult], error,
 ) {
 	return genericInvokePlugin[requests.RequestValidateModelCredentials, model_entities.ValidateCredentialsResult](
 		session,

+ 2 - 2
internal/core/plugin_daemon/tool_service.go

@@ -11,7 +11,7 @@ func InvokeTool(
 	session *session_manager.Session,
 	request *requests.RequestInvokeTool,
 ) (
-	*stream.StreamResponse[tool_entities.ToolResponseChunk], error,
+	*stream.Stream[tool_entities.ToolResponseChunk], error,
 ) {
 	return genericInvokePlugin[requests.RequestInvokeTool, tool_entities.ToolResponseChunk](
 		session,
@@ -24,7 +24,7 @@ func ValidateToolCredentials(
 	session *session_manager.Session,
 	request *requests.RequestValidateToolCredentials,
 ) (
-	*stream.StreamResponse[tool_entities.ValidateCredentialsResult], error,
+	*stream.Stream[tool_entities.ValidateCredentialsResult], error,
 ) {
 	return genericInvokePlugin[requests.RequestValidateToolCredentials, tool_entities.ValidateCredentialsResult](
 		session,

+ 0 - 7
internal/core/plugin_manager/installer.go

@@ -1,7 +0,0 @@
-package plugin_manager
-
-import "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
-
-type Installer interface {
-	Install(decoder decoder.PluginDecoder) error
-}

+ 10 - 0
internal/core/plugin_manager/installer/aws.go

@@ -0,0 +1,10 @@
+package installer
+
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
+)
+
+func AwsInstaller(decoder decoder.PluginDecoder) (*stream.Stream[string], error) {
+	return nil, nil
+}

+ 8 - 0
internal/core/plugin_manager/installer/installer.go

@@ -0,0 +1,8 @@
+package installer
+
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
+)
+
+type Installer func(decoder decoder.PluginDecoder) (*stream.Stream[string], error)

+ 10 - 0
internal/core/plugin_manager/installer/local.go

@@ -0,0 +1,10 @@
+package installer
+
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
+)
+
+func LocalInstaller(decoder decoder.PluginDecoder) (*stream.Stream[string], error) {
+	return nil, nil
+}

+ 10 - 0
internal/core/plugin_manager/manager.go

@@ -5,6 +5,7 @@ import (
 
 	"github.com/langgenius/dify-plugin-daemon/internal/cluster"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/installer"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
@@ -30,6 +31,8 @@ type PluginManager struct {
 	// start process lock
 	startProcessLock *lock.HighGranularityLock
 	// serverless runtime
+
+	installer installer.Installer
 }
 
 var (
@@ -47,6 +50,13 @@ func InitGlobalPluginManager(cluster *cluster.Cluster, configuration *app.Config
 		),
 		startProcessLock: lock.NewHighGranularityLock(),
 	}
+
+	if configuration.Platform == app.PLATFORM_AWS_LAMBDA {
+		manager.installer = installer.AwsInstaller
+	} else if configuration.Platform == app.PLATFORM_LOCAL {
+		manager.installer = installer.LocalInstaller
+	}
+
 	manager.Init(configuration)
 }
 

+ 1 - 1
internal/core/plugin_manager/remote_manager/hooks.go

@@ -38,7 +38,7 @@ type DifyServer struct {
 	num_loops int
 
 	// read new connections
-	response *stream.StreamResponse[*RemotePluginRuntime]
+	response *stream.Stream[*RemotePluginRuntime]
 
 	plugins      map[int]*RemotePluginRuntime
 	plugins_lock *sync.RWMutex

+ 1 - 1
internal/core/plugin_manager/remote_manager/type.go

@@ -23,7 +23,7 @@ type RemotePluginRuntime struct {
 	closed int32
 
 	// response entity to accept new events
-	response *stream.StreamResponse[[]byte]
+	response *stream.Stream[[]byte]
 
 	// callbacks for each session
 	callbacks      map[string][]func([]byte)

+ 1 - 1
internal/core/plugin_manager/serverless/connector.go

@@ -104,7 +104,7 @@ type LaunchAWSLambdaFunctionResponse struct {
 // Launch the lambda function from serverless connector, it will receive the context_tar as the input
 // and build it a docker image, then run it on serverless platform like AWS Lambda
 // it returns a event stream, the caller should consider it as a async operation
-func LaunchLambda(identity string, checksum string, context_tar io.Reader) (*stream.StreamResponse[LaunchAWSLambdaFunctionResponse], error) {
+func LaunchLambda(identity string, checksum string, context_tar io.Reader) (*stream.Stream[LaunchAWSLambdaFunctionResponse], error) {
 	url, err := url.JoinPath(baseurl.String(), "/v1/lambda/launch")
 	if err != nil {
 		return nil, err

+ 2 - 2
internal/service/invoke_model.go

@@ -29,7 +29,7 @@ func InvokeTool(
 	defer session.Close()
 
 	baseSSEService(
-		func() (*stream.StreamResponse[tool_entities.ToolResponseChunk], error) {
+		func() (*stream.Stream[tool_entities.ToolResponseChunk], error) {
 			return plugin_daemon.InvokeTool(session, &r.Data)
 		},
 		ctx,
@@ -56,7 +56,7 @@ func ValidateToolCredentials(
 	defer session.Close()
 
 	baseSSEService(
-		func() (*stream.StreamResponse[tool_entities.ValidateCredentialsResult], error) {
+		func() (*stream.Stream[tool_entities.ValidateCredentialsResult], error) {
 			return plugin_daemon.ValidateToolCredentials(session, &r.Data)
 		},
 		ctx,

+ 8 - 8
internal/service/invoke_tool.go

@@ -53,7 +53,7 @@ func InvokeLLM(
 	defer session.Close()
 
 	baseSSEService(
-		func() (*stream.StreamResponse[model_entities.LLMResultChunk], error) {
+		func() (*stream.Stream[model_entities.LLMResultChunk], error) {
 			return plugin_daemon.InvokeLLM(session, &r.Data)
 		},
 		ctx,
@@ -79,7 +79,7 @@ func InvokeTextEmbedding(
 	defer session.Close()
 
 	baseSSEService(
-		func() (*stream.StreamResponse[model_entities.TextEmbeddingResult], error) {
+		func() (*stream.Stream[model_entities.TextEmbeddingResult], error) {
 			return plugin_daemon.InvokeTextEmbedding(session, &r.Data)
 		},
 		ctx,
@@ -106,7 +106,7 @@ func InvokeRerank(
 	defer session.Close()
 
 	baseSSEService(
-		func() (*stream.StreamResponse[model_entities.RerankResult], error) {
+		func() (*stream.Stream[model_entities.RerankResult], error) {
 			return plugin_daemon.InvokeRerank(session, &r.Data)
 		},
 		ctx,
@@ -133,7 +133,7 @@ func InvokeTTS(
 	defer session.Close()
 
 	baseSSEService(
-		func() (*stream.StreamResponse[model_entities.TTSResult], error) {
+		func() (*stream.Stream[model_entities.TTSResult], error) {
 			return plugin_daemon.InvokeTTS(session, &r.Data)
 		},
 		ctx,
@@ -160,7 +160,7 @@ func InvokeSpeech2Text(
 	defer session.Close()
 
 	baseSSEService(
-		func() (*stream.StreamResponse[model_entities.Speech2TextResult], error) {
+		func() (*stream.Stream[model_entities.Speech2TextResult], error) {
 			return plugin_daemon.InvokeSpeech2Text(session, &r.Data)
 		},
 		ctx,
@@ -187,7 +187,7 @@ func InvokeModeration(
 	defer session.Close()
 
 	baseSSEService(
-		func() (*stream.StreamResponse[model_entities.ModerationResult], error) {
+		func() (*stream.Stream[model_entities.ModerationResult], error) {
 			return plugin_daemon.InvokeModeration(session, &r.Data)
 		},
 		ctx,
@@ -214,7 +214,7 @@ func ValidateProviderCredentials(
 	defer session.Close()
 
 	baseSSEService(
-		func() (*stream.StreamResponse[model_entities.ValidateCredentialsResult], error) {
+		func() (*stream.Stream[model_entities.ValidateCredentialsResult], error) {
 			return plugin_daemon.ValidateProviderCredentials(session, &r.Data)
 		},
 		ctx,
@@ -241,7 +241,7 @@ func ValidateModelCredentials(
 	defer session.Close()
 
 	baseSSEService(
-		func() (*stream.StreamResponse[model_entities.ValidateCredentialsResult], error) {
+		func() (*stream.Stream[model_entities.ValidateCredentialsResult], error) {
 			return plugin_daemon.ValidateModelCredentials(session, &r.Data)
 		},
 		ctx,

+ 1 - 1
internal/service/runner.go

@@ -14,7 +14,7 @@ import (
 // baseSSEService is a helper function to handle SSE service
 // it accepts a generator function that returns a stream response to gin context
 func baseSSEService[R any](
-	generator func() (*stream.StreamResponse[R], error),
+	generator func() (*stream.Stream[R], error),
 	ctx *gin.Context,
 	max_timeout_seconds int,
 ) {

+ 4 - 4
internal/utils/http_requests/http_warpper.go

@@ -74,7 +74,7 @@ func PatchAndParse[T any](client *http.Client, url string, options ...HttpOption
 	return RequestAndParse[T](client, url, "PATCH", options...)
 }
 
-func RequestAndParseStream[T any](client *http.Client, url string, method string, options ...HttpOptions) (*stream.StreamResponse[T], error) {
+func RequestAndParseStream[T any](client *http.Client, url string, method string, options ...HttpOptions) (*stream.Stream[T], error) {
 	resp, err := Request(client, url, method, options...)
 	if err != nil {
 		return nil, err
@@ -131,14 +131,14 @@ func RequestAndParseStream[T any](client *http.Client, url string, method string
 	return ch, nil
 }
 
-func GetAndParseStream[T any](client *http.Client, url string, options ...HttpOptions) (*stream.StreamResponse[T], error) {
+func GetAndParseStream[T any](client *http.Client, url string, options ...HttpOptions) (*stream.Stream[T], error) {
 	return RequestAndParseStream[T](client, url, "GET", options...)
 }
 
-func PostAndParseStream[T any](client *http.Client, url string, options ...HttpOptions) (*stream.StreamResponse[T], error) {
+func PostAndParseStream[T any](client *http.Client, url string, options ...HttpOptions) (*stream.Stream[T], error) {
 	return RequestAndParseStream[T](client, url, "POST", options...)
 }
 
-func PutAndParseStream[T any](client *http.Client, url string, options ...HttpOptions) (*stream.StreamResponse[T], error) {
+func PutAndParseStream[T any](client *http.Client, url string, options ...HttpOptions) (*stream.Stream[T], error) {
 	return RequestAndParseStream[T](client, url, "PUT", options...)
 }

+ 12 - 12
internal/utils/stream/response.go

@@ -8,7 +8,7 @@ import (
 	"github.com/gammazero/deque"
 )
 
-type StreamResponse[T any] struct {
+type Stream[T any] struct {
 	q         deque.Deque[T]
 	l         *sync.Mutex
 	sig       chan bool
@@ -19,15 +19,15 @@ type StreamResponse[T any] struct {
 	err       error
 }
 
-func NewStreamResponse[T any](max int) *StreamResponse[T] {
-	return &StreamResponse[T]{
+func NewStreamResponse[T any](max int) *Stream[T] {
+	return &Stream[T]{
 		l:   &sync.Mutex{},
 		sig: make(chan bool),
 		max: max,
 	}
 }
 
-func (r *StreamResponse[T]) OnClose(f func()) {
+func (r *Stream[T]) OnClose(f func()) {
 	r.onClose = f
 }
 
@@ -35,7 +35,7 @@ func (r *StreamResponse[T]) OnClose(f func()) {
 // and waits for the next data to be available
 // returns false if the stream is closed
 // NOTE: even if the stream is closed, it will return true if there is data available
-func (r *StreamResponse[T]) Next() bool {
+func (r *Stream[T]) Next() bool {
 	r.l.Lock()
 	if r.closed == 1 && r.q.Len() == 0 && r.err == nil {
 		r.l.Unlock()
@@ -58,7 +58,7 @@ func (r *StreamResponse[T]) Next() bool {
 
 // Read reads buffered data from the stream and
 // it returns error only if the buffer is empty or an error is written to the stream
-func (r *StreamResponse[T]) Read() (T, error) {
+func (r *Stream[T]) Read() (T, error) {
 	r.l.Lock()
 	defer r.l.Unlock()
 
@@ -78,7 +78,7 @@ func (r *StreamResponse[T]) Read() (T, error) {
 }
 
 // Wrap wraps the stream with a new stream, and allows customized operations
-func (r *StreamResponse[T]) Wrap(fn func(T)) error {
+func (r *Stream[T]) Wrap(fn func(T)) error {
 	if atomic.LoadInt32(&r.closed) == 1 {
 		return errors.New("stream is closed")
 	}
@@ -96,7 +96,7 @@ func (r *StreamResponse[T]) Wrap(fn func(T)) error {
 
 // Write writes data to the stream
 // returns error if the buffer is full
-func (r *StreamResponse[T]) Write(data T) error {
+func (r *Stream[T]) Write(data T) error {
 	if atomic.LoadInt32(&r.closed) == 1 {
 		return nil
 	}
@@ -119,7 +119,7 @@ func (r *StreamResponse[T]) Write(data T) error {
 }
 
 // Close closes the stream
-func (r *StreamResponse[T]) Close() {
+func (r *Stream[T]) Close() {
 	if !atomic.CompareAndSwapInt32(&r.closed, 0, 1) {
 		return
 	}
@@ -134,11 +134,11 @@ func (r *StreamResponse[T]) Close() {
 	}
 }
 
-func (r *StreamResponse[T]) IsClosed() bool {
+func (r *Stream[T]) IsClosed() bool {
 	return atomic.LoadInt32(&r.closed) == 1
 }
 
-func (r *StreamResponse[T]) Size() int {
+func (r *Stream[T]) Size() int {
 	r.l.Lock()
 	defer r.l.Unlock()
 
@@ -146,7 +146,7 @@ func (r *StreamResponse[T]) Size() int {
 }
 
 // WriteError writes an error to the stream
-func (r *StreamResponse[T]) WriteError(err error) {
+func (r *Stream[T]) WriteError(err error) {
 	r.l.Lock()
 	defer r.l.Unlock()