Yeuoly 1 år sedan
förälder
incheckning
3fc489f4d3

+ 7 - 0
Dockerfile

@@ -0,0 +1,7 @@
+FROM ubuntu:22.04
+
+# Install conda
+RUN apt-get update && apt-get install -y wget && \
+    wget https://repo.anaconda.com/miniconda/Miniconda3-py39_4.10.3-Linux-x86_64.sh && \
+    bash Miniconda3-py39_4.10.3-Linux-x86_64.sh -b -p /miniconda3 && \
+    rm Miniconda3-py39_4.10.3-Linux-x86_64.sh

+ 1 - 0
cmd/packer/main.go

@@ -0,0 +1 @@
+package main

+ 2 - 2
cmd/server/main.go

@@ -3,7 +3,7 @@ package main
 import (
 	"github.com/joho/godotenv"
 	"github.com/kelseyhightower/envconfig"
-	"github.com/langgenius/dify-plugin-daemon/internal/daemon"
+	"github.com/langgenius/dify-plugin-daemon/internal/server"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 )
@@ -21,5 +21,5 @@ func main() {
 		log.Panic("Error processing environment variables")
 	}
 
-	daemon.Run(&config)
+	server.Run(&config)
 }

+ 33 - 0
cmd/test/main.go

@@ -0,0 +1,33 @@
+package main
+
+import (
+	"fmt"
+	"time"
+)
+
+func main() {
+	ch := c()
+	for i := range ch {
+		if i == 20 {
+			break
+		}
+		fmt.Println(i)
+	}
+
+	for {
+		time.Sleep(1 * time.Second)
+	}
+}
+
+func c() <-chan int {
+	c := make(chan int)
+	go func() {
+		for i := 0; i < 10000; i++ {
+			fmt.Println("send", i)
+			c <- i
+		}
+
+		close(c)
+	}()
+	return c
+}

+ 3 - 0
examples/baisc_math/.gitignore

@@ -0,0 +1,3 @@
+__pycache__
+install.sh
+.installed

+ 11 - 0
examples/baisc_math/main.py

@@ -0,0 +1,11 @@
+def main():
+    while True:
+        try:
+            x = int(input('Enter a number: '))
+            y = int(input('Enter another number: '))
+            print(f'{x} + {y} = {x + y}')
+        except ValueError:
+            print('Invalid input. Please enter a number.')
+
+if __name__ == '__main__':
+    main()

+ 28 - 0
examples/baisc_math/manifest.json

@@ -0,0 +1,28 @@
+{
+    "version": "0.0.1",
+    "author": "Yeuoly",
+    "name": "basic_math",
+    "datetime": 1719812022,
+    "exec": ".venv/bin/python -m main",
+    "resource": {
+        "memory": 1048576,
+        "storage": 1048576,
+        "permission": {
+            "tool": {
+                "enabled": true
+            },
+            "model": {
+                "enabled": true,
+                "llm": true
+            }
+        }
+    },
+    "meta": {
+        "version": "0.0.1",
+        "arch": ["amd64", "arm64"],
+        "runner" : {
+            "language": "python",
+            "version": "3.12"
+        }
+    }
+}

+ 1 - 0
examples/baisc_math/requirements.txt

@@ -0,0 +1 @@
+requests==2.28.1

+ 10 - 0
go.mod

@@ -2,7 +2,17 @@ module github.com/langgenius/dify-plugin-daemon
 
 go 1.20
 
+require github.com/google/uuid v1.6.0
+
 require (
 	github.com/joho/godotenv v1.5.1 // indirect
 	github.com/kelseyhightower/envconfig v1.4.0 // indirect
+	github.com/panjf2000/gnet/v2 v2.5.5 // indirect
+	github.com/valyala/bytebufferpool v1.0.0 // indirect
+	go.uber.org/atomic v1.11.0 // indirect
+	go.uber.org/multierr v1.11.0 // indirect
+	go.uber.org/zap v1.27.0 // indirect
+	golang.org/x/sync v0.7.0 // indirect
+	golang.org/x/sys v0.21.0 // indirect
+	gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
 )

+ 20 - 0
go.sum

@@ -1,4 +1,24 @@
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
 github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
 github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
 github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
+github.com/panjf2000/gnet/v2 v2.5.5 h1:H+LqGgCHs2mGJq/4n6YELhMjZ027bNgd5Qb8Wj5nbrM=
+github.com/panjf2000/gnet/v2 v2.5.5/go.mod h1:ppopMJ8VrDbJu8kDsqFQTgNmpMS8Le5CmPxISf+Sauk=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
+github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
+go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
+go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
+go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
+go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
+go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
+go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
+golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
+golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
+golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
+gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=

