| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397 | 
							- package service
 
- import (
 
- 	"bytes"
 
- 	"context"
 
- 	"encoding/hex"
 
- 	"errors"
 
- 	"fmt"
 
- 	"io"
 
- 	"sync/atomic"
 
- 	"time"
 
- 	"github.com/gin-gonic/gin"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/db"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/service/install_service"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/types/exception"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/types/models"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/utils/encryption"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 
- )
 
- func Endpoint(
 
- 	ctx *gin.Context,
 
- 	endpoint *models.Endpoint,
 
- 	pluginInstallation *models.PluginInstallation,
 
- 	path string,
 
- ) {
 
- 	if !endpoint.Enabled {
 
- 		ctx.JSON(404, exception.NotFoundError(errors.New("endpoint not found")).ToResponse())
 
- 		return
 
- 	}
 
- 	req := ctx.Request.Clone(context.Background())
 
- 	req.URL.Path = path
 
- 	// read request body until complete, max 10MB
 
- 	body, err := io.ReadAll(io.LimitReader(req.Body, 10*1024*1024))
 
- 	if err != nil {
 
- 		ctx.JSON(500, exception.InternalServerError(err).ToResponse())
 
- 		return
 
- 	}
 
- 	// replace with a new reader
 
- 	req.Body = io.NopCloser(bytes.NewReader(body))
 
- 	req.ContentLength = int64(len(body))
 
- 	req.TransferEncoding = nil
 
- 	// remove ip traces for security
 
- 	req.Header.Del("X-Forwarded-For")
 
- 	req.Header.Del("X-Real-IP")
 
- 	req.Header.Del("X-Forwarded")
 
- 	req.Header.Del("X-Original-Forwarded-For")
 
- 	req.Header.Del("X-Original-Url")
 
- 	req.Header.Del("X-Original-Host")
 
- 	var buffer bytes.Buffer
 
- 	err = req.Write(&buffer)
 
- 	if err != nil {
 
- 		ctx.JSON(500, exception.InternalServerError(err).ToResponse())
 
- 		return
 
- 	}
 
- 	identifier, err := plugin_entities.NewPluginUniqueIdentifier(pluginInstallation.PluginUniqueIdentifier)
 
- 	if err != nil {
 
- 		ctx.JSON(400, exception.PluginUniqueIdentifierError(err).ToResponse())
 
- 		return
 
- 	}
 
- 	// fetch plugin
 
- 	manager := plugin_manager.Manager()
 
- 	runtime, err := manager.Get(identifier)
 
- 	if err != nil {
 
- 		ctx.JSON(404, exception.ErrPluginNotFound().ToResponse())
 
- 		return
 
- 	}
 
- 	// fetch endpoint declaration
 
- 	endpointDeclaration := runtime.Configuration().Endpoint
 
- 	if endpointDeclaration == nil {
 
- 		ctx.JSON(404, exception.ErrPluginNotFound().ToResponse())
 
- 		return
 
- 	}
 
- 	// decrypt settings
 
- 	settings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
 
- 		BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
 
- 			TenantId: endpoint.TenantID,
 
- 			UserId:   "",
 
- 			Type:     dify_invocation.INVOKE_TYPE_ENCRYPT,
 
- 		},
 
- 		InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
 
- 			Opt:       dify_invocation.ENCRYPT_OPT_DECRYPT,
 
- 			Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
 
- 			Identity:  endpoint.ID,
 
- 			Data:      endpoint.Settings,
 
- 			Config:    endpointDeclaration.Settings,
 
- 		},
 
- 	})
 
- 	if err != nil {
 
- 		ctx.JSON(500, exception.InternalServerError(err).ToResponse())
 
- 		return
 
- 	}
 
- 	session := session_manager.NewSession(
 
- 		session_manager.NewSessionPayload{
 
- 			TenantID:               endpoint.TenantID,
 
- 			UserID:                 "",
 
- 			PluginUniqueIdentifier: identifier,
 
- 			ClusterID:              ctx.GetString("cluster_id"),
 
- 			InvokeFrom:             access_types.PLUGIN_ACCESS_TYPE_ENDPOINT,
 
- 			Action:                 access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
 
- 			Declaration:            runtime.Configuration(),
 
- 			BackwardsInvocation:    manager.BackwardsInvocation(),
 
- 			IgnoreCache:            false,
 
- 			EndpointID:             &endpoint.ID,
 
- 		},
 
- 	)
 
- 	defer session.Close(session_manager.CloseSessionPayload{
 
- 		IgnoreCache: false,
 
- 	})
 
- 	session.BindRuntime(runtime)
 
- 	statusCode, headers, response, err := plugin_daemon.InvokeEndpoint(
 
- 		session, &requests.RequestInvokeEndpoint{
 
- 			RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
 
- 			Settings:       settings,
 
- 		},
 
- 	)
 
- 	if err != nil {
 
- 		ctx.JSON(500, exception.InternalServerError(err).ToResponse())
 
- 		return
 
- 	}
 
