runtime.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. package plugin_entities
  2. import (
  3. "bytes"
  4. "crypto/sha256"
  5. "encoding/gob"
  6. "encoding/hex"
  7. "fmt"
  8. "hash/fnv"
  9. "time"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  11. "github.com/langgenius/dify-plugin-daemon/pkg/entities"
  12. )
  13. type (
  14. PluginRuntime struct {
  15. State PluginRuntimeState `json:"state"`
  16. Config PluginDeclaration `json:"config"`
  17. onStopped []func() `json:"-"`
  18. }
  19. PluginLifetime interface {
  20. PluginBasicInfoInterface
  21. PluginRuntimeSessionIOInterface
  22. PluginClusterLifetime
  23. }
  24. PluginFullDuplexLifetime interface {
  25. PluginLifetime
  26. // before the plugin starts, it will call this method to initialize the environment
  27. InitEnvironment() error
  28. // start the plugin, returns errors if the plugin fails to start and hangs until the plugin stops
  29. StartPlugin() error
  30. // wait for the plugin to stop
  31. Wait() (<-chan bool, error)
  32. // Cleanup the plugin runtime
  33. Cleanup()
  34. // set the plugin to active
  35. SetActive()
  36. // set the plugin to launching
  37. SetLaunching()
  38. // set the plugin to restarting
  39. SetRestarting()
  40. // set the plugin to pending
  41. SetPending()
  42. // set the active time of the plugin
  43. SetActiveAt(t time.Time)
  44. // set the scheduled time of the plugin
  45. SetScheduledAt(t time.Time)
  46. // add restarts to the plugin
  47. AddRestarts()
  48. // Started
  49. WaitStarted() <-chan bool
  50. // Stopped
  51. WaitStopped() <-chan bool
  52. }
  53. PluginServerlessLifetime interface {
  54. PluginLifetime
  55. // before the plugin starts, it will call this method to initialize the environment
  56. InitEnvironment() error
  57. // UploadPlugin uploads the plugin to the AWS Lambda
  58. UploadPlugin() error
  59. }
  60. PluginRuntimeSessionIOInterface interface {
  61. PluginBasicInfoInterface
  62. // Listen listens for messages from the plugin
  63. Listen(session_id string) *entities.Broadcast[SessionMessage]
  64. // Write writes a message to the plugin
  65. Write(session_id string, action access_types.PluginAccessAction, data []byte)
  66. // Log adds a log to the plugin runtime state
  67. Log(string)
  68. // Warn adds a warning to the plugin runtime state
  69. Warn(string)
  70. // Error adds an error to the plugin runtime state
  71. Error(string)
  72. }
  73. PluginClusterLifetime interface {
  74. // stop the plugin
  75. Stop()
  76. // add a function to be called when the plugin stops
  77. OnStop(func())
  78. // trigger the stop event
  79. TriggerStop()
  80. // returns true if the plugin is stopped
  81. Stopped() bool
  82. // returns the runtime state of the plugin
  83. RuntimeState() PluginRuntimeState
  84. // Update the runtime state of the plugin
  85. UpdateScheduledAt(t time.Time)
  86. }
  87. PluginBasicInfoInterface interface {
  88. // returns the runtime type of the plugin
  89. Type() PluginRuntimeType
  90. // returns the plugin configuration
  91. Configuration() *PluginDeclaration
  92. // unique identity of the plugin
  93. Identity() (PluginUniqueIdentifier, error)
  94. // hashed identity of the plugin
  95. HashedIdentity() (string, error)
  96. // returns the checksum of the plugin
  97. Checksum() (string, error)
  98. }
  99. )
  100. func (r *PluginRuntime) Stopped() bool {
  101. return r.State.Status == PLUGIN_RUNTIME_STATUS_STOPPED
  102. }
  103. func (r *PluginRuntime) Stop() {
  104. r.State.Status = PLUGIN_RUNTIME_STATUS_STOPPED
  105. }
  106. func (r *PluginRuntime) Configuration() *PluginDeclaration {
  107. return &r.Config
  108. }
  // HashedIdentity returns the SHA-256 digest of identity as a lowercase
  // hexadecimal string.
  func HashedIdentity(identity string) string {
  	digest := sha256.Sum256([]byte(identity))
  	return hex.EncodeToString(digest[:])
  }
  114. func (r *PluginRuntime) HashedIdentity() (string, error) {
  115. return HashedIdentity(r.Config.Identity()), nil
  116. }
  117. func (r *PluginRuntime) RuntimeState() PluginRuntimeState {
  118. return r.State
  119. }
  120. func (r *PluginRuntime) UpdateScheduledAt(t time.Time) {
  121. r.State.ScheduledAt = &t
  122. }
  123. func (r *PluginRuntime) InitState() {
  124. r.State = PluginRuntimeState{
  125. Restarts: 0,
  126. Status: PLUGIN_RUNTIME_STATUS_PENDING,
  127. ActiveAt: nil,
  128. StoppedAt: nil,
  129. Verified: false,
  130. ScheduledAt: nil,
  131. Logs: []string{},
  132. }
  133. }
  134. func (r *PluginRuntime) SetActive() {
  135. r.State.Status = PLUGIN_RUNTIME_STATUS_ACTIVE
  136. }
  137. func (r *PluginRuntime) SetLaunching() {
  138. r.State.Status = PLUGIN_RUNTIME_STATUS_LAUNCHING
  139. }
  140. func (r *PluginRuntime) SetRestarting() {
  141. r.State.Status = PLUGIN_RUNTIME_STATUS_RESTARTING
  142. }
  143. func (r *PluginRuntime) SetPending() {
  144. r.State.Status = PLUGIN_RUNTIME_STATUS_PENDING
  145. }
  146. func (r *PluginRuntime) SetActiveAt(t time.Time) {
  147. r.State.ActiveAt = &t
  148. }
  149. func (r *PluginRuntime) SetScheduledAt(t time.Time) {
  150. r.State.ScheduledAt = &t
  151. }
  152. func (r *PluginRuntime) AddRestarts() {
  153. r.State.Restarts++
  154. }
  155. func (r *PluginRuntime) OnStop(f func()) {
  156. r.onStopped = append(r.onStopped, f)
  157. }
  158. func (r *PluginRuntime) TriggerStop() {
  159. for _, f := range r.onStopped {
  160. f()
  161. }
  162. }
  163. func (s *PluginRuntime) Log(log string) {
  164. s.State.Logs = append(s.State.Logs, fmt.Sprintf("[Info] %s: %s", time.Now().Format(time.RFC3339), log))
  165. }
  166. func (s *PluginRuntime) Warn(log string) {
  167. s.State.Logs = append(s.State.Logs, fmt.Sprintf("[Warn] %s: %s", time.Now().Format(time.RFC3339), log))
  168. }
  169. func (s *PluginRuntime) Error(log string) {
  170. s.State.Logs = append(s.State.Logs, fmt.Sprintf("[Error] %s: %s", time.Now().Format(time.RFC3339), log))
  171. }
  // PluginRuntimeType is the string name of the kind of runtime a plugin
  // runs in.
  type PluginRuntimeType string

  // The runtime types declared by this package.
  const (
  	PLUGIN_RUNTIME_TYPE_LOCAL      = PluginRuntimeType("local")
  	PLUGIN_RUNTIME_TYPE_REMOTE     = PluginRuntimeType("remote")
  	PLUGIN_RUNTIME_TYPE_SERVERLESS = PluginRuntimeType("serverless")
  )
  // PluginRuntimeState is the serializable state of a plugin runtime.
  //
  // The field order is significant: it determines the gob encoding that Hash
  // is computed over.
  type PluginRuntimeState struct {
  	// Restarts counts how many restarts have been recorded.
  	Restarts int `json:"restarts"`
  	// Status is one of the PLUGIN_RUNTIME_STATUS_* values.
  	Status string `json:"status"`
  	// WorkingPath is serialized under "working_path".
  	WorkingPath string `json:"working_path"`
  	// ActiveAt is the recorded active time, or nil if none was recorded.
  	ActiveAt *time.Time `json:"active_at"`
  	// StoppedAt is the recorded stop time, or nil if none was recorded.
  	StoppedAt *time.Time `json:"stopped_at"`
  	// Verified is serialized under "verified".
  	Verified bool `json:"verified"`
  	// ScheduledAt is the recorded scheduled time, or nil if none was recorded.
  	ScheduledAt *time.Time `json:"scheduled_at"`
  	// Logs holds the accumulated log entries.
  	Logs []string `json:"logs"`
  }

  // Hash returns the 64-bit FNV-1a hash of the gob encoding of the state. Two
  // states that gob-encode to the same bytes produce the same hash. An error
  // is returned if the state cannot be gob-encoded.
  func (s *PluginRuntimeState) Hash() (uint64, error) {
  	var encoded bytes.Buffer
  	if err := gob.NewEncoder(&encoded).Encode(s); err != nil {
  		return 0, err
  	}

  	hasher := fnv.New64a()
  	if _, err := hasher.Write(encoded.Bytes()); err != nil {
  		return 0, err
  	}
  	return hasher.Sum64(), nil
  }
  // Values stored in PluginRuntimeState.Status. They are untyped string
  // constants, so they compare directly against the string Status field.
  const (
  	PLUGIN_RUNTIME_STATUS_PENDING    = "pending"
  	PLUGIN_RUNTIME_STATUS_LAUNCHING  = "launching"
  	PLUGIN_RUNTIME_STATUS_ACTIVE     = "active"
  	PLUGIN_RUNTIME_STATUS_RESTARTING = "restarting"
  	PLUGIN_RUNTIME_STATUS_STOPPED    = "stopped"
  )