Ver código fonte

feat: call dify

Yeuoly 1 ano atrás
pai
commit
f6d83da22f

+ 1 - 1
.env.example

@@ -1,7 +1,7 @@
 # A secretkey that is used for securely communicating with DIFY API. 
 # You can generate a strong key using `openssl rand -base64 42`.
 PLUGIN_INNER_API_KEY=lYkiYYT6owG+71oLerGzA7GXCgOT++6ovaezWAjpCjf+Sjc3ZtU+qUEi
-PLUGIN_INNER_API_URL=http://http://127.0.0.1:5001
+PLUGIN_INNER_API_URL=http://127.0.0.1:5001
 
 SERVER_PORT=5002
 

+ 34 - 44
cmd/tests/main.go

@@ -1,52 +1,42 @@
 package main
 
-import (
-	"fmt"
-	"math/rand"
-	"sync/atomic"
-	"time"
+import "fmt"
 
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
-)
+type Inf interface {
+	Set(int)
+}
 
-func main() {
-	response := stream.NewStreamResponse[string](1024)
+type A struct {
+	num int
+}
+
+func (a *A) Set(data int) {
+	a.num = data
+}
+
+type B struct {
+	num int
+}
 
-	random_string := func() string {
-		return fmt.Sprintf("%d", rand.Intn(100000))
+func (b *B) Set(data int) {
+	b.num = data
+}
+
+type C interface {
+	*A | *B
+
+	Inf
+}
+
+type D[T C] struct {
+	data T
+}
+
+func main() {
+	d := D[*B]{
+		data: &B{},
 	}
+	d.data.Set(10)
 
-	traffic := new(int64)
-
-	go func() {
-		for {
-			response.Write(random_string())
-		}
-	}()
-
-	go func() {
-		for {
-			response.Write(random_string())
-		}
-	}()
-
-	go func() {
-		for response.Next() {
-			atomic.AddInt64(traffic, 1)
-			_, err := response.Read()
-			if err != nil {
-				fmt.Println(err)
-				break
-			}
-		}
-	}()
-
-	go func() {
-		for range time.NewTicker(time.Second).C {
-			fmt.Printf("Traffic: %d, Unsolved: %d\n", atomic.LoadInt64(traffic), response.Size())
-			atomic.StoreInt64(traffic, 0)
-		}
-	}()
-
-	select {}
+	fmt.Println(d.data.num)
 }

+ 3 - 3
internal/core/dify_invocation/http_request.go

@@ -21,14 +21,14 @@ func StreamResponse[T any](method string, path string, options ...requests.HttpO
 	return requests.RequestAndParseStream[T](client, difyPath(path), method, options...)
 }
 
-func InvokeModel(payload InvokeModelRequest) (*stream.StreamResponse[InvokeModelResponseChunk], error) {
+func InvokeModel(payload *InvokeModelRequest) (*stream.StreamResponse[InvokeModelResponseChunk], error) {
 	return StreamResponse[InvokeModelResponseChunk]("POST", "invoke/model", requests.HttpPayloadJson(payload))
 }
 
-func InvokeTool(payload InvokeToolRequest) (*stream.StreamResponse[InvokeToolResponseChunk], error) {
+func InvokeTool(payload *InvokeToolRequest) (*stream.StreamResponse[InvokeToolResponseChunk], error) {
 	return StreamResponse[InvokeToolResponseChunk]("POST", "invoke/tool", requests.HttpPayloadJson(payload))
 }
 
-func InvokeNode[T WorkflowNodeData](payload InvokeNodeRequest[T]) (*InvokeNodeResponse, error) {
+func InvokeNode[T WorkflowNodeData](payload *InvokeNodeRequest[T]) (*InvokeNodeResponse, error) {
 	return Request[InvokeNodeResponse]("POST", "invoke/node", requests.HttpPayloadJson(payload))
 }

+ 75 - 1
internal/core/dify_invocation/types.go

@@ -1,12 +1,39 @@
 package dify_invocation
 
-import "encoding/json"
+import (
+	"encoding/json"
+	"fmt"
+)
 
 type BaseInvokeDifyRequest struct {
 	TenantId string `json:"tenant_id"`
 	UserId   string `json:"user_id"`
+	Type     string `json:"type"`
 }
 
+func (r *BaseInvokeDifyRequest) FromMap(data map[string]any) error {
+	var ok bool
+	if r.TenantId, ok = data["tenant_id"].(string); !ok {
+		return fmt.Errorf("tenant_id is not a string")
+	}
+
+	if r.UserId, ok = data["user_id"].(string); !ok {
+		return fmt.Errorf("user_id is not a string")
+	}
+
+	if r.Type, ok = data["type"].(string); !ok {
+		return fmt.Errorf("type is not a string")
+	}
+
+	return nil
+}
+
+const (
+	INVOKE_TYPE_MODEL = "model"
+	INVOKE_TYPE_TOOL  = "tool"
+	INVOKE_TYPE_NODE  = "node"
+)
+
 type InvokeModelRequest struct {
 	BaseInvokeDifyRequest
 	Provider   string         `json:"provider"`
@@ -14,6 +41,23 @@ type InvokeModelRequest struct {
 	Parameters map[string]any `json:"parameters"`
 }
 
+func (r *InvokeModelRequest) FromMap(base map[string]any, data map[string]any) error {
+	var ok bool
+	if r.Provider, ok = data["provider"].(string); !ok {
+		return fmt.Errorf("provider is not a string")
+	}
+
+	if r.Model, ok = data["model"].(string); !ok {
+		return fmt.Errorf("model is not a string")
+	}
+
+	if r.Parameters, ok = data["parameters"].(map[string]any); !ok {
+		return fmt.Errorf("parameters is not a map")
+	}
+
+	return nil
+}
+
 func (r InvokeModelRequest) MarshalJSON() ([]byte, error) {
 	flattened := make(map[string]any)
 	flattened["tenant_id"] = r.TenantId
@@ -34,6 +78,23 @@ type InvokeToolRequest struct {
 	Parameters map[string]any `json:"parameters"`
 }
 
+func (r *InvokeToolRequest) FromMap(base map[string]any, data map[string]any) error {
+	var ok bool
+	if r.Provider, ok = data["provider"].(string); !ok {
+		return fmt.Errorf("provider is not a string")
+	}
+
+	if r.Tool, ok = data["tool"].(string); !ok {
+		return fmt.Errorf("tool is not a string")
+	}
+
+	if r.Parameters, ok = data["parameters"].(map[string]any); !ok {
+		return fmt.Errorf("parameters is not a map")
+	}
+
+	return nil
+}
+
 func (r InvokeToolRequest) MarshalJSON() ([]byte, error) {
 	flattened := make(map[string]any)
 	flattened["tenant_id"] = r.TenantId
@@ -53,6 +114,19 @@ type InvokeNodeRequest[T WorkflowNodeData] struct {
 	NodeData T      `json:"node_data"`
 }
 
+func (r *InvokeNodeRequest[T]) FromMap(data map[string]any) error {
+	var ok bool
+	if r.NodeType, ok = data["node_type"].(string); !ok {
+		return fmt.Errorf("node_type is not a string")
+	}
+
+	if err := r.NodeData.FromMap(data["node_data"].(map[string]any)); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 func (r InvokeNodeRequest[T]) MarshalJSON() ([]byte, error) {
 	flattened := make(map[string]any)
 	flattened["tenant_id"] = r.TenantId

+ 26 - 2
internal/core/dify_invocation/workflow_node_data.go

@@ -1,18 +1,42 @@
 package dify_invocation
 
 type WorkflowNodeData interface {
-	KnowledgeRetrievalNodeData | QuestionClassifierNodeData |
-		ParameterExtractorNodeData | CodeNodeData
+	FromMap(map[string]any) error
+
+	*KnowledgeRetrievalNodeData | *QuestionClassifierNodeData | *ParameterExtractorNodeData | *CodeNodeData
 }
 
+const (
+	NODE_TYPE_KNOWLEDGE_RETRIEVAL = "knowledge_retrieval"
+	NODE_TYPE_QUESTION_CLASSIFIER = "question_classifier"
+	NODE_TYPE_PARAMETER_EXTRACTOR = "parameter_extractor"
+	NODE_TYPE_CODE                = "code"
+)
+
 type KnowledgeRetrievalNodeData struct {
 }
 
+func (r *KnowledgeRetrievalNodeData) FromMap(data map[string]any) error {
+	return nil
+}
+
 type QuestionClassifierNodeData struct {
 }
 
+func (r *QuestionClassifierNodeData) FromMap(data map[string]any) error {
+	return nil
+}
+
 type ParameterExtractorNodeData struct {
 }
 
+func (r *ParameterExtractorNodeData) FromMap(data map[string]any) error {
+	return nil
+}
+
 type CodeNodeData struct {
 }
+
+func (r *CodeNodeData) FromMap(data map[string]any) error {
+	return nil
+}

+ 5 - 5
internal/core/plugin_daemon/daemon.go

@@ -25,23 +25,23 @@ func InvokeTool(session *session_manager.Session, provider_name string, tool_nam
 
 	listener := runtime.Listen(session.ID())
 	listener.AddListener(func(message []byte) {
-		chunk, err := parser.UnmarshalJsonBytes[plugin_entities.StreamMessage](message)
+		chunk, err := parser.UnmarshalJsonBytes[plugin_entities.SessionMessage](message)
 		if err != nil {
 			log.Error("unmarshal json failed: %s", err.Error())
 			return
 		}
 
 		switch chunk.Type {
-		case plugin_entities.STREAM_MESSAGE_TYPE_STREAM:
+		case plugin_entities.SESSION_MESSAGE_TYPE_STREAM:
 			chunk, err := parser.UnmarshalJsonBytes[ToolResponseChunk](chunk.Data)
 			if err != nil {
 				log.Error("unmarshal json failed: %s", err.Error())
 				return
 			}
 			response.Write(chunk)
-		case plugin_entities.STREAM_MESSAGE_TYPE_INVOKE:
-			// TODO: invoke dify
-		case plugin_entities.STREAM_MESSAGE_TYPE_END:
+		case plugin_entities.SESSION_MESSAGE_TYPE_INVOKE:
+			invokeDify(runtime, session, chunk.Data)
+		case plugin_entities.SESSION_MESSAGE_TYPE_END:
 			response.Close()
 		default:
 			log.Error("unknown stream message type: %s", chunk.Type)

+ 173 - 0
internal/core/plugin_daemon/invoke_dify.go

@@ -0,0 +1,173 @@
+package plugin_daemon
+
+import (
+	"fmt"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
+)
+
+func invokeDify(runtime entities.PluginRuntimeInterface,
+	session *session_manager.Session, data []byte,
+) error {
+	// unmarshal invoke data
+	request, err := parser.UnmarshalJsonBytes[map[string]any](data)
+
+	if err != nil {
+		return fmt.Errorf("unmarshal invoke request failed: %s", err.Error())
+	}
+
+	typ, ok := request["type"].(string)
+	if !ok {
+		return fmt.Errorf("invoke request missing type: %s", data)
+	}
+
+	// get request id
+	request_id, ok := request["request_id"].(string)
+	if !ok {
+		return fmt.Errorf("invoke request missing request_id: %s", data)
+	}
+
+	// get request
+	detailed_request, ok := request["request"].(map[string]any)
+	if !ok {
+		return fmt.Errorf("invoke request missing request: %s", data)
+	}
+
+	switch typ {
+	case "tool":
+		r := dify_invocation.InvokeToolRequest{}
+		if err := r.FromMap(request, detailed_request); err != nil {
+			return fmt.Errorf("unmarshal tool invoke request failed: %s", err.Error())
+		}
+		submitToolTask(runtime, session, request_id, &r)
+	case "model":
+		r := dify_invocation.InvokeModelRequest{}
+		if err := r.FromMap(request, detailed_request); err != nil {
+			return fmt.Errorf("unmarshal model invoke request failed: %s", err.Error())
+		}
+		submitModelTask(runtime, session, request_id, &r)
+	case "node":
+		node_type, ok := detailed_request["node_type"].(string)
+		if !ok {
+			return fmt.Errorf("invoke request missing node_type: %s", data)
+		}
+		node_data, ok := detailed_request["data"].(map[string]any)
+		if !ok {
+			return fmt.Errorf("invoke request missing data: %s", data)
+		}
+		switch node_type {
+		case dify_invocation.NODE_TYPE_QUESTION_CLASSIFIER:
+			d := dify_invocation.InvokeNodeRequest[*dify_invocation.QuestionClassifierNodeData]{
+				NodeType: dify_invocation.NODE_TYPE_QUESTION_CLASSIFIER,
+				NodeData: &dify_invocation.QuestionClassifierNodeData{},
+			}
+			if err := d.FromMap(node_data); err != nil {
+				return fmt.Errorf("unmarshal question classifier node data failed: %s", err.Error())
+			}
+			submitNodeInvocationRequestTask(runtime, session, request_id, &d)
+		case dify_invocation.NODE_TYPE_KNOWLEDGE_RETRIEVAL:
+			d := dify_invocation.InvokeNodeRequest[*dify_invocation.KnowledgeRetrievalNodeData]{
+				NodeType: dify_invocation.NODE_TYPE_KNOWLEDGE_RETRIEVAL,
+				NodeData: &dify_invocation.KnowledgeRetrievalNodeData{},
+			}
+			if err := d.FromMap(node_data); err != nil {
+				return fmt.Errorf("unmarshal knowledge retrieval node data failed: %s", err.Error())
+			}
+			submitNodeInvocationRequestTask(runtime, session, request_id, &d)
+		case dify_invocation.NODE_TYPE_PARAMETER_EXTRACTOR:
+			d := dify_invocation.InvokeNodeRequest[*dify_invocation.ParameterExtractorNodeData]{
+				NodeType: dify_invocation.NODE_TYPE_PARAMETER_EXTRACTOR,
+				NodeData: &dify_invocation.ParameterExtractorNodeData{},
+			}
+			if err := d.FromMap(node_data); err != nil {
+				return fmt.Errorf("unmarshal parameter extractor node data failed: %s", err.Error())
+			}
+			submitNodeInvocationRequestTask(runtime, session, request_id, &d)
+		case dify_invocation.NODE_TYPE_CODE:
+			d := dify_invocation.InvokeNodeRequest[*dify_invocation.CodeNodeData]{
+				NodeType: dify_invocation.NODE_TYPE_CODE,
+				NodeData: &dify_invocation.CodeNodeData{},
+			}
+			if err := d.FromMap(node_data); err != nil {
+				return fmt.Errorf("unmarshal code node data failed: %s", err.Error())
+			}
+			submitNodeInvocationRequestTask(runtime, session, request_id, &d)
+		default:
+			return fmt.Errorf("unknown node type: %s", node_type)
+		}
+	default:
+		return fmt.Errorf("unknown invoke type: %s", typ)
+	}
+
+	return nil
+}
+
+func setTaskContext(session *session_manager.Session, r *dify_invocation.BaseInvokeDifyRequest) {
+	r.TenantId = session.TenantID()
+	r.UserId = session.UserID()
+}
+
+func submitModelTask(
+	runtime entities.PluginRuntimeInterface,
+	session *session_manager.Session,
+	request_id string,
+	t *dify_invocation.InvokeModelRequest,
+) {
+	setTaskContext(session, &t.BaseInvokeDifyRequest)
+	routine.Submit(func() {
+		response, err := dify_invocation.InvokeModel(t)
+		if err != nil {
+			log.Error("invoke model failed: %s", err.Error())
+			return
+		}
+
+		for response.Next() {
+			chunk, _ := response.Read()
+			fmt.Println(chunk)
+		}
+	})
+}
+
+func submitToolTask(
+	runtime entities.PluginRuntimeInterface,
+	session *session_manager.Session,
+	request_id string,
+	t *dify_invocation.InvokeToolRequest,
+) {
+	setTaskContext(session, &t.BaseInvokeDifyRequest)
+	routine.Submit(func() {
+		response, err := dify_invocation.InvokeTool(t)
+		if err != nil {
+			log.Error("invoke tool failed: %s", err.Error())
+			return
+		}
+
+		for response.Next() {
+			chunk, _ := response.Read()
+			fmt.Println(chunk)
+		}
+	})
+}
+
+func submitNodeInvocationRequestTask[W dify_invocation.WorkflowNodeData](
+	runtime entities.PluginRuntimeInterface,
+	session *session_manager.Session,
+	request_id string,
+	t *dify_invocation.InvokeNodeRequest[W],
+) {
+	setTaskContext(session, &t.BaseInvokeDifyRequest)
+	routine.Submit(func() {
+		response, err := dify_invocation.InvokeNode(t)
+		if err != nil {
+			log.Error("invoke node failed: %s", err.Error())
+			return
+		}
+
+		fmt.Println(response)
+	})
+}

+ 5 - 0
internal/types/entities/from_map.go

@@ -0,0 +1,5 @@
+package entities
+
+type FromMapper interface {
+	FromMap(map[string]any) error
+}

+ 4 - 4
internal/types/entities/plugin_entities/event.go

@@ -20,15 +20,15 @@ type PluginLogEvent struct {
 	Timestamp float64 `json:"timestamp"`
 }
 
-type StreamMessage struct {
+type SessionMessage struct {
 	Type string          `json:"type"`
 	Data json.RawMessage `json:"data"`
 }
 
 const (
-	STREAM_MESSAGE_TYPE_STREAM = "stream"
-	STREAM_MESSAGE_TYPE_END    = "end"
-	STREAM_MESSAGE_TYPE_INVOKE = "invoke"
+	SESSION_MESSAGE_TYPE_STREAM = "stream"
+	SESSION_MESSAGE_TYPE_END    = "end"
+	SESSION_MESSAGE_TYPE_INVOKE = "invoke"
 )
 
 type InvokeToolResponseChunk struct {

+ 3 - 0
internal/utils/requests/http_request.go

@@ -41,12 +41,15 @@ func buildHttpRequest(method string, url string, options ...HttpOptions) (*http.
 			req.Body = io.NopCloser(strings.NewReader(q.Encode()))
 		case "payloadText":
 			req.Body = io.NopCloser(strings.NewReader(option.Value.(string)))
+			req.Header.Set("Content-Type", "text/plain")
 		case "payloadJson":
 			jsonStr, err := json.Marshal(option.Value)
 			if err != nil {
 				return nil, err
 			}
 			req.Body = io.NopCloser(bytes.NewBuffer(jsonStr))
+			// set application/json content type
+			req.Header.Set("Content-Type", "application/json")
 		case "directReferer":
 			req.Header.Set("Referer", url)
 		}