- 	defer response.Close()
 
- 	done := make(chan bool)
 
- 	closed := new(int32)
 
- 	ctx.Status(statusCode)
 
- 	for k, v := range *headers {
 
- 		if len(v) > 0 {
 
- 			ctx.Writer.Header().Set(k, v[0])
 
- 		}
 
- 	}
 
- 	close := func() {
 
- 		if atomic.CompareAndSwapInt32(closed, 0, 1) {
 
- 			close(done)
 
- 		}
 
- 	}
 
- 	defer close()
 
- 	routine.Submit(map[string]string{
 
- 		"module":   "service",
 
- 		"function": "Endpoint",
 
- 	}, func() {
 
- 		defer close()
 
- 		for response.Next() {
 
- 			chunk, err := response.Read()
 
- 			if err != nil {
 
- 				ctx.Writer.Write([]byte(err.Error()))
 
- 				ctx.Writer.Flush()
 
- 				return
 
- 			}
 
- 			ctx.Writer.Write(chunk)
 
- 			ctx.Writer.Flush()
 
- 		}
 
- 	})
 
- 	select {
 
- 	case <-ctx.Writer.CloseNotify():
 
- 	case <-done:
 
- 	case <-time.After(240 * time.Second):
 
- 		ctx.JSON(500, exception.InternalServerError(errors.New("killed by timeout")).ToResponse())
 
- 	}
 
- }
 
- func EnableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
 
- 	endpoint, err := db.GetOne[models.Endpoint](
 
- 		db.Equal("id", endpoint_id),
 
- 		db.Equal("tenant_id", tenant_id),
 
- 	)
 
- 	if err != nil {
 
- 		return exception.NotFoundError(errors.New("endpoint not found")).ToResponse()
 
- 	}
 
- 	endpoint.Enabled = true
 
- 	if err := install_service.EnabledEndpoint(&endpoint); err != nil {
 
- 		return exception.InternalServerError(errors.New("failed to enable endpoint")).ToResponse()
 
- 	}
 
- 	return entities.NewSuccessResponse(true)
 
- }
 
- func DisableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
 
- 	endpoint, err := db.GetOne[models.Endpoint](
 
- 		db.Equal("id", endpoint_id),
 
- 		db.Equal("tenant_id", tenant_id),
 
- 	)
 
- 	if err != nil {
 
- 		return exception.NotFoundError(errors.New("Endpoint not found")).ToResponse()
 
- 	}
 
- 	endpoint.Enabled = false
 
- 	if err := install_service.DisabledEndpoint(&endpoint); err != nil {
 
- 		return exception.InternalServerError(errors.New("failed to disable endpoint")).ToResponse()
 
- 	}
 
- 	return entities.NewSuccessResponse(true)
 
- }
 
- func ListEndpoints(tenant_id string, page int, page_size int) *entities.Response {
 
- 	endpoints, err := db.GetAll[models.Endpoint](
 
- 		db.Equal("tenant_id", tenant_id),
 
- 		db.OrderBy("created_at", true),
 
- 		db.Page(page, page_size),
 
- 	)
 
- 	if err != nil {
 
- 		return exception.InternalServerError(fmt.Errorf("failed to list endpoints: %v", err)).ToResponse()
 
- 	}
 
- 	manager := plugin_manager.Manager()
 
- 	if manager == nil {
 
- 		return exception.InternalServerError(errors.New("failed to get plugin manager")).ToResponse()
 
- 	}
 
- 	// decrypt settings
 
- 	for i, endpoint := range endpoints {
 
- 		pluginInstallation, err := db.GetOne[models.PluginInstallation](
 
- 			db.Equal("plugin_id", endpoint.PluginID),
 
- 			db.Equal("tenant_id", tenant_id),
 
- 		)
 
- 		if err != nil {
 
- 			// use empty settings and declaration for uninstalled plugins
 
- 			endpoint.Settings = map[string]any{}
 
- 			endpoint.Declaration = &plugin_entities.EndpointProviderDeclaration{
 
- 				Settings:      []plugin_entities.ProviderConfig{},
 
- 				Endpoints:     []plugin_entities.EndpointDeclaration{},
 
- 				EndpointFiles: []string{},
 
- 			}
 
- 			endpoints[i] = endpoint
 
- 			continue
 
- 		}
 
- 		pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
 
- 			pluginInstallation.PluginUniqueIdentifier,
 
- 		)
 
- 		if err != nil {
 
- 			return exception.PluginUniqueIdentifierError(
 
- 				fmt.Errorf("failed to parse plugin unique identifier: %v", err),
 
- 			).ToResponse()
 
- 		}
 
- 		pluginDeclaration, err := manager.GetDeclaration(
 
- 			pluginUniqueIdentifier,
 
- 			tenant_id,
 
- 			plugin_entities.PluginRuntimeType(pluginInstallation.RuntimeType),
 
- 		)
 
- 		if err != nil {
 
- 			return exception.InternalServerError(
 
- 				fmt.Errorf("failed to get plugin declaration: %v", err),
 
- 			).ToResponse()
 
- 		}
 
- 		if pluginDeclaration.Endpoint == nil {
 
- 			return exception.NotFoundError(errors.New("plugin does not have an endpoint")).ToResponse()
 
- 		}
 
- 		decryptedSettings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
 
- 			BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
 
- 				TenantId: tenant_id,
 
- 				UserId:   "",
 
- 				Type:     dify_invocation.INVOKE_TYPE_ENCRYPT,
 
- 			},
 
