Forráskód Böngészése

refactor: invoke dify

Yeuoly 1 éve%!(EXTRA string=óta)
szülő
commit
a47e36c310

+ 7 - 5
.env.example

@@ -1,17 +1,19 @@
-DIFY_URL=http://127.0.0.1:5001
 # A secretkey that is used for securely communicating with DIFY API. 
 # You can generate a strong key using `openssl rand -base64 42`.
-DIFY_CALLING_KEY=lYkiYYT6owG+71oLerGzA7GXCgOT++6ovaezWAjpCjf+Sjc3ZtU+qUEi
-DIFY_CALLING_PORT=5002
+PLUGIN_INNER_API_KEY=lYkiYYT6owG+71oLerGzA7GXCgOT++6ovaezWAjpCjf+Sjc3ZtU+qUEi
+PLUGIN_INNER_API_URL=http://http://127.0.0.1:5001
 
-PLUGIN_HOST=127.0.0.1
-PLUGIN_PORT=5003
+SERVER_PORT=5002
+
+PLUGIN_REMOTE_INSTALLING_HOST=127.0.0.1
+PLUGIN_REMOTE_INSTALLING_PORT=5003
 
 ROUTINE_POOL_SIZE=1024
 
 REDIS_HOST=127.0.0.1
 REDIS_PORT=6379
 REDIS_PASS=difyai123456
+
 LIFETIME_COLLECTION_HEARTBEAT_INTERVAL=5
 LIFETIME_COLLECTION_CG_INTERVAL=60
 LIFETIME_STATE_GC_INTERVAL=300

+ 1 - 1
cmd/server/main.go

