| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 | 
							- package service
 
- import (
 
- 	"bytes"
 
- 	"context"
 
- 	"encoding/hex"
 
- 	"sync/atomic"
 
- 	"time"
 
- 	"github.com/gin-gonic/gin"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/types/models"
 
- 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 
- )
 
- func Endpoint(
 
- 	ctx *gin.Context,
 
- 	endpoint *models.Endpoint,
 
- 	path string,
 
- ) {
 
- 	req := ctx.Request.Clone(context.Background())
 
- 	req.URL.Path = path
 
- 	var buffer bytes.Buffer
 
- 	err := req.Write(&buffer)
 
- 	if err != nil {
 
- 		ctx.JSON(500, gin.H{"error": err.Error()})
 
- 	}
 
- 	// fetch plugin
 
- 	manager := plugin_manager.GetGlobalPluginManager()
 
- 	runtime := manager.Get(endpoint.PluginID)
 
- 	if runtime == nil {
 
- 		ctx.JSON(404, gin.H{"error": "plugin not found"})
 
- 		return
 
- 	}
 
- 	// fetch endpoint declaration
 
- 	endpoint_declaration := runtime.Configuration().Endpoint
 
- 	if endpoint_declaration == nil {
 
- 		ctx.JSON(404, gin.H{"error": "endpoint declaration not found"})
 
- 		return
 
- 	}
 
- 	// decrypt settings
 
- 	settings, err := dify_invocation.InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
 
- 		BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
 
- 			TenantId: endpoint.TenantID,
 
- 			UserId:   "",
 
- 			Type:     dify_invocation.INVOKE_TYPE_ENCRYPT,
 
- 		},
 
- 		InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
 
- 			Opt:       dify_invocation.ENCRYPT_OPT_DECRYPT,
 
- 			Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
 
- 			Identity:  endpoint.ID,
 
- 			Data:      endpoint.GetSettings(),
 
- 			Config:    endpoint_declaration.Settings,
 
- 		},
 
- 	})
 
- 	if err != nil {
 
- 		ctx.JSON(500, gin.H{"error": "failed to decrypt data"})
 
- 		return
 
- 	}
 
- 	session := session_manager.NewSession(
 
- 		endpoint.TenantID,
 
- 		"",
 
- 		endpoint.PluginID,
 
- 		ctx.GetString("cluster_id"),
 
- 		access_types.PLUGIN_ACCESS_TYPE_Endpoint,
 
- 		access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
 
- 		runtime.Configuration(),
 
- 	)
 
- 	defer session.Close()
 
- 	session.BindRuntime(runtime)
 
- 	status_code, headers, response, err := plugin_daemon.InvokeEndpoint(
 
- 		session, &requests.RequestInvokeEndpoint{
 
- 			RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
 
- 			Settings:       settings,
 
- 		},
 
- 	)
 
- 	if err != nil {
 
- 		ctx.JSON(500, gin.H{"error": err.Error()})
 
- 		return
 
- 	}
 
- 	defer response.Close()
 
- 	done := make(chan bool)
 
- 	closed := new(int32)
 
- 	ctx.Status(status_code)
 
- 	for k, v := range *headers {
 
- 		if len(v) > 0 {
 
- 			ctx.Writer.Header().Set(k, v[0])
 
- 		}
 
- 	}
 
- 	close := func() {
 
- 		if atomic.CompareAndSwapInt32(closed, 0, 1) {
 
- 			close(done)
 
- 		}
 
- 	}
 
- 	defer close()
 
- 	routine.Submit(func() {
 
- 		defer close()
 
- 		for response.Next() {
 
- 			chunk, err := response.Read()
 
- 			if err != nil {
 
- 				ctx.JSON(500, gin.H{"error": err.Error()})
 
- 				return
 
- 			}
 
- 			ctx.Writer.Write(chunk)
 
- 			ctx.Writer.Flush()
 
- 		}
 
- 	})
 
- 	select {
 
- 	case <-ctx.Writer.CloseNotify():
 
- 	case <-done:
 
- 	case <-time.After(30 * time.Second):
 
- 		ctx.JSON(500, gin.H{"error": "killed by timeout"})
 
- 	}
 
- }
 
 
  |