Yeuoly vor 1 Jahr
Ursprung
Commit
bc3218ca8c

+ 10 - 5
.env.example

@@ -1,7 +1,12 @@
-DIFY_PLUGIN_HOST=127.0.0.1
-DIFY_PLUGIN_PORT=5002
-DIFY_PLUGIN_KEY=your_plugin_key
+DIFY_URL=http://127.0.0.1:5001
+# A secretkey that is used for securely communicating with DIFY API. 
+# You can generate a strong key using `openssl rand -base64 42`.
+DIFY_CALLING_KEY=lYkiYYT6owG+71oLerGzA7GXCgOT++6ovaezWAjpCjf+Sjc3ZtU+qUEi
+DIFY_CALLING_PORT=5002
 
-STORAGE_PATH=/var/dify/plugins
+PLUGIN_HOST=127.0.0.1
+PLUGIN_PORT=5003
 
-PLATFORM=local
+STORAGE_PATH=examples
+
+PLATFORM=local

+ 14 - 0
cmd/server/main.go

@@ -6,6 +6,7 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/server"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+	"golang.org/x/exp/constraints"
 )
 
 func main() {
@@ -21,5 +22,18 @@ func main() {
 		log.Panic("Error processing environment variables")
 	}
 
+	setDefault(&config)
+
 	server.Run(&config)
 }
+
+func setDefault(config *app.Config) {
+	setDefaultInt(&config.RoutinePoolSize, 1000)
+	setDefaultInt(&config.DifyCallingPort, 5002)
+}
+
+func setDefaultInt[T constraints.Integer](value *T, defaultValue T) {
+	if *value == 0 {
+		*value = defaultValue
+	}
+}

+ 0 - 58
cmd/test/main.go

@@ -1,58 +0,0 @@
-package main
-
-import "fmt"
-
-// Speaker 接口
-type Speaker interface {
-	Speak() string
-}
-
-// Namer 接口
-type Namer interface {
-	Name() string
-}
-
-// Greeter 接口,组合了 Speaker 和 Namer 接口
-type Greeter interface {
-	Speaker
-	Namer
-}
-
-// Dog 结构体
-type Dog struct {
-	name string
-}
-
-// Dog 的 Speak 方法
-func (d *Dog) Speak() string {
-	return fmt.Sprintf("%s says woof!", d.name)
-}
-
-// Dog 的 Name 方法
-func (d *Dog) Name() string {
-	return d.name
-}
-
-// GermanShepherd 结构体,组合了 Dog
-type GermanShepherd struct {
-	Dog
-}
-
-// Labrador 结构体,组合了 Dog
-type Labrador struct {
-	Dog
-}
-
-// 使用 Greeter 接口的函数
-func GreetAndSpeak(g Greeter) {
-	fmt.Println("Hello", g.Name())
-	fmt.Println(g.Speak())
-}
-
-func main() {
-	gs := GermanShepherd{Dog{name: "Rex"}}
-	lb := Labrador{Dog{name: "Buddy"}}
-
-	GreetAndSpeak(&gs)
-	GreetAndSpeak(&lb)
-}

+ 2 - 1
examples/baisc_math/.gitignore

@@ -1,3 +1,4 @@
 __pycache__
 install.sh
-.installed
+.installed
+dify_plugin

+ 1 - 0
examples/baisc_math/launch.sh

@@ -0,0 +1 @@
+activate basic_math && python -m main

+ 24 - 9
examples/baisc_math/main.py

@@ -1,11 +1,26 @@
-def main():
-    while True:
-        try:
-            x = int(input('Enter a number: '))
-            y = int(input('Enter another number: '))
-            print(f'{x} + {y} = {x + y}')
-        except ValueError:
-            print('Invalid input. Please enter a number.')
+from typing import Generator
+from dify_plugin import ToolProvider, Plugin
+from dify_plugin.tool.entities import ToolInvokeTextMessage, ToolProviderConfiguration
+from dify_plugin.tool.tool import Tool
+from dify_plugin.tool.entities import ToolConfiguration, ToolInvokeMessage
+
+plugin = Plugin()
+
+@plugin.register_tool_provider
+class BasicMath(ToolProvider):
+    @classmethod
+    def configuration(cls) -> ToolProviderConfiguration:
+        return ToolProviderConfiguration(name='basic_math')
+
+@plugin.register_tool(BasicMath)
+class Add(Tool):
+    @classmethod
+    def configuration(cls) -> ToolConfiguration:
+        return ToolConfiguration(name='add')
+    
+    def _invoke(self, user_id: str, tool_parameter: dict) -> Generator[ToolInvokeMessage, None, None]:
+        result = tool_parameter['a'] + tool_parameter['b']
+        yield ToolInvokeTextMessage(message={'result': str(result)})
 
 if __name__ == '__main__':