+ 1 - 0
internal/core/plugin_daemon/model.go

@@ -0,0 +1 @@
+package plugin_daemon

+ 1 - 0
internal/core/plugin_daemon/tool.go

@@ -0,0 +1 @@
+package plugin_daemon

+ 1 - 0
internal/core/plugin_manager/aws_connector.go

@@ -0,0 +1 @@
+package plugin_manager

+ 18 - 0
internal/core/plugin_manager/init.go

@@ -1 +1,19 @@
 package plugin_manager
+
+import (
+	"sync"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+)
+
+var m sync.Map
+
+func checkPluginExist(name string) (*entities.PluginRuntime, bool) {
+	if v, ok := m.Load(name); ok {
+		if plugin, ok := v.(*entities.PluginRuntime); ok {
+			return plugin, true
+		}
+	}
+
+	return nil, false
+}

+ 7 - 0
internal/core/plugin_manager/lifetime.go

@@ -0,0 +1,7 @@
+package plugin_manager
+
+import "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+
+func lifetime(r *entities.PluginRuntime) {
+
+}

+ 1 - 0
internal/core/plugin_manager/local_connector.go

@@ -0,0 +1 @@
+package plugin_manager

+ 33 - 0
internal/core/plugin_manager/manager.go

@@ -0,0 +1,33 @@
+package plugin_manager
+
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+)
+
+func List() []*entities.PluginRuntime {
+	var runtimes []*entities.PluginRuntime
+	m.Range(func(key, value interface{}) bool {
+		if v, ok := value.(*entities.PluginRuntime); ok {
+			runtimes = append(runtimes, v)
+		}
+		return true
+	})
+	return runtimes
+}
+
+func Put(path string, binary []byte) {
+	//TODO: put binary into
+}
+
+func Delete(identity string) {
+	//TODO: delete binary from
+}
+
+func Init(configuration *app.Config) {
+	// TODO: init plugin manager
+	log.Info("start plugin manager daemon...")
+
+	startWatcher(configuration.StoragePath)
+}

+ 108 - 0
internal/core/plugin_manager/stdio_holder.go

@@ -0,0 +1,108 @@
+package plugin_manager
+
+import (
+	"io"
+	"sync"
+
+	"github.com/google/uuid"
+)
+
+var (
+	stdio_holder sync.Map               = sync.Map{}
+	listeners    []func(string, []byte) = []func(string, []byte){}
+)
+
+type stdioHolder struct {
+	id       string
+	writer   io.WriteCloser
+	reader   io.ReadCloser
+	listener []func(data []byte)
+	started  bool
+	alive    bool
+}
+
+func (s *stdioHolder) Stop() {
+	s.alive = false
+	s.writer.Close()
+	s.reader.Close()
+}
+
+func (s *stdioHolder) Start() {
+	s.started = true
+
+	go func() {
+		s.alive = true
+		for s.alive {
+			buf := make([]byte, 1024)
+			n, err := s.reader.Read(buf)
+			if err != nil {
+				s.Stop()
+				break
+			}
+
+			for _, listener := range listeners {
+				listener(s.id, buf[:n])
+			}
+
+			for _, listener := range s.listener {
+				listener(buf[:n])
+			}
+		}
+	}()
+}
+
+func PutStdio(writer io.WriteCloser, reader io.ReadCloser) string {
+	id := uuid.New().String()
+
+	holder := &stdioHolder{
+		writer: writer,
+		reader: reader,
+		id:     id,
+	}
+
+	stdio_holder.Store(id, holder)
+
+	holder.Start()
+
+	return id
+}
+
+/*
+ * @param id: string
+ */
+func RemoveStdio(id string) {
+	stdio_holder.Delete(id)
+}
+
+/*
+ * @param listener: func(data []byte)
+ */
+func OnStdioEvent(id string, listener func([]byte)) {
+	if v, ok := stdio_holder.Load(id); ok {
+		if holder, ok := v.(*stdioHolder); ok {
+			holder.listener = append(holder.listener, listener)
+		}
+	}
+}
+
+/*
+ * @param listener: func(id string, data []byte)
+ */
+func OnStdioEventGlobal(listener func(string, []byte)) {
+	listeners = append(listeners, listener)
+}
+
+/*
+ * @param id: string
+ * @param data: []byte
+ */
+func Write(id string, data []byte) error {
+	if v, ok := stdio_holder.Load(id); ok {
+		if holder, ok := v.(*stdioHolder); ok {
+			_, err := holder.writer.Write(data)
+			return err
+		}
+	}
+
+	return nil
+}

+ 46 - 0
internal/core/plugin_manager/tcp_connector.go

