123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406 |
- package service
- import (
- "bytes"
- "context"
- "encoding/hex"
- "errors"
- "fmt"
- "io"
- "sync/atomic"
- "time"
- "github.com/gin-gonic/gin"
- "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
- "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
- "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
- "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
- "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
- "github.com/langgenius/dify-plugin-daemon/internal/db"
- "github.com/langgenius/dify-plugin-daemon/internal/service/install_service"
- "github.com/langgenius/dify-plugin-daemon/internal/types/exception"
- "github.com/langgenius/dify-plugin-daemon/internal/types/models"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/encryption"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
- "github.com/langgenius/dify-plugin-daemon/pkg/entities"
- "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
- "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests"
- )
- func Endpoint(
- ctx *gin.Context,
- endpoint *models.Endpoint,
- pluginInstallation *models.PluginInstallation,
- path string,
- ) {
- if !endpoint.Enabled {
- ctx.JSON(404, exception.NotFoundError(errors.New("endpoint not found")).ToResponse())
- return
- }
- req := ctx.Request.Clone(context.Background())
- // get query params
- queryParams := req.URL.Query()
- // replace path with endpoint path
- req.URL.Path = path
- // set query params
- req.URL.RawQuery = queryParams.Encode()
- // read request body until complete, max 10MB
- body, err := io.ReadAll(io.LimitReader(req.Body, 10*1024*1024))
- if err != nil {
- ctx.JSON(500, exception.InternalServerError(err).ToResponse())
- return
- }
- // replace with a new reader
- req.Body = io.NopCloser(bytes.NewReader(body))
- req.ContentLength = int64(len(body))
- req.TransferEncoding = nil
- // remove ip traces for security
- req.Header.Del("X-Forwarded-For")
- req.Header.Del("X-Real-IP")
- req.Header.Del("X-Forwarded")
- req.Header.Del("X-Original-Forwarded-For")
- req.Header.Del("X-Original-Url")
- req.Header.Del("X-Original-Host")
- // setup hook id to request
- req.Header.Set("Dify-Hook-Id", endpoint.HookID)
- var buffer bytes.Buffer
- err = req.Write(&buffer)
- if err != nil {
- ctx.JSON(500, exception.InternalServerError(err).ToResponse())
- return
- }
- identifier, err := plugin_entities.NewPluginUniqueIdentifier(pluginInstallation.PluginUniqueIdentifier)
- if err != nil {
- ctx.JSON(400, exception.UniqueIdentifierError(err).ToResponse())
- return
- }
- // fetch plugin
- manager := plugin_manager.Manager()
- runtime, err := manager.Get(identifier)
- if err != nil {
- ctx.JSON(404, exception.ErrPluginNotFound().ToResponse())
- return
- }
- // fetch endpoint declaration
- endpointDeclaration := runtime.Configuration().Endpoint
- if endpointDeclaration == nil {
- ctx.JSON(404, exception.ErrPluginNotFound().ToResponse())
- return
- }
- // decrypt settings
- settings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
- BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
- TenantId: endpoint.TenantID,
- UserId: "",
- Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
- },
- InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
- Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
- Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
- Identity: endpoint.ID,
- Data: endpoint.Settings,
- Config: endpointDeclaration.Settings,
- },
- })
- if err != nil {
- ctx.JSON(500, exception.InternalServerError(err).ToResponse())
- return
- }
- session := session_manager.NewSession(
- session_manager.NewSessionPayload{
- TenantID: endpoint.TenantID,
- UserID: "",
- PluginUniqueIdentifier: identifier,
- ClusterID: ctx.GetString("cluster_id"),
- InvokeFrom: access_types.PLUGIN_ACCESS_TYPE_ENDPOINT,
- Action: access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
- Declaration: runtime.Configuration(),
- BackwardsInvocation: manager.BackwardsInvocation(),
- IgnoreCache: false,
- EndpointID: &endpoint.ID,
- },
- )
- defer session.Close(session_manager.CloseSessionPayload{
- IgnoreCache: false,
- })
- session.BindRuntime(runtime)
- statusCode, headers, response, err := plugin_daemon.InvokeEndpoint(
- session, &requests.RequestInvokeEndpoint{
- RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
- Settings: settings,
- },
- )
- if err != nil {
- ctx.JSON(500, exception.InternalServerError(err).ToResponse())
- return
- }
- defer response.Close()
- done := make(chan bool)
- closed := new(int32)
- ctx.Status(statusCode)
- for k, v := range *headers {
- if len(v) > 0 {
- ctx.Writer.Header().Set(k, v[0])
- }
- }
- close := func() {
- if atomic.CompareAndSwapInt32(closed, 0, 1) {
- close(done)
- }
- }
- defer close()
- routine.Submit(map[string]string{
- "module": "service",
- "function": "Endpoint",
- }, func() {
- defer close()
- for response.Next() {
- chunk, err := response.Read()
- if err != nil {
- ctx.Writer.Write([]byte(err.Error()))
- ctx.Writer.Flush()
- return
- }
- ctx.Writer.Write(chunk)
- ctx.Writer.Flush()
- }
- })
- select {
- case <-ctx.Writer.CloseNotify():
- case <-done:
- case <-time.After(240 * time.Second):
- ctx.JSON(500, exception.InternalServerError(errors.New("killed by timeout")).ToResponse())
- }
- }
- func EnableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
- endpoint, err := db.GetOne[models.Endpoint](
- db.Equal("id", endpoint_id),
- db.Equal("tenant_id", tenant_id),
- )
- if err != nil {
- return exception.NotFoundError(errors.New("endpoint not found")).ToResponse()
- }
- endpoint.Enabled = true
- if err := install_service.EnabledEndpoint(&endpoint); err != nil {
- return exception.InternalServerError(errors.New("failed to enable endpoint")).ToResponse()
- }
- return entities.NewSuccessResponse(true)
- }
- func DisableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
- endpoint, err := db.GetOne[models.Endpoint](
- db.Equal("id", endpoint_id),
- db.Equal("tenant_id", tenant_id),
- )
- if err != nil {
- return exception.NotFoundError(errors.New("Endpoint not found")).ToResponse()
- }
- endpoint.Enabled = false
- if err := install_service.DisabledEndpoint(&endpoint); err != nil {
- return exception.InternalServerError(errors.New("failed to disable endpoint")).ToResponse()
- }
- return entities.NewSuccessResponse(true)
- }
- func ListEndpoints(tenant_id string, page int, page_size int) *entities.Response {
- endpoints, err := db.GetAll[models.Endpoint](
- db.Equal("tenant_id", tenant_id),
- db.OrderBy("created_at", true),
- db.Page(page, page_size),
- )
- if err != nil {
- return exception.InternalServerError(fmt.Errorf("failed to list endpoints: %v", err)).ToResponse()
- }
- manager := plugin_manager.Manager()
- if manager == nil {
- return exception.InternalServerError(errors.New("failed to get plugin manager")).ToResponse()
- }
- // decrypt settings
- for i, endpoint := range endpoints {
- pluginInstallation, err := db.GetOne[models.PluginInstallation](
- db.Equal("plugin_id", endpoint.PluginID),
- db.Equal("tenant_id", tenant_id),
- )
- if err != nil {
- // use empty settings and declaration for uninstalled plugins
- endpoint.Settings = map[string]any{}
- endpoint.Declaration = &plugin_entities.EndpointProviderDeclaration{
- Settings: []plugin_entities.ProviderConfig{},
- Endpoints: []plugin_entities.EndpointDeclaration{},
- EndpointFiles: []string{},
- }
- endpoints[i] = endpoint
- continue
- }
- pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
- pluginInstallation.PluginUniqueIdentifier,
- )
- if err != nil {
- return exception.UniqueIdentifierError(
- fmt.Errorf("failed to parse plugin unique identifier: %v", err),
- ).ToResponse()
- }
- pluginDeclaration, err := manager.GetDeclaration(
- pluginUniqueIdentifier,
- tenant_id,
- plugin_entities.PluginRuntimeType(pluginInstallation.RuntimeType),
- )
- if err != nil {
- return exception.InternalServerError(
- fmt.Errorf("failed to get plugin declaration: %v", err),
- ).ToResponse()
- }
- if pluginDeclaration.Endpoint == nil {
- return exception.NotFoundError(errors.New("plugin does not have an endpoint")).ToResponse()
- }
- decryptedSettings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
- BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
- TenantId: tenant_id,
- UserId: "",
- Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
- },
- InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
- Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
- Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
- Identity: endpoint.ID,
- Data: endpoint.Settings,
- Config: pluginDeclaration.Endpoint.Settings,
- },
- })
- if err != nil {
- return exception.InternalServerError(
- fmt.Errorf("failed to decrypt settings: %v", err),
- ).ToResponse()
- }
- // mask settings
- decryptedSettings = encryption.MaskConfigCredentials(decryptedSettings, pluginDeclaration.Endpoint.Settings)
- endpoint.Settings = decryptedSettings
- endpoint.Declaration = pluginDeclaration.Endpoint
- endpoints[i] = endpoint
- }
- return entities.NewSuccessResponse(endpoints)
- }
- func ListPluginEndpoints(tenant_id string, plugin_id string, page int, page_size int) *entities.Response {
- endpoints, err := db.GetAll[models.Endpoint](
- db.Equal("plugin_id", plugin_id),
- db.Equal("tenant_id", tenant_id),
- db.OrderBy("created_at", true),
- db.Page(page, page_size),
- )
- if err != nil {
- return exception.InternalServerError(
- fmt.Errorf("failed to list endpoints: %v", err),
- ).ToResponse()
- }
- manager := plugin_manager.Manager()
- if manager == nil {
- return exception.InternalServerError(
- errors.New("failed to get plugin manager"),
- ).ToResponse()
- }
- // decrypt settings
- for i, endpoint := range endpoints {
- // get installation
- pluginInstallation, err := db.GetOne[models.PluginInstallation](
- db.Equal("plugin_id", plugin_id),
- db.Equal("tenant_id", tenant_id),
- )
- if err != nil {
- return exception.NotFoundError(
- fmt.Errorf("failed to find plugin installation: %v", err),
- ).ToResponse()
- }
- pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
- pluginInstallation.PluginUniqueIdentifier,
- )
- if err != nil {
- return exception.UniqueIdentifierError(
- fmt.Errorf("failed to parse plugin unique identifier: %v", err),
- ).ToResponse()
- }
- pluginDeclaration, err := manager.GetDeclaration(
- pluginUniqueIdentifier,
- tenant_id,
- plugin_entities.PluginRuntimeType(pluginInstallation.RuntimeType),
- )
- if err != nil {
- return exception.InternalServerError(
- fmt.Errorf("failed to get plugin declaration: %v", err),
- ).ToResponse()
- }
- decryptedSettings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
- BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
- TenantId: tenant_id,
- UserId: "",
- Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
- },
- InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
- Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
- Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
- Identity: endpoint.ID,
- Data: endpoint.Settings,
- Config: pluginDeclaration.Endpoint.Settings,
- },
- })
- if err != nil {
- return exception.InternalServerError(
- fmt.Errorf("failed to decrypt settings: %v", err),
- ).ToResponse()
- }
- // mask settings
- decryptedSettings = encryption.MaskConfigCredentials(decryptedSettings, pluginDeclaration.Endpoint.Settings)
- endpoint.Settings = decryptedSettings
- endpoint.Declaration = pluginDeclaration.Endpoint
- endpoints[i] = endpoint
- }
- return entities.NewSuccessResponse(endpoints)
- }
|