-    main()
+    plugin.run()

+ 1 - 1
examples/baisc_math/manifest.json

@@ -3,7 +3,7 @@
     "author": "Yeuoly",
     "name": "basic_math",
     "datetime": 1719812022,
-    "exec": "conda activate basic_math && python -m main",
+    "module": "main",
     "resource": {
         "memory": 1048576,
         "storage": 1048576,

+ 25 - 0
examples/baisc_math/test.py

@@ -0,0 +1,25 @@
+class SimpleApp:
+    def __init__(self):
+        self.routes = {}
+
+    def route(self, path):
+        def decorator(func):
+            self.routes[path] = func
+            return func
+        return decorator
+
+    def execute(self, path):
+        if path in self.routes:
+            return self.routes[path]()
+        else:
+            raise ValueError("Route not found!")
+
+app = SimpleApp()
+
+@app.route("/")
+def home():
+    return "Welcome to the home page!"
+
+@app.route("/about")
+def about():
+    return "About us page!"

+ 28 - 0
go.mod

@@ -5,14 +5,42 @@ go 1.20
 require github.com/google/uuid v1.6.0
 
 require (
+	github.com/bytedance/sonic v1.11.9 // indirect
+	github.com/bytedance/sonic/loader v0.1.1 // indirect
+	github.com/cloudwego/base64x v0.1.4 // indirect
+	github.com/cloudwego/iasm v0.2.0 // indirect
+	github.com/gabriel-vasile/mimetype v1.4.4 // indirect
+	github.com/gin-contrib/sse v0.1.0 // indirect
+	github.com/gin-gonic/gin v1.10.0 // indirect
+	github.com/go-playground/locales v0.14.1 // indirect
+	github.com/go-playground/universal-translator v0.18.1 // indirect
+	github.com/go-playground/validator/v10 v10.22.0 // indirect
+	github.com/goccy/go-json v0.10.3 // indirect
 	github.com/joho/godotenv v1.5.1 // indirect
+	github.com/json-iterator/go v1.1.12 // indirect
 	github.com/kelseyhightower/envconfig v1.4.0 // indirect
+	github.com/klauspost/cpuid/v2 v2.2.8 // indirect
+	github.com/leodido/go-urn v1.4.0 // indirect
+	github.com/mattn/go-isatty v0.0.20 // indirect
+	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+	github.com/modern-go/reflect2 v1.0.2 // indirect
+	github.com/panjf2000/ants v1.3.0 // indirect
 	github.com/panjf2000/gnet/v2 v2.5.5 // indirect
+	github.com/pelletier/go-toml/v2 v2.2.2 // indirect
+	github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
+	github.com/ugorji/go/codec v1.2.12 // indirect
 	github.com/valyala/bytebufferpool v1.0.0 // indirect
 	go.uber.org/atomic v1.11.0 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
 	go.uber.org/zap v1.27.0 // indirect
+	golang.org/x/arch v0.8.0 // indirect
+	golang.org/x/crypto v0.24.0 // indirect
+	golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8
+	golang.org/x/net v0.26.0 // indirect
 	golang.org/x/sync v0.7.0 // indirect
 	golang.org/x/sys v0.21.0 // indirect
+	golang.org/x/text v0.16.0 // indirect
+	google.golang.org/protobuf v1.34.2 // indirect
 	gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
+	gopkg.in/yaml.v3 v3.0.1 // indirect
 )

+ 79 - 0
go.sum

@@ -1,13 +1,71 @@
+github.com/bytedance/sonic v1.11.9 h1:LFHENlIY/SLzDWverzdOvgMztTxcfcF+cqNsz9pK5zg=
+github.com/bytedance/sonic v1.11.9/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4=
+github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM=
+github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
+github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
+github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
+github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
+github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/gabriel-vasile/mimetype v1.4.4 h1:QjV6pZ7/XZ7ryI2KuyeEDE8wnh7fHP9YnQy+R0LnH8I=
+github.com/gabriel-vasile/mimetype v1.4.4/go.mod h1:JwLei5XPtWdGiMFB5Pjle1oEeoSeEuJfJE+TtfvdB/s=
+github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
+github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
+github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU=
+github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y=
+github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
+github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
+github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
+github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
+github.com/go-playground/validator/v10 v10.22.0 h1:k6HsTZ0sTnROkhS//R0O+55JgM8C4Bx7ia+JlgcnOao=
+github.com/go-playground/validator/v10 v10.22.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
+github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
+github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
+github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
 github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
 github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
+github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
+github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
 github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
 github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
+github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
+github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
+github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
+github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
+github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
+github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
+github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
+github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
+github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
+github.com/panjf2000/ants v1.3.0 h1:8pQ+8leaLc9lys2viEEr8md0U4RN6uOSUCE9bOYjQ9M=
+github.com/panjf2000/ants v1.3.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY=
 github.com/panjf2000/gnet/v2 v2.5.5 h1:H+LqGgCHs2mGJq/4n6YELhMjZ027bNgd5Qb8Wj5nbrM=
 github.com/panjf2000/gnet/v2 v2.5.5/go.mod h1:ppopMJ8VrDbJu8kDsqFQTgNmpMS8Le5CmPxISf+Sauk=
+github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
+github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
+github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
+github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
+github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
+github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
+github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
 github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
 github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
 go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
@@ -16,9 +74,30 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
 go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
 go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
 go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
+golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
+golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc=
+golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
+golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
+golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
+golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY=
+golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI=
+golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
+golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
 golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
 golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
 golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
+golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
+google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
+google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
 gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50=
+rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=

+ 16 - 0
internal/core/plugin_manager/aws_manager/io.go

@@ -0,0 +1,16 @@
+package aws_manager
+
+import "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+
+func (r *AWSPluginRuntime) Listen(session_id string) *entities.BytesIOListener {
+	l := entities.NewIOListener[[]byte]()
+	return l
+}
+
+func (r *AWSPluginRuntime) Write(session_id string, data []byte) {
+
+}
+
+func (r *AWSPluginRuntime) Request(session_id string, data []byte) ([]byte, error) {
+	return nil, nil
+}

+ 3 - 5
internal/core/plugin_manager/init.go

@@ -8,11 +8,9 @@ import (
 
 var m sync.Map
 
-func checkPluginExist(name string) (*entities.PluginRuntime, bool) {
-	if v, ok := m.Load(name); ok {
-		if plugin, ok := v.(*entities.PluginRuntime); ok {
-			return plugin, true
-		}
+func checkPluginExist(identity string) (*entities.PluginRuntime, bool) {
+	if v, ok := m.Load(identity); ok {
+		return v.(*entities.PluginRuntime), true
 	}
 
 	return nil, false

+ 4 - 6
internal/core/plugin_manager/lifetime.go

@@ -16,9 +16,8 @@ func lifetime(r entities.PluginRuntimeInterface) {
 			time.Sleep(30 * time.Second)
 			if start_failed_times == 3 {
 				log.Error(
-					"init environment failed 3 times, plugin %s:%s has been stopped",
-					r.Configuration().Name,
-					r.Configuration().Version,
+					"init environment failed 3 times, plugin %s has been stopped",
+					r.Configuration().Identity(),
 				)
 				r.Stop()
 			}
@@ -33,9 +32,8 @@ func lifetime(r entities.PluginRuntimeInterface) {
 			time.Sleep(30 * time.Second)
 			if start_failed_times == 3 {
 				log.Error(
-					"start plugin failed 3 times, plugin %s:%s has been stopped",
-					r.Configuration().Name,
-					r.Configuration().Version,
+					"start plugin failed 3 times, plugin %s has been stopped",
+					r.Configuration().Identity(),
 				)
 				r.Stop()
 			}

+ 8 - 7
internal/core/plugin_manager/local_manager/environment.go

@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 )
 
 func (r *LocalPluginRuntime) InitEnvironment() error {
@@ -50,7 +51,7 @@ func (r *LocalPluginRuntime) InitEnvironment() error {
 
 	last_active_at := time.Now()
 
-	go func() {
+	routine.Submit(func() {
 		defer wg.Done()
 		// read stdout
 		buf := make([]byte, 1024)
@@ -59,12 +60,12 @@ func (r *LocalPluginRuntime) InitEnvironment() error {
 			if err != nil {
 				break
 			}
-			log.Info("installing %s:%s - %s", r.Config.Name, r.Config.Name, string(buf[:n]))
+			log.Info("installing %s - %s", r.Config.Identity(), string(buf[:n]))
 			last_active_at = time.Now()
 		}
-	}()
+	})
 
-	go func() {
+	routine.Submit(func() {
 		defer wg.Done()
 		// read stderr
 		buf := make([]byte, 1024)
@@ -83,9 +84,9 @@ func (r *LocalPluginRuntime) InitEnvironment() error {
 				last_active_at = time.Now()
 			}
 		}
-	}()
+	})
 
-	go func() {
+	routine.Submit(func() {
 		ticker := time.NewTicker(5 * time.Second)
 		defer ticker.Stop()
 		for range ticker.C {
@@ -99,7 +100,7 @@ func (r *LocalPluginRuntime) InitEnvironment() error {
 				break
 			}
 		}
-	}()
+	})
 
 	wg.Wait()
 

+ 25 - 0
internal/core/plugin_manager/local_manager/io.go

@@ -0,0 +1,25 @@
+package local_manager
+
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/stdio_holder"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+)
+
+func (r *LocalPluginRuntime) Listen(session_id string) *entities.BytesIOListener {
+	listener := entities.NewIOListener[[]byte]()
+	listener_id := stdio_holder.OnStdioEvent(r.io_identity, func(b []byte) {
+		listener.Write(b)
+	})
+	listener.OnClose(func() {
+		stdio_holder.RemoveStdioListener(r.io_identity, listener_id)
+	})
+	return listener
+}
+
+func (r *LocalPluginRuntime) Write(session_id string, data []byte) {
+	stdio_holder.Write(r.io_identity, data)
+}
+
+func (r *LocalPluginRuntime) Request(session_id string, data []byte) ([]byte, error) {
+	return nil, nil
+}

+ 79 - 0
internal/core/plugin_manager/local_manager/run.go

@@ -1,5 +1,84 @@
 package local_manager
 
+import (
+	"fmt"
+	"os/exec"
+	"sync"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/stdio_holder"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
+)
+
 func (r *LocalPluginRuntime) StartPlugin() error {
+	r.State.Status = entities.PLUGIN_RUNTIME_STATUS_LAUNCHING
+	defer func() {
+		r.io_identity = ""
+	}()
+	defer log.Info("plugin %s stopped", r.Config.Identity())
+
+	// start plugin
+	e := exec.Command("bash", "launch.sh")
+	e.Dir = r.State.RelativePath
+
+	// get writer
+	stdin, err := e.StdinPipe()
+	if err != nil {
+		r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
+		e.Process.Kill()
+		return fmt.Errorf("get stdin pipe failed: %s", err.Error())
+	}
+
+	stdout, err := e.StdoutPipe()
+	if err != nil {
+		r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
+		e.Process.Kill()
+		return fmt.Errorf("get stdout pipe failed: %s", err.Error())
+	}
+
+	stderr, err := e.StderrPipe()
+	if err != nil {
+		r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
+		e.Process.Kill()
+		return fmt.Errorf("get stderr pipe failed: %s", err.Error())
+	}
+
+	if err := e.Start(); err != nil {
+		r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
+		return err
+	}
+
+	log.Info("plugin %s started", r.Config.Identity())
+
+	stdio := stdio_holder.PutStdio(stdin, stdout, stderr)
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+
+	// listen to plugin stdout
+	routine.Submit(func() {
+		defer wg.Done()
+		stdio.StartStdout()
+	})
+
+	err = stdio.StartStderr()
+	if err != nil {
+		r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
+		e.Process.Kill()
+		return err
+	}
+
+	// wait for plugin to exit
+	err = e.Wait()
+	if err != nil {
+		r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
+		return err
+	}
+
+	wg.Wait()
+
+	// plugin has exited
+	r.State.Status = entities.PLUGIN_RUNTIME_STATUS_PENDING
 	return nil
 }

+ 2 - 0
internal/core/plugin_manager/local_manager/type.go

@@ -4,4 +4,6 @@ import "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 
 type LocalPluginRuntime struct {
 	entities.PluginRuntime
+
+	io_identity string
 }

+ 0 - 108
internal/core/plugin_manager/stdio_holder.go

@@ -1,108 +0,0 @@
-package plugin_manager
-
-import (
-	"io"
-	"sync"
-
-	"github.com/google/uuid"
-)
-
-var (
-	stdio_holder sync.Map               = sync.Map{}
-	listeners    []func(string, []byte) = []func(string, []byte){}
-)
-
-type stdioHolder struct {
-	id       string
-	writer   io.WriteCloser
-	reader   io.ReadCloser
-	listener []func(data []byte)
-	started  bool
-	alive    bool
-}
-
-func (s *stdioHolder) Stop() {
-	s.alive = false
-	s.writer.Close()
-	s.reader.Close()
-}
-
-func (s *stdioHolder) Start() {
-	s.started = true
-
-	go func() {
-		s.alive = true
-		for s.alive {
-			buf := make([]byte, 1024)
-			n, err := s.reader.Read(buf)
-			if err != nil {
-				s.Stop()
-				break
-			}
-
-			for _, listener := range listeners {
-				listener(s.id, buf[:n])
-			}
-
-			for _, listener := range s.listener {
-				listener(buf[:n])
-			}
-		}
-	}()
-}
-
-func PutStdio(writer io.WriteCloser, reader io.ReadCloser) string {
-	id := uuid.New().String()
-
-	holder := &stdioHolder{
-		writer: writer,
-		reader: reader,
-		id:     id,
-	}
-
-	stdio_holder.Store(id, holder)
-
-	holder.Start()
-
-	return id
-}
-
-/*
- * @param id: string
- */
-func RemoveStdio(id string) {
-	stdio_holder.Delete(id)
-}
-
-/*
- * @param listener: func(data []byte)
- */
-func OnStdioEvent(id string, listener func([]byte)) {
-	if v, ok := stdio_holder.Load(id); ok {
-		if holder, ok := v.(*stdioHolder); ok {
-			holder.listener = append(holder.listener, listener)
-		}
-	}
-}
-
-/*
- * @param listener: func(id string, data []byte)
- */
-func OnStdioEventGlobal(listener func(string, []byte)) {
-	listeners = append(listeners, listener)
-}
-
-/*
- * @param id: string
- * @param data: []byte
- */
-func Write(id string, data []byte) error {
-	if v, ok := stdio_holder.Load(id); ok {
-		if holder, ok := v.(*stdioHolder); ok {
-			_, err := holder.writer.Write(data)
-			return err
-		}
-	}
-
-	return nil
-}

+ 175 - 0
internal/core/plugin_manager/stdio_holder/io.go

@@ -0,0 +1,175 @@
+package stdio_holder
+
+import (
+	"fmt"
+	"io"
+	"sync"
+
+	"github.com/google/uuid"
+)
+
+var (
+	stdio_holder sync.Map                        = sync.Map{}
+	l            *sync.Mutex                     = &sync.Mutex{}
+	listeners    map[string]func(string, []byte) = map[string]func(string, []byte){}
+)
+
+type stdioHolder struct {
+	id        string
+	writer    io.WriteCloser
+	reader    io.ReadCloser
+	errReader io.ReadCloser
+	l         *sync.Mutex
+	listener  map[string]func([]byte)
+	started   bool
+	alive     bool
+}
+
+func (s *stdioHolder) Stop() {
+	s.alive = false
+	s.writer.Close()
+	s.reader.Close()
+	s.errReader.Close()
+}
+
+func (s *stdioHolder) StartStdout() {
+	s.started = true
+	s.alive = true
+	for s.alive {
+		buf := make([]byte, 1024)
+		n, err := s.reader.Read(buf)
+		if err != nil {
+			s.Stop()
+			break
+		}
+
+		for _, listener := range listeners {
+			listener(s.id, buf[:n])
+		}
+
+		for _, listener := range s.listener {
+			listener(buf[:n])
+		}
+	}
+}
+
+func (s *stdioHolder) StartStderr() error {
+	s.started = true
+	s.alive = true
+	defer s.Stop()
+	for s.alive {
+		buf := make([]byte, 1024)
+		n, err := s.errReader.Read(buf)
+		if err != nil && err != io.EOF {
+			return err
+		} else if err != nil {
+			return nil
+		}
+
+		if n > 0 {
+			return fmt.Errorf("stderr: %s", buf[:n])
+		}
+	}
+
+	return nil
+}
+
+/*
+ * @param writer: io.WriteCloser
+ * @param reader: io.ReadCloser
+ */
+func PutStdio(writer io.WriteCloser, reader io.ReadCloser, errReader io.ReadCloser) *stdioHolder {
+	id := uuid.New().String()
+
+	holder := &stdioHolder{
+		writer:    writer,
+		reader:    reader,
+		errReader: errReader,
+		id:        id,
+		l:         &sync.Mutex{},
+	}
+
+	stdio_holder.Store(id, holder)
+	return holder
+}
+
+/*
+ * @param id: string
+ */
+func GetStdio(id string) *stdioHolder {
+	if v, ok := stdio_holder.Load(id); ok {
+		if holder, ok := v.(*stdioHolder); ok {
+			return holder
+		}
+	}
+
+	return nil
+}
+
+/*
+ * @param id: string
+ */
+func RemoveStdio(id string) {
+	stdio_holder.Delete(id)
+}
+
+/*
+ * @param id: string
+ * @param listener: func(data []byte)
+ * @return string - listener identity
+ */
+func OnStdioEvent(id string, listener func([]byte)) string {
+	if v, ok := stdio_holder.Load(id); ok {
+		if holder, ok := v.(*stdioHolder); ok {
+			holder.l.Lock()
+			defer holder.l.Unlock()
+			if holder.listener == nil {
+				holder.listener = map[string]func([]byte){}
+			}
+
+			identity := uuid.New().String()
+			holder.listener[identity] = listener
+			return identity
+		}
+	}
+
+	return ""
+}
+
+/*
+ * @param id: string
+ * @param listener: string
+ */
+func RemoveStdioListener(id string, listener string) {
+	if v, ok := stdio_holder.Load(id); ok {
+		if holder, ok := v.(*stdioHolder); ok {
+			holder.l.Lock()
+			defer holder.l.Unlock()
+			delete(holder.listener, listener)
+		}
+	}
+}
+
+/*
+ * @param listener: func(id string, data []byte)
+ */
+func OnStdioEventGlobal(listener func(string, []byte)) {
+	l.Lock()
+	defer l.Unlock()
+	listeners[uuid.New().String()] = listener
+}
+
+/*
+ * @param id: string
+ * @param data: []byte
+ */
+func Write(id string, data []byte) error {
+	if v, ok := stdio_holder.Load(id); ok {
+		if holder, ok := v.(*stdioHolder); ok {
+			_, err := holder.writer.Write(data)
+			return err
+		}
+	}
+
+	return nil
+}

+ 28 - 17
internal/core/plugin_manager/watcher.go

@@ -3,6 +3,7 @@ package plugin_manager
 import (
 	"os"
 	"path"
+	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
@@ -10,9 +11,20 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 )
 
 func startWatcher(path string, platform string) {
+	go func() {
+		log.Info("start to handle new plugins in path: %s", path)
+		handleNewPlugins(path, platform)
+		for range time.NewTicker(time.Second * 30).C {
+			handleNewPlugins(path, platform)
+		}
+	}()
+}
+
+func handleNewPlugins(path string, platform string) {
 	// load local plugins firstly
 	for plugin := range loadNewPlugins(path) {
 		var plugin_interface entities.PluginRuntimeInterface
@@ -30,10 +42,13 @@ func startWatcher(path string, platform string) {
 			continue
 		}
 
-		log.Info("loaded plugin: %s:%s", plugin.Config.Name, plugin.Config.Version)
-		m.Store(plugin.Config.Name, plugin_interface)
+		log.Info("loaded plugin: %s", plugin.Config.Identity())
+
+		m.Store(plugin.Config.Identity(), &plugin)
 
-		lifetime(plugin_interface)
+		routine.Submit(func() {
+			lifetime(plugin_interface)
+		})
 	}
 }
 
@@ -48,11 +63,9 @@ func loadNewPlugins(root_path string) <-chan entities.PluginRuntime {
 		return ch
 	}
 
-	go func() {
+	routine.Submit(func() {
 		for _, plugin := range plugins {
 			if plugin.IsDir() {
-				log.Info("found new plugin path: %s", plugin.Name())
-
 				configuration_path := path.Join(root_path, plugin.Name(), "manifest.json")
 				configuration, err := parsePluginConfig(configuration_path)
 				if err != nil {
@@ -60,22 +73,23 @@ func loadNewPlugins(root_path string) <-chan entities.PluginRuntime {
 					continue
 				}
 
-				status := verifyPluginStatus(configuration)
-				if status.exist && status.alive {
+				if err := configuration.Validate(); err != nil {
+					log.Error("plugin %s config validate error: %v", configuration.Name, err)
 					continue
-				} else if status.exist && !status.alive {
-					log.Warn("plugin %s is not alive")
+				}
+
+				status := verifyPluginStatus(configuration)
+				if status.exist {
 					continue
 				}
 
 				ch <- entities.PluginRuntime{
 					Config: *configuration,
 					State: entities.PluginRuntimeState{
+						Status:       entities.PLUGIN_RUNTIME_STATUS_PENDING,
 						Restarts:     0,
-						Active:       false,
 						RelativePath: path.Join(root_path, plugin.Name()),
 						ActiveAt:     nil,
-						DeadAt:       nil,
 						Verified:     false,
 					},
 				}
@@ -83,7 +97,7 @@ func loadNewPlugins(root_path string) <-chan entities.PluginRuntime {
 		}
 
 		close(ch)
-	}()
+	})
 
 	return ch
 }
@@ -104,20 +118,17 @@ func parsePluginConfig(configuration_path string) (*entities.PluginConfiguration
 
 type pluginStatusResult struct {
 	exist bool
-	alive bool
 }
 
 func verifyPluginStatus(config *entities.PluginConfiguration) pluginStatusResult {
-	r, exist := checkPluginExist(config.Name)
+	_, exist := checkPluginExist(config.Identity())
 	if exist {
 		return pluginStatusResult{
 			exist: true,
-			alive: r.State.Active,
 		}
 	}
 
 	return pluginStatusResult{
 		exist: false,
-		alive: false,
 	}
 }

+ 11 - 0
internal/server/controller.go

@@ -0,0 +1,11 @@
+package server
+
+import "github.com/gin-gonic/gin"
+
+func InvokePlugin(c *gin.Context) {
+
+}
+
+func HealthCheck(c *gin.Context) {
+	c.JSON(200, gin.H{"status": "ok"})
+}

+ 17 - 0
internal/server/http.go

@@ -0,0 +1,17 @@
+package server
+
+import (
+	"fmt"
+
+	"github.com/gin-gonic/gin"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
+)
+
+func server(config *app.Config) {
+	engine := gin.Default()
+
+	engine.GET("/health/check", HealthCheck)
+	engine.POST("/plugin/invoke", CheckingKey(config.DifyCallingKey), InvokePlugin)
+
+	engine.Run(fmt.Sprintf(":%d", config.DifyCallingPort))
+}

+ 16 - 0
internal/server/middleware.go

@@ -0,0 +1,16 @@
+package server
+
+import "github.com/gin-gonic/gin"
+
+func CheckingKey(key string) gin.HandlerFunc {
+	return func(c *gin.Context) {
+		// get header X-Api-Key
+		if c.GetHeader("X-Api-Key") != key {
+			c.JSON(401, gin.H{"error": "Unauthorized"})
+			c.Abort()
+			return
+		}
+
+		c.Next()
+	}
+}

+ 0 - 1
internal/server/pubsub.go

@@ -1 +0,0 @@
-package server

+ 7 - 0
internal/server/server.go

@@ -3,9 +3,16 @@ package server
 import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 )
 
 func Run(config *app.Config) {
+	// init routine pool
+	routine.InitPool(config.RoutinePoolSize)
+
 	// init plugin daemon
 	plugin_manager.Init(config)
+
+	// start http server
+	server(config)
 }

+ 10 - 4
internal/types/app/config.go

@@ -1,12 +1,18 @@
 package app
 
 type Config struct {
-	DifyPluginHost string `envconfig:"DIFY_PLUGIN_HOST"`
-	DifyPluginPort int16  `envconfig:"DIFY_PLUGIN_PORT"`
-	DifyPluginKey  string `envconfig:"DIFY_PLUGIN_KEY"`
-	StoragePath    string `envconfig:"STORAGE_PATH"`
+	DifyURL         string `envconfig:"DIFY_URL"`
+	DifyCallingKey  string `envconfig:"DIFY_CALLING_KEY"`
+	DifyCallingPort int16  `envconfig:"DIFY_CALLING_PORT"`
+
+	PluginHost string `envconfig:"PLUGIN_HOST"`
+	PluginPort int16  `envconfig:"PLUGIN_PORT"`
+
+	StoragePath string `envconfig:"STORAGE_PATH"`
 
 	Platform string `envconfig:"PLATFORM"`
+
+	RoutinePoolSize int `envconfig:"ROUTINE_POOL_SIZE"`
 }
 
 const (

+ 7 - 1
internal/types/entities/config.go

@@ -1,15 +1,21 @@
 package entities
 
+import "fmt"
+
 type PluginConfiguration struct {
 	Version  string                      `json:"version"`
 	Author   string                      `json:"author"`
 	Name     string                      `json:"name"`
 	Datetime int64                       `json:"datetime"`
-	Exec     string                      `json:"exec"`
+	Module   string                      `json:"module"`
 	Resource PluginConfigurationResource `json:"resource"`
 	Meta     PluginConfigurationMeta     `json:"meta"`
 }
 
+func (p *PluginConfiguration) Identity() string {
+	return fmt.Sprintf("%s:%s", p.Name, p.Version)
+}
+
 type PluginConfigurationResource struct {
 	Memory     int64                         `json:"memory"`
 	Storage    int64                         `json:"storage"`

+ 45 - 0
internal/types/entities/listener.go

@@ -0,0 +1,45 @@
+package entities
+
+import "sync/atomic"
+
+type IOListener[T any] struct {
+	msg        chan T
+	closed     *int32
+	close_hook []func()
+}
+
+type BytesIOListener = IOListener[[]byte]
+
+func NewIOListener[T any]() *IOListener[T] {
+	return &IOListener[T]{
+		msg:        make(chan T),
+		closed:     new(int32),
+		close_hook: []func(){},
+	}
+}
+
+func (r *IOListener[T]) Listen() <-chan T {
+	return r.msg
+}
+
+func (r *IOListener[T]) Close() {
+	if !atomic.CompareAndSwapInt32(r.closed, 0, 1) {
+		return
+	}
+	atomic.StoreInt32(r.closed, 1)
+	for _, hook := range r.close_hook {
+		hook()
+	}
+	close(r.msg)
+}
+
+func (r *IOListener[T]) Write(data T) {
+	if atomic.LoadInt32(r.closed) == 1 {
+		return
+	}
+	r.msg <- data
+}
+
+func (r *IOListener[T]) OnClose(hook func()) {
+	r.close_hook = append(r.close_hook, hook)
+}

+ 23 - 5
internal/types/entities/runtime.go

@@ -12,20 +12,31 @@ type (
 	}
 
 	PluginRuntimeInterface interface {
+		PluginRuntimeTimeLifeInterface
+		PluginRuntimeSessionIOInterface
+	}
+
+	PluginRuntimeTimeLifeInterface interface {
 		InitEnvironment() error
 		StartPlugin() error
 		Stopped() bool
 		Stop()
 		Configuration() *PluginConfiguration
 	}
+
+	PluginRuntimeSessionIOInterface interface {
+		Listen(session_id string) *BytesIOListener
+		Write(session_id string, data []byte)
+		Request(session_id string, data []byte) ([]byte, error)
+	}
 )
 
 func (r *PluginRuntime) Stopped() bool {
-	return r.State.Stopped
+	return r.State.Status == PLUGIN_RUNTIME_STATUS_STOPPED
 }
 
 func (r *PluginRuntime) Stop() {
-	r.State.Stopped = true
+	r.State.Status = PLUGIN_RUNTIME_STATUS_STOPPED
 }
 
 func (r *PluginRuntime) Configuration() *PluginConfiguration {
@@ -34,14 +45,21 @@ func (r *PluginRuntime) Configuration() *PluginConfiguration {
 
 type PluginRuntimeState struct {
 	Restarts     int        `json:"restarts"`
-	Active       bool       `json:"active"`
+	Status       string     `json:"status"`
 	RelativePath string     `json:"relative_path"`
 	ActiveAt     *time.Time `json:"active_at"`
-	DeadAt       *time.Time `json:"dead_at"`
-	Stopped      bool       `json:"stopped"`
+	StoppedAt    *time.Time `json:"stopped_at"`
 	Verified     bool       `json:"verified"`
 }
 
+const (
+	PLUGIN_RUNTIME_STATUS_ACTIVE     = "active"
+	PLUGIN_RUNTIME_STATUS_LAUNCHING  = "launching"
+	PLUGIN_RUNTIME_STATUS_STOPPED    = "stopped"
+	PLUGIN_RUNTIME_STATUS_RESTARTING = "restarting"
+	PLUGIN_RUNTIME_STATUS_PENDING    = "pending"
+)
+
 type PluginConnector interface {
 	OnMessage(func([]byte))
 	Read([]byte) int

+ 19 - 0
internal/types/entities/validate.go

@@ -0,0 +1,19 @@
+package entities
+
+import "errors"
+
+func (c *PluginConfiguration) Validate() error {
+	if c.Module == "" {
+		return errors.New("exec is required")
+	}
+
+	if c.Name == "" {
+		return errors.New("name is required")
+	}
+
+	if c.Version == "" {
+		return errors.New("version is required")
+	}
+
+	return nil
+}

+ 19 - 0
internal/utils/routine/pool.go

@@ -0,0 +1,19 @@
+package routine
+
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+	"github.com/panjf2000/ants"
+)
+
+var (
+	p *ants.Pool
+)
+
+func InitPool(size int) {
+	log.Info("init routine pool, size: %d", size)
+	p, _ = ants.NewPool(size, ants.WithNonblocking(false))
+}
+
+func Submit(f func()) {
+	p.Submit(f)
+}