123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- package serverless
- import (
- "errors"
- "fmt"
- "io"
- "net/url"
- "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/http_requests"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
- )
- var ()
- type LambdaFunction struct {
- FunctionName string `json:"function_name" validate:"required"`
- FunctionARN string `json:"function_arn" validate:"required"`
- FunctionURL string `json:"function_url" validate:"required"`
- }
- // Ping the serverless connector, return error if failed
- func Ping() error {
- url, err := url.JoinPath(baseurl.String(), "/ping")
- if err != nil {
- return err
- }
- response, err := http_requests.PostAndParse[entities.GenericResponse[string]](
- client,
- url,
- http_requests.HttpHeader(map[string]string{
- "Authorization": SERVERLESS_CONNECTOR_API_KEY,
- }),
- )
- if err != nil {
- return err
- }
- if response.Code != 0 {
- return fmt.Errorf("unexpected response from serverless connector: %s", response.Message)
- }
- if response.Data != "pong" {
- return fmt.Errorf("unexpected response from serverless connector: %s", response.Data)
- }
- return nil
- }
- var (
- ErrNoLambdaFunction = errors.New("no lambda function found")
- )
- // Fetch the lambda function from serverless connector, return error if failed
- func FetchLambda(identity string, checksum string) (*LambdaFunction, error) {
- request := map[string]any{
- "plugin": map[string]any{
- "config": map[string]any{
- "identity": identity,
- "checksum": checksum,
- },
- },
- }
- url, err := url.JoinPath(baseurl.String(), "/v1/lambda/fetch")
- if err != nil {
- return nil, err
- }
- response, err := http_requests.PostAndParse[entities.GenericResponse[LambdaFunction]](
- client,
- url,
- http_requests.HttpHeader(map[string]string{
- "Authorization": SERVERLESS_CONNECTOR_API_KEY,
- }),
- http_requests.HttpPayloadJson(request),
- )
- if err != nil {
- return nil, err
- }
- if response.Code != 0 {
- if response.Code == -404 {
- return nil, ErrNoLambdaFunction
- }
- return nil, fmt.Errorf("unexpected response from serverless connector: %s", response.Message)
- }
- return &response.Data, nil
- }
- type LaunchAWSLambdaFunctionEvent string
- const (
- Error LaunchAWSLambdaFunctionEvent = "error"
- Info LaunchAWSLambdaFunctionEvent = "info"
- Lambda LaunchAWSLambdaFunctionEvent = "lambda"
- LambdaUrl LaunchAWSLambdaFunctionEvent = "lambda_url"
- Done LaunchAWSLambdaFunctionEvent = "done"
- )
- type LaunchAWSLambdaFunctionResponse struct {
- Event LaunchAWSLambdaFunctionEvent `json:"event"`
- Message string `json:"message"`
- }
- // Launch the lambda function from serverless connector, it will receive the context_tar as the input
- // and build it a docker image, then run it on serverless platform like AWS Lambda
- // it returns a event stream, the caller should consider it as a async operation
- func LaunchLambda(identity string, checksum string, context_tar io.Reader) (*stream.Stream[LaunchAWSLambdaFunctionResponse], error) {
- url, err := url.JoinPath(baseurl.String(), "/v1/lambda/launch")
- if err != nil {
- return nil, err
- }
- response, err := http_requests.PostAndParseStream[LaunchAWSLambdaFunctionResponse](
- client,
- url,
- http_requests.HttpHeader(map[string]string{
- "Authorization": SERVERLESS_CONNECTOR_API_KEY,
- }),
- http_requests.HttpReadTimeout(240000),
- http_requests.HttpWriteTimeout(240000),
- http_requests.HttpPayloadMultipart(
- map[string]string{
- "identity": identity,
- "checksum": checksum,
- },
- map[string]io.Reader{
- "context": context_tar,
- },
- ),
- http_requests.HttpRaiseErrorWhenStreamDataNotMatch(true),
- )
- if err != nil {
- return nil, err
- }
- return response, nil
- }
|