- 			InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
 
- 				Opt:       dify_invocation.ENCRYPT_OPT_DECRYPT,
 
- 				Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
 
- 				Identity:  endpoint.ID,
 
- 				Data:      endpoint.Settings,
 
- 				Config:    pluginDeclaration.Endpoint.Settings,
 
- 			},
 
- 		})
 
- 		if err != nil {
 
- 			return exception.InternalServerError(
 
- 				fmt.Errorf("failed to decrypt settings: %v", err),
 
- 			).ToResponse()
 
- 		}
 
- 		// mask settings
 
- 		decryptedSettings = encryption.MaskConfigCredentials(decryptedSettings, pluginDeclaration.Endpoint.Settings)
 
- 		endpoint.Settings = decryptedSettings
 
- 		endpoint.Declaration = pluginDeclaration.Endpoint
 
- 		endpoints[i] = endpoint
 
- 	}
 
- 	return entities.NewSuccessResponse(endpoints)
 
- }
 
- func ListPluginEndpoints(tenant_id string, plugin_id string, page int, page_size int) *entities.Response {
 
- 	endpoints, err := db.GetAll[models.Endpoint](
 
- 		db.Equal("plugin_id", plugin_id),
 
- 		db.Equal("tenant_id", tenant_id),
 
- 		db.OrderBy("created_at", true),
 
- 		db.Page(page, page_size),
 
- 	)
 
- 	if err != nil {
 
- 		return exception.InternalServerError(
 
- 			fmt.Errorf("failed to list endpoints: %v", err),
 
- 		).ToResponse()
 
- 	}
 
- 	manager := plugin_manager.Manager()
 
- 	if manager == nil {
 
- 		return exception.InternalServerError(
 
- 			errors.New("failed to get plugin manager"),
 
- 		).ToResponse()
 
- 	}
 
- 	// decrypt settings
 
- 	for i, endpoint := range endpoints {
 
- 		// get installation
 
- 		pluginInstallation, err := db.GetOne[models.PluginInstallation](
 
- 			db.Equal("plugin_id", plugin_id),
 
- 			db.Equal("tenant_id", tenant_id),
 
- 		)
 
- 		if err != nil {
 
- 			return exception.NotFoundError(
 
- 				fmt.Errorf("failed to find plugin installation: %v", err),
 
- 			).ToResponse()
 
- 		}
 
- 		pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
 
- 			pluginInstallation.PluginUniqueIdentifier,
 
- 		)
 
- 		if err != nil {
 
- 			return exception.PluginUniqueIdentifierError(
 
- 				fmt.Errorf("failed to parse plugin unique identifier: %v", err),
 
- 			).ToResponse()
 
- 		}
 
- 		pluginDeclaration, err := manager.GetDeclaration(
 
- 			pluginUniqueIdentifier,
 
- 			tenant_id,
 
- 			plugin_entities.PluginRuntimeType(pluginInstallation.RuntimeType),
 
- 		)
 
- 		if err != nil {
 
- 			return exception.InternalServerError(
 
- 				fmt.Errorf("failed to get plugin declaration: %v", err),
 
- 			).ToResponse()
 
- 		}
 
- 		decryptedSettings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
 
- 			BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
 
- 				TenantId: tenant_id,
 
- 				UserId:   "",
 
- 				Type:     dify_invocation.INVOKE_TYPE_ENCRYPT,
 
- 			},
 
- 			InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
 
- 				Opt:       dify_invocation.ENCRYPT_OPT_DECRYPT,
 
- 				Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
 
- 				Identity:  endpoint.ID,
 
- 				Data:      endpoint.Settings,
 
- 				Config:    pluginDeclaration.Endpoint.Settings,
 
- 			},
 
- 		})
 
- 		if err != nil {
 
- 			return exception.InternalServerError(
 
- 				fmt.Errorf("failed to decrypt settings: %v", err),
 
- 			).ToResponse()
 
- 		}
 
- 		// mask settings
 
- 		decryptedSettings = encryption.MaskConfigCredentials(decryptedSettings, pluginDeclaration.Endpoint.Settings)
 
- 		endpoint.Settings = decryptedSettings
 
- 		endpoint.Declaration = pluginDeclaration.Endpoint
 
- 		endpoints[i] = endpoint
 
- 	}
 
- 	return entities.NewSuccessResponse(endpoints)
 
- }
 
 
  |