@@ -28,8 +28,8 @@ func main() {
 }
 
 func setDefault(config *app.Config) {
+	setDefaultInt(&config.SERVER_PORT, 5002)
 	setDefaultInt(&config.RoutinePoolSize, 1000)
-	setDefaultInt(&config.DifyCallingPort, 5002)
 	setDefaultInt(&config.LifetimeCollectionGCInterval, 60)
 	setDefaultInt(&config.LifetimeCollectionHeartbeatInterval, 5)
 	setDefaultInt(&config.LifetimeStateGCInterval, 300)

+ 6 - 3
internal/core/dify_invocation/http_client.go

@@ -8,11 +8,12 @@ import (
 )
 
 var (
-	baseurl *url.URL
-	client  *http.Client
+	PLUGIN_INNER_API_KEY string
+	baseurl              *url.URL
+	client               *http.Client
 )
 
-func InitDifyInvocationDaemon(base string) error {
+func InitDifyInvocationDaemon(base string, calling_key string) error {
 	var err error
 	baseurl, err = url.Parse(base)
 	if err != nil {
@@ -29,5 +30,7 @@ func InitDifyInvocationDaemon(base string) error {
 		},
 	}
 
+	PLUGIN_INNER_API_KEY = calling_key
+
 	return nil
 }

+ 28 - 1
internal/core/dify_invocation/http_request.go

@@ -1,7 +1,34 @@
 package dify_invocation
 
-import "github.com/langgenius/dify-plugin-daemon/internal/utils/requests"
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/requests"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
+)
 
 func Request[T any](method string, path string, options ...requests.HttpOptions) (*T, error) {
+	options = append(options, requests.HttpHeader(map[string]string{
+		"X-Inner-Api-Key": PLUGIN_INNER_API_KEY,
+	}))
+
 	return requests.RequestAndParse[T](client, difyPath(path), method, options...)
 }
+
+func StreamResponse[T any](method string, path string, options ...requests.HttpOptions) (*stream.StreamResponse[T], error) {
+	options = append(options, requests.HttpHeader(map[string]string{
+		"X-Inner-Api-Key": PLUGIN_INNER_API_KEY,
+	}))
+
+	return requests.RequestAndParseStream[T](client, difyPath(path), method, options...)
+}
+
+func InvokeModel(payload InvokeModelRequest) (*stream.StreamResponse[InvokeModelResponseChunk], error) {
+	return StreamResponse[InvokeModelResponseChunk]("POST", "invoke/model", requests.HttpPayloadJson(payload))
+}
+
+func InvokeTool(payload InvokeToolRequest) (*stream.StreamResponse[InvokeToolResponseChunk], error) {
+	return StreamResponse[InvokeToolResponseChunk]("POST", "invoke/tool", requests.HttpPayloadJson(payload))
+}
+
+func InvokeNode[T WorkflowNodeData](payload InvokeNodeRequest[T]) (*InvokeNodeResponse, error) {
+	return Request[InvokeNodeResponse]("POST", "invoke/node", requests.HttpPayloadJson(payload))
+}

+ 1 - 0
internal/core/dify_invocation/path.go

@@ -1,5 +1,6 @@
 package dify_invocation
 
 func difyPath(path ...string) string {
+	path = append([]string{"inner", "api"}, path...)
 	return baseurl.JoinPath(path...).String()
 }

+ 66 - 0
internal/core/dify_invocation/types.go

@@ -0,0 +1,66 @@
+package dify_invocation
+
+import "encoding/json"
+
+type BaseInvokeDifyRequest struct {
+	TenantId string `json:"tenant_id"`
+	UserId   string `json:"user_id"`
+}
+
+type InvokeModelRequest struct {
+	BaseInvokeDifyRequest
+	Provider   string         `json:"provider"`
+	Model      string         `json:"model"`
+	Parameters map[string]any `json:"parameters"`
+}
+
+func (r InvokeModelRequest) MarshalJSON() ([]byte, error) {
+	flattened := make(map[string]any)
+	flattened["tenant_id"] = r.TenantId
+	flattened["user_id"] = r.UserId
+	flattened["provider"] = r.Provider
+	flattened["model"] = r.Model
+	flattened["parameters"] = r.Parameters
+	return json.Marshal(flattened)
+}
+
+type InvokeModelResponseChunk struct {
+}
+
+type InvokeToolRequest struct {
+	BaseInvokeDifyRequest
+	Provider   string         `json:"provider"`
+	Tool       string         `json:"tool"`
+	Parameters map[string]any `json:"parameters"`
+}
+
+func (r InvokeToolRequest) MarshalJSON() ([]byte, error) {
+	flattened := make(map[string]any)
+	flattened["tenant_id"] = r.TenantId
+	flattened["user_id"] = r.UserId
+	flattened["provider"] = r.Provider
+	flattened["tool"] = r.Tool
+	flattened["parameters"] = r.Parameters
+	return json.Marshal(flattened)
+}
+
+type InvokeToolResponseChunk struct {
+}
+
+type InvokeNodeRequest[T WorkflowNodeData] struct {
+	BaseInvokeDifyRequest
+	NodeType string `json:"node_type"`
+	NodeData T      `json:"node_data"`
+}
+
+func (r InvokeNodeRequest[T]) MarshalJSON() ([]byte, error) {
+	flattened := make(map[string]any)
+	flattened["tenant_id"] = r.TenantId
+	flattened["user_id"] = r.UserId
+	flattened["node_type"] = r.NodeType
+	flattened["node_data"] = r.NodeData
+	return json.Marshal(flattened)
+}
+
+type InvokeNodeResponse struct {
+}

+ 18 - 0
internal/core/dify_invocation/workflow_node_data.go

@@ -0,0 +1,18 @@
+package dify_invocation
+
+type WorkflowNodeData interface {
+	KnowledgeRetrievalNodeData | QuestionClassifierNodeData |
+		ParameterExtractorNodeData | CodeNodeData
+}
+
+type KnowledgeRetrievalNodeData struct {
+}
+
+type QuestionClassifierNodeData struct {
+}
+
+type ParameterExtractorNodeData struct {
+}
+
+type CodeNodeData struct {
+}

+ 3 - 1
internal/core/plugin_daemon/daemon.go

@@ -39,6 +39,8 @@ func InvokeTool(session *session_manager.Session, provider_name string, tool_nam
 				return
 			}
 			response.Write(chunk)
+		case plugin_entities.STREAM_MESSAGE_TYPE_INVOKE:
+			// TODO: invoke dify
 		case plugin_entities.STREAM_MESSAGE_TYPE_END:
 			response.Close()
 		default:
@@ -56,7 +58,7 @@ func InvokeTool(session *session_manager.Session, provider_name string, tool_nam
 			"provider":   provider_name,
 			"tool":       tool_name,
 			"parameters": tool_parameters,
-			"session_id": session.ID,
+			"session_id": session.ID(),
 		},
 	)))
 

+ 7 - 0
internal/core/plugin_manager/manager.go

