Browse Source

feat: invoke tool

Yeuoly 1 year ago
parent
commit
7f728bd29d

+ 1 - 1
examples/baisc_math/main.py

@@ -20,7 +20,7 @@ class Add(Tool):
     
     def _invoke(self, user_id: str, tool_parameter: dict) -> Generator[ToolInvokeMessage, None, None]:
         result = tool_parameter['a'] + tool_parameter['b']
-        yield ToolInvokeTextMessage(message={'result': str(result)})
+        yield self.create_text_message(f'The result is {result}')
 
 if __name__ == '__main__':
     plugin.run()

+ 1 - 0
go.mod

@@ -10,6 +10,7 @@ require (
 	github.com/cloudwego/base64x v0.1.4 // indirect
 	github.com/cloudwego/iasm v0.2.0 // indirect
 	github.com/gabriel-vasile/mimetype v1.4.4 // indirect
+	github.com/gammazero/deque v0.2.1
 	github.com/gin-contrib/sse v0.1.0 // indirect
 	github.com/gin-gonic/gin v1.10.0 // indirect
 	github.com/go-playground/locales v0.14.1 // indirect

+ 2 - 0
go.sum

@@ -10,6 +10,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/gabriel-vasile/mimetype v1.4.4 h1:QjV6pZ7/XZ7ryI2KuyeEDE8wnh7fHP9YnQy+R0LnH8I=
 github.com/gabriel-vasile/mimetype v1.4.4/go.mod h1:JwLei5XPtWdGiMFB5Pjle1oEeoSeEuJfJE+TtfvdB/s=
+github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0=
+github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
 github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
 github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
 github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU=

+ 63 - 0
internal/core/plugin_daemon/daemon.go

@@ -0,0 +1,63 @@
+package plugin_daemon
+
+import (
+	"errors"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
+)
+
+type ToolResponseChunk = plugin_entities.InvokeToolResponseChunk
+
+func InvokeTool(session *entities.InvocationSession, provider_name string, tool_name string, tool_parameters map[string]any) (
+	*entities.InvocationResponse[ToolResponseChunk], error,
+) {
+	runtime := plugin_manager.Get(session.PluginIdentity)
+	if runtime == nil {
+		return nil, errors.New("plugin not found")
+	}
+
+	response := entities.NewInvocationResponse[ToolResponseChunk](512)
+
+	listener := runtime.Listen(session.ID)
+	listener.AddListener(func(message []byte) {
+		chunk, err := parser.UnmarshalJsonBytes[plugin_entities.StreamMessage](message)
+		if err != nil {
+			log.Error("unmarshal json failed: %s", err.Error())
+			return
+		}
+
+		switch chunk.Type {
+		case plugin_entities.STREAM_MESSAGE_TYPE_STREAM:
+			chunk, err := parser.UnmarshalJsonBytes[ToolResponseChunk](chunk.Data)
+			if err != nil {
+				log.Error("unmarshal json failed: %s", err.Error())
+				return
+			}
+			response.Write(chunk)
+		case plugin_entities.STREAM_MESSAGE_TYPE_END:
+			response.Close()
+		default:
+			log.Error("unknown stream message type: %s", chunk.Type)
+			response.Close()
+		}
+	})
+
+	response.OnClose(func() {
+		listener.Close()
+	})
+
+	runtime.Write(session.ID, []byte(parser.MarshalJson(
+		map[string]any{
+			"provider":   provider_name,
+			"tool":       tool_name,
+			"parameters": tool_parameters,
+			"session_id": session.ID,
+		},
+	)))
+
+	return response, nil
+}

+ 2 - 2
internal/core/plugin_manager/init.go

@@ -8,9 +8,9 @@ import (
 
 var m sync.Map
 
-func checkPluginExist(identity string) (*entities.PluginRuntime, bool) {
+func checkPluginExist(identity string) (entities.PluginRuntimeInterface, bool) {
 	if v, ok := m.Load(identity); ok {
-		return v.(*entities.PluginRuntime), true
+		return v.(entities.PluginRuntimeInterface), true
 	}
 
 	return nil, false

+ 6 - 5
internal/core/plugin_manager/local_manager/io.go

@@ -7,17 +7,18 @@ import (
 
 func (r *LocalPluginRuntime) Listen(session_id string) *entities.BytesIOListener {
 	listener := entities.NewIOListener[[]byte]()
-	listener_id := stdio_holder.OnStdioEvent(r.io_identity, func(b []byte) {
-		listener.Write(b)
-	})
 	listener.OnClose(func() {
-		stdio_holder.RemoveStdioListener(r.io_identity, listener_id)
+		stdio_holder.RemoveListener(r.io_identity, session_id)
+	})
+	stdio_holder.OnEvent(r.io_identity, session_id, func(b []byte) {
+		listener.Emit(b)
 	})
+
 	return listener
 }
 
 func (r *LocalPluginRuntime) Write(session_id string, data []byte) {
-	stdio_holder.Write(r.io_identity, data)
+	stdio_holder.Write(r.io_identity, append(data, '\n'))
 }
 
 func (r *LocalPluginRuntime) Request(session_id string, data []byte) ([]byte, error) {

+ 13 - 8
internal/core/plugin_manager/local_manager/run.go

@@ -49,9 +49,21 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 		return err
 	}
 
+	defer func() {
+		// wait for plugin to exit
+		err = e.Wait()
+		if err != nil {
+			r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
+			log.Error("plugin %s exited with error: %s", r.Config.Identity(), err.Error())
+		}
+	}()
+
 	log.Info("plugin %s started", r.Config.Identity())
 
-	stdio := stdio_holder.PutStdio(stdin, stdout, stderr)
+	stdio := stdio_holder.Put(r.Config.Identity(), stdin, stdout, stderr)
+
+	// set io identity
+	r.io_identity = stdio.GetID()
 
 	wg := sync.WaitGroup{}
 	wg.Add(1)
@@ -69,13 +81,6 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 		return err
 	}
 
-	// wait for plugin to exit
-	err = e.Wait()
-	if err != nil {
-		r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
-		return err
-	}
-
 	wg.Wait()
 
 	// plugin has exited

+ 12 - 3
internal/core/plugin_manager/manager.go

@@ -6,10 +6,10 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 )
 
-func List() []*entities.PluginRuntime {
-	var runtimes []*entities.PluginRuntime
+func List() []entities.PluginRuntimeInterface {
+	var runtimes []entities.PluginRuntimeInterface
 	m.Range(func(key, value interface{}) bool {
-		if v, ok := value.(*entities.PluginRuntime); ok {
+		if v, ok := value.(entities.PluginRuntimeInterface); ok {
 			runtimes = append(runtimes, v)
 		}
 		return true
@@ -17,6 +17,15 @@ func List() []*entities.PluginRuntime {
 	return runtimes
 }
 
+func Get(identity string) entities.PluginRuntimeInterface {
+	if v, ok := m.Load(identity); ok {
+		if r, ok := v.(entities.PluginRuntimeInterface); ok {
+			return r
+		}
+	}
+	return nil
+}
+
 func Put(path string, binary []byte) {
 	//TODO: put binary into
 }

+ 79 - 36
internal/core/plugin_manager/stdio_holder/io.go

@@ -1,11 +1,15 @@
 package stdio_holder
 
 import (
+	"bufio"
 	"fmt"
 	"io"
 	"sync"
 
 	"github.com/google/uuid"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 )
 
 var (
@@ -15,14 +19,15 @@ var (
 )
 
 type stdioHolder struct {
-	id        string
-	writer    io.WriteCloser
-	reader    io.ReadCloser
-	errReader io.ReadCloser
-	l         *sync.Mutex
-	listener  map[string]func([]byte)
-	started   bool
-	alive     bool
+	id             string
+	pluginIdentity string
+	writer         io.WriteCloser
+	reader         io.ReadCloser
+	errReader      io.ReadCloser
+	l              *sync.Mutex
+	listener       map[string]func([]byte)
+	started        bool
+	alive          bool
 }
 
 func (s *stdioHolder) Stop() {
@@ -30,29 +35,57 @@ func (s *stdioHolder) Stop() {
 	s.writer.Close()
 	s.reader.Close()
 	s.errReader.Close()
+
+	stdio_holder.Delete(s.id)
 }
 
 func (s *stdioHolder) StartStdout() {
 	s.started = true
 	s.alive = true
-	for s.alive {
-		buf := make([]byte, 1024)
-		n, err := s.reader.Read(buf)
-		if err != nil {
-			s.Stop()
-			break
-		}
 
-		for _, listener := range listeners {
-			listener(s.id, buf[:n])
-		}
+	scanner := bufio.NewScanner(s.reader)
+	for s.alive {
+		for scanner.Scan() {
+			data := scanner.Bytes()
+			event, err := parser.UnmarshalJsonBytes[plugin_entities.PluginUniversalEvent](data)
+			if err != nil {
+				log.Error("unmarshal json failed: %s", err.Error())
+				continue
+			}
 
-		for _, listener := range s.listener {
-			listener(buf[:n])
+			session_id := event.SessionId
+
+			switch event.Event {
+			case plugin_entities.PLUGIN_EVENT_LOG:
+				if event.Event == plugin_entities.PLUGIN_EVENT_LOG {
+					logEvent, err := parser.UnmarshalJsonBytes[plugin_entities.PluginLogEvent](event.Data)
+					if err != nil {
+						log.Error("unmarshal json failed: %s", err.Error())
+						continue
+					}
+
+					log.Info("plugin %s: %s", s.pluginIdentity, logEvent.Message)
+				}
+			case plugin_entities.PLUGIN_EVENT_RESPONSE:
+				for _, listener := range listeners {
+					listener(s.id, event.Data)
+				}
+
+				for listener_session_id, listener := range s.listener {
+					if listener_session_id == session_id {
+						listener(event.Data)
+					}
+				}
+			case plugin_entities.PLUGIN_EVENT_ERROR:
+				log.Error("plugin %s: %s", s.pluginIdentity, event.Data)
+			}
 		}
 	}
 }
 
+/*
+ * @return error
+ */
 func (s *stdioHolder) StartStderr() error {
 	s.started = true
 	s.alive = true
@@ -74,19 +107,31 @@ func (s *stdioHolder) StartStderr() error {
 	return nil
 }
 
+func (s *stdioHolder) GetID() string {
+	return s.id
+}
+
 /*
+ * @param plugin_identity: string
  * @param writer: io.WriteCloser
  * @param reader: io.ReadCloser
+ * @param errReader: io.ReadCloser
  */
-func PutStdio(writer io.WriteCloser, reader io.ReadCloser, errReader io.ReadCloser) *stdioHolder {
+func Put(
+	plugin_identity string,
+	writer io.WriteCloser,
+	reader io.ReadCloser,
+	errReader io.ReadCloser,
+) *stdioHolder {
 	id := uuid.New().String()
 
 	holder := &stdioHolder{
-		writer:    writer,
-		reader:    reader,
-		errReader: errReader,
-		id:        id,
-		l:         &sync.Mutex{},
+		pluginIdentity: plugin_identity,
+		writer:         writer,
+		reader:         reader,
+		errReader:      errReader,
+		id:             id,
+		l:              &sync.Mutex{},
 	}
 
 	stdio_holder.Store(id, holder)
@@ -96,7 +141,7 @@ func PutStdio(writer io.WriteCloser, reader io.ReadCloser, errReader io.ReadClos
 /*
  * @param id: string
  */
-func GetStdio(id string) *stdioHolder {
+func Get(id string) *stdioHolder {
 	if v, ok := stdio_holder.Load(id); ok {
 		if holder, ok := v.(*stdioHolder); ok {
 			return holder
@@ -109,16 +154,17 @@ func GetStdio(id string) *stdioHolder {
 /*
  * @param id: string
  */
-func RemoveStdio(id string) {
+func Remove(id string) {
 	stdio_holder.Delete(id)
 }
 
 /*
  * @param id: string
+ * @param session_id: string
  * @param listener: func(data []byte)
  * @return string - listener identity
  */
-func OnStdioEvent(id string, listener func([]byte)) string {
+func OnEvent(id string, session_id string, listener func([]byte)) {
 	if v, ok := stdio_holder.Load(id); ok {
 		if holder, ok := v.(*stdioHolder); ok {
 			holder.l.Lock()
@@ -127,20 +173,16 @@ func OnStdioEvent(id string, listener func([]byte)) string {
 				holder.listener = map[string]func([]byte){}
 			}
 
-			identity := uuid.New().String()
-			holder.listener[identity] = listener
-			return identity
+			holder.listener[session_id] = listener
 		}
 	}
-
-	return ""
 }
 
 /*
  * @param id: string
  * @param listener: string
  */
-func RemoveStdioListener(id string, listener string) {
+func RemoveListener(id string, listener string) {
 	if v, ok := stdio_holder.Load(id); ok {
 		if holder, ok := v.(*stdioHolder); ok {
 			holder.l.Lock()
@@ -153,7 +195,7 @@ func RemoveStdioListener(id string, listener string) {
 /*
  * @param listener: func(id string, data []byte)
  */
-func OnStdioEventGlobal(listener func(string, []byte)) {
+func OnGlobalEvent(listener func(string, []byte)) {
 	l.Lock()
 	defer l.Unlock()
 	listeners[uuid.New().String()] = listener
@@ -167,6 +209,7 @@ func Write(id string, data []byte) error {
 	if v, ok := stdio_holder.Load(id); ok {
 		if holder, ok := v.(*stdioHolder); ok {
 			_, err := holder.writer.Write(data)
+
 			return err
 		}
 	}

+ 1 - 1
internal/core/plugin_manager/watcher.go

@@ -44,7 +44,7 @@ func handleNewPlugins(path string, platform string) {
 
 		log.Info("loaded plugin: %s", plugin.Config.Identity())
 
-		m.Store(plugin.Config.Identity(), &plugin)
+		m.Store(plugin.Config.Identity(), plugin_interface)
 
 		routine.Submit(func() {
 			lifetime(plugin_interface)

+ 16 - 5
internal/server/controller.go

@@ -1,11 +1,22 @@
 package server
 
-import "github.com/gin-gonic/gin"
-
-func InvokePlugin(c *gin.Context) {
-
-}
+import (
+	"github.com/gin-gonic/gin"
+	"github.com/langgenius/dify-plugin-daemon/internal/service"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
+)
 
 func HealthCheck(c *gin.Context) {
 	c.JSON(200, gin.H{"status": "ok"})
 }
+
+func InvokeTool(c *gin.Context) {
+	type request = plugin_entities.InvokePluginRequest[plugin_entities.InvokeToolRequest]
+
+	BindRequest[request](
+		c,
+		func(itr request) {
+			service.InvokeTool(&itr, c)
+		},
+	)
+}

+ 1 - 1
internal/server/http.go

@@ -11,7 +11,7 @@ func server(config *app.Config) {
 	engine := gin.Default()
 
 	engine.GET("/health/check", HealthCheck)
-	engine.POST("/plugin/invoke", CheckingKey(config.DifyCallingKey), InvokePlugin)
+	engine.POST("/plugin/tool/invoke", CheckingKey(config.DifyCallingKey), InvokeTool)
 
 	engine.Run(fmt.Sprintf(":%d", config.DifyCallingPort))
 }

+ 25 - 0
internal/server/validate.go

@@ -0,0 +1,25 @@
+package server
+
+import (
+	"github.com/gin-gonic/gin"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+)
+
+func BindRequest[T any](r *gin.Context, success func(T)) {
+	var request T
+	var err error
+
+	context_type := r.GetHeader("Content-Type")
+	if context_type == "application/json" {
+		err = r.ShouldBindJSON(&request)
+	} else {
+		err = r.ShouldBind(&request)
+	}
+
+	if err != nil {
+		resp := entities.NewErrorResponse(-400, "Invalid request")
+		r.JSON(400, resp)
+		return
+	}
+	success(request)
+}

+ 67 - 0
internal/service/invoke.go

@@ -0,0 +1,67 @@
+package service
+
+import (
+	"github.com/gin-gonic/gin"
+	"github.com/google/uuid"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
+)
+
+func InvokeTool(r *plugin_entities.InvokePluginRequest[plugin_entities.InvokeToolRequest], ctx *gin.Context) {
+	// create session
+	session_id := uuid.New().String()
+	session := &entities.InvocationSession{
+		ID:             session_id,
+		PluginIdentity: parser.MarshalPluginIdentity(r.PluginName, r.PluginVersion),
+	}
+
+	writer := ctx.Writer
+	writer.WriteHeader(200)
+	writer.Header().Set("Content-Type", "text/event-stream")
+
+	done := make(chan bool)
+
+	write_data := func(data interface{}) {
+		writer.WriteString("data: ")
+		writer.Write([]byte(parser.MarshalJson(data)))
+		writer.Write([]byte("\n\n"))
+		writer.Flush()
+	}
+
+	plugin_daemon_response, err := plugin_daemon.InvokeTool(
+		session,
+		r.Data.ProviderName,
+		r.Data.ToolName,
+		r.Data.Parameters,
+	)
+
+	if err != nil {
+		write_data(entities.NewErrorResponse(-500, err.Error()))
+		close(done)
+		return
+	}
+
+	routine.Submit(func() {
+		for plugin_daemon_response.Next() {
+			chunk, err := plugin_daemon_response.Read()
+			if err != nil {
+				break
+			}
+			write_data(entities.NewSuccessResponse(chunk))
+		}
+		close(done)
+	})
+
+	select {
+	case <-writer.CloseNotify():
+		plugin_daemon_response.Close()
+	case <-done:
+	}
+}
+
+func InvokeModel(r *plugin_entities.InvokePluginRequest[plugin_entities.InvokeModelRequest], ctx *gin.Context) {
+
+}

+ 4 - 2
internal/types/entities/config.go

@@ -1,6 +1,8 @@
 package entities
 
-import "fmt"
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
+)
 
 type PluginConfiguration struct {
 	Version  string                      `json:"version"`
@@ -13,7 +15,7 @@ type PluginConfiguration struct {
 }
 
 func (p *PluginConfiguration) Identity() string {
-	return fmt.Sprintf("%s:%s", p.Name, p.Version)
+	return parser.MarshalPluginIdentity(p.Name, p.Version)
 }
 
 type PluginConfigurationResource struct {

+ 19 - 25
internal/types/entities/listener.go

@@ -1,45 +1,39 @@
 package entities
 
-import "sync/atomic"
+import "sync"
 
 type IOListener[T any] struct {
-	msg        chan T
-	closed     *int32
-	close_hook []func()
+	l        *sync.RWMutex
+	onClose  func()
+	listener []func(T)
 }
 
 type BytesIOListener = IOListener[[]byte]
 
 func NewIOListener[T any]() *IOListener[T] {
 	return &IOListener[T]{
-		msg:        make(chan T),
-		closed:     new(int32),
-		close_hook: []func(){},
+		l: &sync.RWMutex{},
 	}
 }
 
-func (r *IOListener[T]) Listen() <-chan T {
-	return r.msg
+func (r *IOListener[T]) AddListener(f func(T)) {
+	r.l.Lock()
+	defer r.l.Unlock()
+	r.listener = append(r.listener, f)
 }
 
-func (r *IOListener[T]) Close() {
-	if !atomic.CompareAndSwapInt32(r.closed, 0, 1) {
-		return
-	}
-	atomic.StoreInt32(r.closed, 1)
-	for _, hook := range r.close_hook {
-		hook()
-	}
-	close(r.msg)
+func (r *IOListener[T]) OnClose(f func()) {
+	r.onClose = f
 }
 
-func (r *IOListener[T]) Write(data T) {
-	if atomic.LoadInt32(r.closed) == 1 {
-		return
-	}
-	r.msg <- data
+func (r *IOListener[T]) Close() {
+	r.onClose()
 }
 
-func (r *IOListener[T]) OnClose(hook func()) {
-	r.close_hook = append(r.close_hook, hook)
+func (r *IOListener[T]) Emit(data T) {
+	r.l.RLock()
+	defer r.l.RUnlock()
+	for _, listener := range r.listener {
+		listener(data)
+	}
 }

+ 40 - 0
internal/types/entities/plugin_entities/event.go

@@ -0,0 +1,40 @@
+package plugin_entities
+
+import "encoding/json"
+
+type PluginUniversalEvent struct {
+	Event     string          `json:"event"`
+	SessionId string          `json:"session_id"`
+	Data      json.RawMessage `json:"data"`
+}
+
+const (
+	PLUGIN_EVENT_LOG      = "log"
+	PLUGIN_EVENT_RESPONSE = "response"
+	PLUGIN_EVENT_ERROR    = "error"
+	PLUGIN_EVENT_INVOKE   = "invoke"
+)
+
+type PluginLogEvent struct {
+	Level     string  `json:"level"`
+	Message   string  `json:"message"`
+	Timestamp float64 `json:"timestamp"`
+}
+
+type StreamMessage struct {
+	Type string          `json:"type"`
+	Data json.RawMessage `json:"data"`
+}
+
+const (
+	STREAM_MESSAGE_TYPE_STREAM = "stream"
+	STREAM_MESSAGE_TYPE_END    = "end"
+)
+
+type InvokeToolResponseChunk struct {
+	Type    string          `json:"type" binding:"required"`
+	Message json.RawMessage `json:"message" binding:"required"`
+}
+
+type InvokeModelResponseChunk struct {
+}

+ 20 - 0
internal/types/entities/plugin_entities/registration.go

@@ -0,0 +1,20 @@
+package plugin_entities
+
+type PluginRegistration struct {
+	PluginName    string                      `json:"plugin_name"`
+	PluginVersion string                      `json:"plugin_version"`
+	Models        []ModelProviderRegistration `json:"models"`
+	Tools         []ToolProviderRegistration  `json:"tools"`
+}
+
+type ToolProviderRegistration struct {
+}
+
+type ToolRegistration struct {
+}
+
+type ModelProviderRegistration struct {
+}
+
+type ModelRegistration struct {
+}

+ 23 - 0
internal/types/entities/plugin_entities/request.go

@@ -0,0 +1,23 @@
+package plugin_entities
+
+type InvokePluginRequestData interface {
+	InvokeToolRequest | InvokeModelRequest
+}
+
+type InvokeModelRequest struct {
+}
+
+type InvokePluginRequest[T InvokePluginRequestData] struct {
+	PluginName    string `json:"plugin_name" binding:"required"`
+	PluginVersion string `json:"plugin_version" binding:"required"`
+	TenantId      string `json:"tenant_id" binding:"required"`
+	Data          T      `json:"data" binding:"required"`
+}
+
+type InvokeToolRequest struct {
+	ProviderName string `json:"provider_name" binding:"required"`
+	ToolName     string `json:"tool_name" binding:"required"`
+	ToolRuntime  struct {
+	} `json:"tool_runtime" binding:"required"`
+	Parameters map[string]interface{} `json:"parameters" binding:"required"`
+}

+ 23 - 0
internal/types/entities/response.go

@@ -0,0 +1,23 @@
+package entities
+
+type Response struct {
+	Code    int    `json:"code"`
+	Message string `json:"message"`
+	Data    any    `json:"data"`
+}
+
+func NewSuccessResponse(data any) *Response {
+	return &Response{
+		Code:    0,
+		Message: "success",
+		Data:    data,
+	}
+}
+
+func NewErrorResponse(code int, message string) *Response {
+	return &Response{
+		Code:    code,
+		Message: message,
+		Data:    nil,
+	}
+}

+ 124 - 0
internal/types/entities/session.go

@@ -0,0 +1,124 @@
+package entities
+
+import (
+	"errors"
+	"sync"
+
+	"github.com/gammazero/deque"
+)
+
+type InvocationSession struct {
+	ID             string
+	PluginIdentity string
+}
+
+type InvocationResponse[T any] struct {
+	q         deque.Deque[T]
+	l         *sync.Mutex
+	sig       chan bool
+	closed    bool
+	max       int
+	listening bool
+	onClose   func()
+}
+
+func NewInvocationResponse[T any](max int) *InvocationResponse[T] {
+	return &InvocationResponse[T]{
+		l:   &sync.Mutex{},
+		sig: make(chan bool),
+		max: max,
+	}
+}
+
+func (r *InvocationResponse[T]) OnClose(f func()) {
+	r.onClose = f
+}
+
+func (r *InvocationResponse[T]) Next() bool {
+	r.l.Lock()
+	if r.closed {
+		r.l.Unlock()
+		return false
+	}
+
+	if r.q.Len() > 0 {
+		r.l.Unlock()
+		return true
+	}
+
+	r.listening = true
+	defer func() {
+		r.listening = false
+	}()
+
+	r.l.Unlock()
+	return <-r.sig
+}
+
+func (r *InvocationResponse[T]) Read() (T, error) {
+	r.l.Lock()
+	defer r.l.Unlock()
+
+	if r.q.Len() > 0 {
+		data := r.q.PopFront()
+		return data, nil
+	} else {
+		var data T
+		return data, errors.New("no data available, please call Next() to wait for data")
+	}
+}
+
+func (r *InvocationResponse[T]) Write(data T) error {
+	r.l.Lock()
+	if r.closed {
+		r.l.Unlock()
+		return nil
+	}
+
+	if r.q.Len() >= r.max {
+		r.l.Unlock()
+		return errors.New("queue is full")
+	}
+
+	r.q.PushBack(data)
+	if r.q.Len() == 1 {
+		if r.listening {
+			r.sig <- true
+		}
+	}
+	r.l.Unlock()
+	return nil
+}
+
+func (r *InvocationResponse[T]) Close() {
+	r.l.Lock()
+	if r.closed {
+		r.l.Unlock()
+		return
+	}
+	r.closed = true
+	r.l.Unlock()
+
+	select {
+	case r.sig <- false:
+	default:
+	}
+	close(r.sig)
+	if r.onClose != nil {
+		r.onClose()
+	}
+}
+
+func (r *InvocationResponse[T]) IsClosed() bool {
+	r.l.Lock()
+	defer r.l.Unlock()
+
+	return r.closed
+}
+
+func (r *InvocationResponse[T]) Size() int {
+	r.l.Lock()
+	defer r.l.Unlock()
+
+	return r.q.Len()
+}

+ 7 - 0
internal/utils/parser/identity.go

@@ -0,0 +1,7 @@
+package parser
+
+import "fmt"
+
+func MarshalPluginIdentity(name string, version string) string {
+	return fmt.Sprintf("%s:%s", name, version)
+}

+ 10 - 1
internal/utils/parser/json.go

@@ -3,8 +3,12 @@ package parser
 import "encoding/json"
 
 func UnmarshalJson[T any](text string) (T, error) {
+	return UnmarshalJsonBytes[T]([]byte(text))
+}
+
+func UnmarshalJsonBytes[T any](data []byte) (T, error) {
 	var result T
-	err := json.Unmarshal([]byte(text), &result)
+	err := json.Unmarshal(data, &result)
 	return result, err
 }
 
@@ -12,3 +16,8 @@ func MarshalJson[T any](data T) string {
 	b, _ := json.Marshal(data)
 	return string(b)
 }
+
+func MarshalJsonBytes[T any](data T) []byte {
+	b, _ := json.Marshal(data)
+	return b
+}