@@ -0,0 +1,46 @@
+package plugin_manager
+
+import (
+	"fmt"
+
+	"github.com/panjf2000/gnet/v2"
+)
+
+type difyServer struct {
+	gnet.BuiltinEventEngine
+
+	eng       gnet.Engine
+	addr      string
+	multicore bool
+}
+
+func (s *difyServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
+	c.SetContext(&codec{})
+	return nil, gnet.None
+}
+
+func (s *difyServer) OnBoot(c gnet.Engine) (action gnet.Action) {
+	return gnet.None
+}
+
+func (s *difyServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
+	codec := c.Context().(*codec)
+	messages, err := codec.Decode(c)
+	if err != nil {
+		return gnet.Close
+	}
+
+	for _, message := range messages {
+		fmt.Println(message)
+	}
+
+	return gnet.None
+}
+
+func traffic() {
+	addr := "tcp://:9000"
+	multicore := true
+	s := &difyServer{addr: addr, multicore: multicore}
+
+	gnet.Run(s, addr, gnet.WithMulticore(multicore), gnet.WithNumEventLoop(8))
+}

+ 55 - 0
internal/core/plugin_manager/tcp_protocol.go

@@ -0,0 +1,55 @@
+package plugin_manager
+
+import (
+	"bytes"
+	"errors"
+
+	"github.com/panjf2000/gnet/v2"
+)
+
+type codec struct {
+	buf bytes.Buffer
+}
+
+func (w *codec) Decode(c gnet.Conn) ([][]byte, error) {
+	size := c.InboundBuffered()
+	buf := make([]byte, size)
+	read, err := c.Read(buf)
+
+	if err != nil {
+		return nil, err
+	}
+
+	if read < size {
+		return nil, errors.New("read less than size")
+	}
+
+	// use \ as escape character, as for \ itself, it should be escaped as well
+	var start int
+	var result [][]byte = make([][]byte, 0)
+	var current_line []byte = make([]byte, 0)
+	for i := 0; i < size; i++ {
+		if buf[i] == '\\' {
+			// write to current line
+			current_line = append(current_line, buf[start:i]...)
+			start = i + 1
+			i++
+			continue
+		}
+
+		if buf[i] == '\n' {
+			// write to current line
+			current_line = append(current_line, buf[start:i]...)
+			result = append(result, current_line)
+			current_line = make([]byte, 0)
+			start = i + 1
+		}
+	}
+
+	// for the last line, write it to buffer
+	if start < size {
+		w.buf.Write(buf[start:size])
+	}
+
+	return result, nil
+}

+ 106 - 0
internal/core/plugin_manager/watcher.go

@@ -0,0 +1,106 @@
+package plugin_manager
+
+import (
+	"os"
+	"path"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
+)
+
+func startWatcher(path string) {
+	// load local plugins firstly
+	for plugin := range loadNewPlugins(path) {
+
+		log.Info("loaded plugin: %s:%s", plugin.Config.Name, plugin.Config.Version)
+		m.Store(plugin.Info.ID, &plugin)
+
+		lifetime(&plugin)
+	}
+}
+
+// chan should be closed after using that
+func loadNewPlugins(root_path string) <-chan entities.PluginRuntime {
+	ch := make(chan entities.PluginRuntime)
+
+	plugins, err := os.ReadDir(root_path)
+	if err != nil {
+		log.Error("no plugin found in path: %s", root_path)
+		close(ch)
+		return ch
+	}
+
+	go func() {
+		for _, plugin := range plugins {
+			if plugin.IsDir() {
+				log.Info("found new plugin path: %s", plugin.Name())
+
+				configuration_path := path.Join(root_path, plugin.Name(), "manifest.json")
+				configuration, err := parsePluginConfig(configuration_path)
+				if err != nil {
+					log.Error("parse plugin config error: %v", err)
+					continue
+				}
+
+				status := verifyPluginStatus(configuration)
+				if status.exist && status.alive {
+					continue
+				} else if status.exist && !status.alive {
+					log.Warn("plugin %s is not alive")
+					continue
+				}
+
+				ch <- entities.PluginRuntime{
+					Config: *configuration,
+					State: entities.PluginRuntimeState{
+						Restarts:     0,
+						Active:       false,
+						RelativePath: path.Join(root_path, plugin.Name()),
+						ActiveAt:     nil,
+						DeadAt:       nil,
+						Verified:     false,
+					},
+				}
+			}
+		}
+
+		close(ch)
+	}()
+
+	return ch
+}
+
+func parsePluginConfig(configuration_path string) (*entities.PluginConfiguration, error) {
+	text, err := os.ReadFile(configuration_path)
+	if err != nil {
+		return nil, err
+	}
+
+	result, err := parser.UnmarshalJson[entities.PluginConfiguration](string(text))
+	if err != nil {
+		return nil, err
+	}
+
+	return &result, nil
+}
+
+type pluginStatusResult struct {
+	exist bool
+	alive bool
+}
+
+func verifyPluginStatus(config *entities.PluginConfiguration) pluginStatusResult {
+	r, exist := checkPluginExist(config.Name)
+	if exist {
+		return pluginStatusResult{
+			exist: true,
+			alive: r.State.Active,
+		}
+	}
+
+	return pluginStatusResult{
+		exist: false,
+		alive: false,
+	}
+}