@@ -3,6 +3,7 @@ package plugin_manager
 import (
 	"fmt"
 
+	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
@@ -49,6 +50,12 @@ func Init(configuration *app.Config) {
 		log.Panic("init redis client failed: %s", err.Error())
 	}
 
+	if err := dify_invocation.InitDifyInvocationDaemon(
+		configuration.PluginInnerApiURL, configuration.PluginInnerApiKey,
+	); err != nil {
+		log.Panic("init dify invocation daemon failed: %s", err.Error())
+	}
+
 	// start plugin watcher
 	startWatcher(configuration)
 

+ 1 - 3
internal/core/plugin_manager/stdio_holder/io.go

@@ -66,7 +66,7 @@ func (s *stdioHolder) StartStdout() {
 
 					log.Info("plugin %s: %s", s.pluginIdentity, logEvent.Message)
 				}
-			case plugin_entities.PLUGIN_EVENT_RESPONSE:
+			case plugin_entities.PLUGIN_EVENT_SESSION:
 				for _, listener := range listeners {
 					listener(s.id, event.Data)
 				}
@@ -78,8 +78,6 @@ func (s *stdioHolder) StartStdout() {
 				}
 			case plugin_entities.PLUGIN_EVENT_ERROR:
 				log.Error("plugin %s: %s", s.pluginIdentity, event.Data)
-			case plugin_entities.PLUGIN_EVENT_INVOKE:
-				// invoke dify
 			}
 		}
 	}

+ 2 - 2
internal/server/http.go

@@ -11,7 +11,7 @@ func server(config *app.Config) {
 	engine := gin.Default()
 
 	engine.GET("/health/check", HealthCheck)
-	engine.POST("/plugin/tool/invoke", CheckingKey(config.DifyCallingKey), InvokeTool)
+	engine.POST("/plugin/tool/invoke", CheckingKey(config.PluginInnerApiKey), InvokeTool)
 
-	engine.Run(fmt.Sprintf(":%d", config.DifyCallingPort))
+	engine.Run(fmt.Sprintf(":%d", config.SERVER_PORT))
 }

+ 6 - 5
internal/types/app/config.go

@@ -1,12 +1,13 @@
 package app
 
 type Config struct {
-	DifyURL         string `envconfig:"DIFY_URL"`
-	DifyCallingKey  string `envconfig:"DIFY_CALLING_KEY"`
-	DifyCallingPort int16  `envconfig:"DIFY_CALLING_PORT"`
+	SERVER_PORT int16 `envconfig:"SERVER_PORT"`
 
-	PluginHost string `envconfig:"PLUGIN_HOST"`
-	PluginPort int16  `envconfig:"PLUGIN_PORT"`
+	PluginInnerApiKey string `envconfig:"PLUGIN_INNER_API_KEY"`
+	PluginInnerApiURL string `envconfig:"PLUGIN_INNER_API_URL"`
+
+	PluginRemoteInstallingHost string `envconfig:"PLUGIN_REMOTE_INSTALLING_HOST"`
+	PluginRemoteInstallingPort int16  `envconfig:"PLUGIN_REMOTE_INSTALLING_PORT"`
 
 	StoragePath string `envconfig:"STORAGE_PATH"`
 

+ 6 - 6
internal/types/entities/plugin_entities/event.go

@@ -9,10 +9,9 @@ type PluginUniversalEvent struct {
 }
 
 const (
-	PLUGIN_EVENT_LOG      = "log"
-	PLUGIN_EVENT_RESPONSE = "response"
-	PLUGIN_EVENT_ERROR    = "error"
-	PLUGIN_EVENT_INVOKE   = "invoke"
+	PLUGIN_EVENT_LOG     = "log"
+	PLUGIN_EVENT_SESSION = "session"
+	PLUGIN_EVENT_ERROR   = "error"
 )
 
 type PluginLogEvent struct {
@@ -29,11 +28,12 @@ type StreamMessage struct {
 const (
 	STREAM_MESSAGE_TYPE_STREAM = "stream"
 	STREAM_MESSAGE_TYPE_END    = "end"
+	STREAM_MESSAGE_TYPE_INVOKE = "invoke"
 )
 
 type InvokeToolResponseChunk struct {
-	Type    string          `json:"type" binding:"required"`
-	Message json.RawMessage `json:"message" binding:"required"`
+	Type string          `json:"type"`
+	Data json.RawMessage `json:"data"`
 }
 
 type InvokeModelResponseChunk struct {