| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 | package serviceimport (	"bytes"	"context"	"encoding/hex"	"sync/atomic"	"time"	"github.com/gin-gonic/gin"	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"	"github.com/langgenius/dify-plugin-daemon/internal/types/models"	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine")func Endpoint(	ctx *gin.Context,	endpoint *models.Endpoint,	path string,) {	req := ctx.Request.Clone(context.Background())	req.URL.Path = path	var buffer bytes.Buffer	err := req.Write(&buffer)	if err != nil {		ctx.JSON(500, gin.H{"error": err.Error()})	}	// fetch plugin	manager := plugin_manager.GetGlobalPluginManager()	runtime := manager.Get(endpoint.PluginID)	if runtime == nil {		ctx.JSON(404, gin.H{"error": "plugin not found"})		return	}	// fetch endpoint declaration	endpoint_declaration := runtime.Configuration().Endpoint	if endpoint_declaration == nil {		ctx.JSON(404, gin.H{"error": "endpoint declaration not found"})		return	}	// decrypt settings	settings, err := dify_invocation.InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{		BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{			TenantId: endpoint.TenantID,			UserId:   "",			Type:     dify_invocation.INVOKE_TYPE_ENCRYPT,		},		InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{			Opt:       dify_invocation.ENCRYPT_OPT_DECRYPT,			Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,			Identity:  endpoint.ID,			Data:      endpoint.GetSettings(),			Config:    endpoint_declaration.Settings,		},	})	if err != nil {		ctx.JSON(500, gin.H{"error": "failed to decrypt data"})		return	}	session := session_manager.NewSession(		endpoint.TenantID,		"",		endpoint.PluginID,		ctx.GetString("cluster_id"),		access_types.PLUGIN_ACCESS_TYPE_Endpoint,		access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,		runtime.Configuration(),	)	defer session.Close()	session.BindRuntime(runtime)	status_code, headers, response, err := plugin_daemon.InvokeEndpoint(		session, &requests.RequestInvokeEndpoint{			RawHttpRequest: hex.EncodeToString(buffer.Bytes()),			Settings:       settings,		},	)	if err != nil {		ctx.JSON(500, gin.H{"error": err.Error()})		return	}	defer response.Close()	done := make(chan bool)	closed := new(int32)	ctx.Status(status_code)	for k, v := range *headers {		if len(v) > 0 {			ctx.Writer.Header().Set(k, v[0])		}	}	close := func() {		if atomic.CompareAndSwapInt32(closed, 0, 1) {			close(done)		}	}	defer close()	routine.Submit(func() {		defer close()		for response.Next() {			chunk, err := response.Read()			if err != nil {				ctx.JSON(500, gin.H{"error": err.Error()})				return			}			ctx.Writer.Write(chunk)			ctx.Writer.Flush()		}	})	select {	case <-ctx.Writer.CloseNotify():	case <-done:	case <-time.After(30 * time.Second):		ctx.JSON(500, gin.H{"error": "killed by timeout"})	}}
 |