123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- package plugin_entities
- import (
- "bytes"
- "crypto/sha256"
- "encoding/gob"
- "encoding/hex"
- "fmt"
- "hash/fnv"
- "time"
- "github.com/langgenius/dify-plugin-daemon/pkg/entities"
- )
- type (
- PluginRuntime struct {
- State PluginRuntimeState `json:"state"`
- Config PluginDeclaration `json:"config"`
- onStopped []func() `json:"-"`
- }
- PluginLifetime interface {
- PluginBasicInfoInterface
- PluginRuntimeSessionIOInterface
- PluginClusterLifetime
- }
- PluginFullDuplexLifetime interface {
- PluginLifetime
- // before the plugin starts, it will call this method to initialize the environment
- InitEnvironment() error
- // start the plugin, returns errors if the plugin fails to start and hangs until the plugin stops
- StartPlugin() error
- // wait for the plugin to stop
- Wait() (<-chan bool, error)
- // Cleanup the plugin runtime
- Cleanup()
- // set the plugin to active
- SetActive()
- // set the plugin to launching
- SetLaunching()
- // set the plugin to restarting
- SetRestarting()
- // set the plugin to pending
- SetPending()
- // set the active time of the plugin
- SetActiveAt(t time.Time)
- // set the scheduled time of the plugin
- SetScheduledAt(t time.Time)
- // add restarts to the plugin
- AddRestarts()
- // Started
- WaitStarted() <-chan bool
- // Stopped
- WaitStopped() <-chan bool
- }
- PluginServerlessLifetime interface {
- PluginLifetime
- // before the plugin starts, it will call this method to initialize the environment
- InitEnvironment() error
- // UploadPlugin uploads the plugin to the AWS Lambda
- UploadPlugin() error
- }
- PluginRuntimeSessionIOInterface interface {
- PluginBasicInfoInterface
- // Listen listens for messages from the plugin
- Listen(session_id string) *entities.Broadcast[SessionMessage]
- // Write writes a message to the plugin
- Write(session_id string, data []byte)
- // Log adds a log to the plugin runtime state
- Log(string)
- // Warn adds a warning to the plugin runtime state
- Warn(string)
- // Error adds an error to the plugin runtime state
- Error(string)
- }
- PluginClusterLifetime interface {
- // stop the plugin
- Stop()
- // add a function to be called when the plugin stops
- OnStop(func())
- // trigger the stop event
- TriggerStop()
- // returns true if the plugin is stopped
- Stopped() bool
- // returns the runtime state of the plugin
- RuntimeState() PluginRuntimeState
- // Update the runtime state of the plugin
- UpdateScheduledAt(t time.Time)
- }
- PluginBasicInfoInterface interface {
- // returns the runtime type of the plugin
- Type() PluginRuntimeType
- // returns the plugin configuration
- Configuration() *PluginDeclaration
- // unique identity of the plugin
- Identity() (PluginUniqueIdentifier, error)
- // hashed identity of the plugin
- HashedIdentity() (string, error)
- // returns the checksum of the plugin
- Checksum() (string, error)
- }
- )
- func (r *PluginRuntime) Stopped() bool {
- return r.State.Status == PLUGIN_RUNTIME_STATUS_STOPPED
- }
- func (r *PluginRuntime) Stop() {
- r.State.Status = PLUGIN_RUNTIME_STATUS_STOPPED
- }
- func (r *PluginRuntime) Configuration() *PluginDeclaration {
- return &r.Config
- }
- func HashedIdentity(identity string) string {
- hash := sha256.New()
- hash.Write([]byte(identity))
- return hex.EncodeToString(hash.Sum(nil))
- }
- func (r *PluginRuntime) HashedIdentity() (string, error) {
- return HashedIdentity(r.Config.Identity()), nil
- }
- func (r *PluginRuntime) RuntimeState() PluginRuntimeState {
- return r.State
- }
- func (r *PluginRuntime) UpdateScheduledAt(t time.Time) {
- r.State.ScheduledAt = &t
- }
- func (r *PluginRuntime) InitState() {
- r.State = PluginRuntimeState{
- Restarts: 0,
- Status: PLUGIN_RUNTIME_STATUS_PENDING,
- ActiveAt: nil,
- StoppedAt: nil,
- Verified: false,
- ScheduledAt: nil,
- Logs: []string{},
- }
- }
- func (r *PluginRuntime) SetActive() {
- r.State.Status = PLUGIN_RUNTIME_STATUS_ACTIVE
- }
- func (r *PluginRuntime) SetLaunching() {
- r.State.Status = PLUGIN_RUNTIME_STATUS_LAUNCHING
- }
- func (r *PluginRuntime) SetRestarting() {
- r.State.Status = PLUGIN_RUNTIME_STATUS_RESTARTING
- }
- func (r *PluginRuntime) SetPending() {
- r.State.Status = PLUGIN_RUNTIME_STATUS_PENDING
- }
- func (r *PluginRuntime) SetActiveAt(t time.Time) {
- r.State.ActiveAt = &t
- }
- func (r *PluginRuntime) SetScheduledAt(t time.Time) {
- r.State.ScheduledAt = &t
- }
- func (r *PluginRuntime) AddRestarts() {
- r.State.Restarts++
- }
- func (r *PluginRuntime) OnStop(f func()) {
- r.onStopped = append(r.onStopped, f)
- }
- func (r *PluginRuntime) TriggerStop() {
- for _, f := range r.onStopped {
- f()
- }
- }
- func (s *PluginRuntime) Log(log string) {
- s.State.Logs = append(s.State.Logs, fmt.Sprintf("[Info] %s: %s", time.Now().Format(time.RFC3339), log))
- }
- func (s *PluginRuntime) Warn(log string) {
- s.State.Logs = append(s.State.Logs, fmt.Sprintf("[Warn] %s: %s", time.Now().Format(time.RFC3339), log))
- }
- func (s *PluginRuntime) Error(log string) {
- s.State.Logs = append(s.State.Logs, fmt.Sprintf("[Error] %s: %s", time.Now().Format(time.RFC3339), log))
- }
- type PluginRuntimeType string
- const (
- PLUGIN_RUNTIME_TYPE_LOCAL PluginRuntimeType = "local"
- PLUGIN_RUNTIME_TYPE_REMOTE PluginRuntimeType = "remote"
- PLUGIN_RUNTIME_TYPE_SERVERLESS PluginRuntimeType = "serverless"
- )
- type PluginRuntimeState struct {
- Restarts int `json:"restarts"`
- Status string `json:"status"`
- WorkingPath string `json:"working_path"`
- ActiveAt *time.Time `json:"active_at"`
- StoppedAt *time.Time `json:"stopped_at"`
- Verified bool `json:"verified"`
- ScheduledAt *time.Time `json:"scheduled_at"`
- Logs []string `json:"logs"`
- }
- func (s *PluginRuntimeState) Hash() (uint64, error) {
- buf := bytes.Buffer{}
- enc := gob.NewEncoder(&buf)
- err := enc.Encode(s)
- if err != nil {
- return 0, err
- }
- j := fnv.New64a()
- _, err = j.Write(buf.Bytes())
- if err != nil {
- return 0, err
- }
- return j.Sum64(), nil
- }
- const (
- PLUGIN_RUNTIME_STATUS_ACTIVE = "active"
- PLUGIN_RUNTIME_STATUS_LAUNCHING = "launching"
- PLUGIN_RUNTIME_STATUS_STOPPED = "stopped"
- PLUGIN_RUNTIME_STATUS_RESTARTING = "restarting"
- PLUGIN_RUNTIME_STATUS_PENDING = "pending"
- )
|