+ 0 - 9
internal/daemon/server.go

@@ -1,9 +0,0 @@
-package daemon
-
-import (
-	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
-)
-
-func Run(config *app.Config) {
-
-}

+ 1 - 0
internal/server/pubsub.go

@@ -0,0 +1 @@
+package server

+ 11 - 0
internal/server/server.go

@@ -0,0 +1,11 @@
+package server
+
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
+)
+
+func Run(config *app.Config) {
+	// init plugin daemon
+	plugin_manager.Init(config)
+}

+ 1 - 0
internal/types/app/config.go

@@ -4,4 +4,5 @@ type Config struct {
 	DifyPluginHost string `envconfig:"DIFY_PLUGIN_HOST"`
 	DifyPluginPort int16  `envconfig:"DIFY_PLUGIN_PORT"`
 	DifyPluginKey  string `envconfig:"DIFY_PLUGIN_KEY"`
+	StoragePath    string `envconfig:"STORAGE_PATH"`
 }

+ 75 - 0
internal/types/entities/runtime.go

@@ -0,0 +1,75 @@
+package entities
+
+import (
+	"time"
+)
+
+const (
+	PLUGIN_RUNTIME_TYPE_LOCAL      = "local"
+	PLUGIN_RUNTIME_TYPE_AWS_LAMBDA = "aws_lambda"
+)
+
+type PluginRuntime struct {
+	Info      PluginRuntimeInfo   `json:"info"`
+	State     PluginRuntimeState  `json:"state"`
+	Config    PluginConfiguration `json:"config"`
+	Connector PluginConnector     `json:"-"`
+}
+
+type PluginRuntimeInfo struct {
+	Type    string `json:"type"`
+	ID      string `json:"id"`
+	Restart bool   `json:"restart"`
+}
+
+type PluginRuntimeState struct {
+	Restarts     int        `json:"restarts"`
+	Active       bool       `json:"active"`
+	RelativePath string     `json:"relative_path"`
+	ActiveAt     *time.Time `json:"active_at"`
+	DeadAt       *time.Time `json:"dead_at"`
+	Verified     bool       `json:"verified"`
+}
+
+type PluginConfiguration struct {
+	Version  string                      `json:"version"`
+	Author   string                      `json:"author"`
+	Name     string                      `json:"name"`
+	Datetime int64                       `json:"datetime"`
+	Resource PluginConfigurationResource `json:"resource"`
+}
+
+type PluginConfigurationResource struct {
+	Memory     int64                         `json:"memory"`
+	Storage    int64                         `json:"storage"`
+	Permission PluginConfigurationPermission `json:"permission"`
+}
+
+type PluginExtension struct {
+	Tool  bool `json:"tool"`
+	Model bool `json:"model"`
+}
+
+type PluginConfigurationPermission struct {
+	Model PluginConfigurationPermissionModel `json:"model"`
+	Tool  PluginConfigurationPermissionTool  `json:"tool"`
+}
+
+type PluginConfigurationPermissionModel struct {
+	Enabled       bool `json:"enabled"`
+	LLM           bool `json:"llm"`
+	TextEmbedding bool `json:"text_embedding"`
+	Rerank        bool `json:"rerank"`
+	TTS           bool `json:"tts"`
+	STT           bool `json:"stt"`
+}
+
+type PluginConfigurationPermissionTool struct {
+	Enabled bool `json:"enabled"`
+}
+
+type PluginConnector interface {
+	OnMessage(func([]byte))
+	Read([]byte) int
+	Write([]byte) int
+}

+ 14 - 0
internal/utils/parser/json.go

@@ -0,0 +1,14 @@
+package parser
+
+import "encoding/json"
+
+func UnmarshalJson[T any](text string) (T, error) {
+	var result T
+	err := json.Unmarshal([]byte(text), &result)
+	return result, err
+}
+
+func MarshalJson[T any](data T) string {
+	b, _ := json.Marshal(data)
+	return string(b)
+}