runtime.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. package plugin_entities
  2. import (
  3. "bytes"
  4. "crypto/sha256"
  5. "encoding/gob"
  6. "encoding/hex"
  7. "fmt"
  8. "hash/fnv"
  9. "time"
  10. "github.com/langgenius/dify-plugin-daemon/pkg/entities"
  11. )
  12. type (
	// PluginRuntime bundles a plugin's runtime state with its declaration and
	// the callbacks registered to run when the plugin stops.
	PluginRuntime struct {
		// State is the current runtime state, serialized under the "state" key.
		State     PluginRuntimeState `json:"state"`
		// Config is the plugin declaration, serialized under the "config" key.
		Config    PluginDeclaration  `json:"config"`
		// onStopped holds the callbacks added by OnStop and invoked by
		// TriggerStop. The field is unexported, so encoding/json skips it.
		onStopped []func()           `json:"-"`
	}
	// PluginLifetime is the contract shared by the lifetime interfaces in this
	// file. It is composed of the basic plugin information, the per-session
	// I/O and logging methods, and the stop/state hooks of
	// PluginClusterLifetime.
	PluginLifetime interface {
		PluginBasicInfoInterface
		PluginRuntimeSessionIOInterface
		PluginClusterLifetime
	}
	// PluginFullDuplexLifetime extends PluginLifetime with environment setup,
	// start/wait/cleanup hooks and setters for the runtime state.
	PluginFullDuplexLifetime interface {
		PluginLifetime
		// InitEnvironment is called before the plugin starts in order to
		// initialize its environment.
		InitEnvironment() error
		// StartPlugin starts the plugin. It returns an error if the plugin
		// fails to start, and blocks until the plugin stops.
		StartPlugin() error
		// Wait returns a channel used to wait for the plugin to stop, or an
		// error.
		Wait() (<-chan bool, error)
		// Cleanup cleans up the plugin runtime.
		Cleanup()
		// SetActive sets the plugin to active.
		SetActive()
		// SetLaunching sets the plugin to launching.
		SetLaunching()
		// SetRestarting sets the plugin to restarting.
		SetRestarting()
		// SetPending sets the plugin to pending.
		SetPending()
		// SetActiveAt sets the active time of the plugin.
		SetActiveAt(t time.Time)
		// SetScheduledAt sets the scheduled time of the plugin.
		SetScheduledAt(t time.Time)
		// AddRestarts adds a restart to the plugin's restart count.
		AddRestarts()
		// WaitStarted returns a channel related to the plugin having started.
		// NOTE(review): whether the channel is closed or receives a value is
		// not visible from this declaration; confirm against implementations.
		WaitStarted() <-chan bool
		// WaitStopped returns a channel related to the plugin having stopped.
		// NOTE(review): whether the channel is closed or receives a value is
		// not visible from this declaration; confirm against implementations.
		WaitStopped() <-chan bool
	}
	// PluginServerlessLifetime extends PluginLifetime with the hooks needed by
	// a serverless plugin: environment setup and upload.
	PluginServerlessLifetime interface {
		PluginLifetime
		// InitEnvironment is called before the plugin starts in order to
		// initialize its environment.
		InitEnvironment() error
		// UploadPlugin uploads the plugin to AWS Lambda.
		UploadPlugin() error
	}
	// PluginRuntimeSessionIOInterface covers per-session message exchange with
	// a plugin and the recording of log entries. It also embeds
	// PluginBasicInfoInterface.
	PluginRuntimeSessionIOInterface interface {
		PluginBasicInfoInterface
		// Listen listens for messages from the plugin for the given session
		// and returns the broadcast they are delivered on.
		Listen(session_id string) *entities.Broadcast[SessionMessage]
		// Write writes a message to the plugin for the given session.
		Write(session_id string, data []byte)
		// Log adds a log entry to the plugin runtime state.
		Log(string)
		// Warn adds a warning entry to the plugin runtime state.
		Warn(string)
		// Error adds an error entry to the plugin runtime state.
		Error(string)
	}
	// PluginClusterLifetime groups the stop handling and runtime-state access
	// methods of a plugin.
	PluginClusterLifetime interface {
		// Stop stops the plugin.
		Stop()
		// OnStop adds a function to be called when the plugin stops.
		OnStop(func())
		// TriggerStop triggers the stop event, running the functions added
		// through OnStop.
		TriggerStop()
		// Stopped returns true if the plugin is stopped.
		Stopped() bool
		// RuntimeState returns the runtime state of the plugin.
		RuntimeState() PluginRuntimeState
		// UpdateScheduledAt updates the scheduled time in the plugin's runtime
		// state.
		UpdateScheduledAt(t time.Time)
	}
	// PluginBasicInfoInterface exposes identifying information about a plugin:
	// its runtime type, declaration, identity and checksum.
	PluginBasicInfoInterface interface {
		// Type returns the runtime type of the plugin.
		Type() PluginRuntimeType
		// Configuration returns the plugin configuration.
		Configuration() *PluginDeclaration
		// Identity returns the unique identity of the plugin.
		Identity() (PluginUniqueIdentifier, error)
		// HashedIdentity returns the hashed identity of the plugin.
		HashedIdentity() (string, error)
		// Checksum returns the checksum of the plugin.
		Checksum() (string, error)
	}
  98. )
  99. func (r *PluginRuntime) Stopped() bool {
  100. return r.State.Status == PLUGIN_RUNTIME_STATUS_STOPPED
  101. }
  102. func (r *PluginRuntime) Stop() {
  103. r.State.Status = PLUGIN_RUNTIME_STATUS_STOPPED
  104. }
  105. func (r *PluginRuntime) Configuration() *PluginDeclaration {
  106. return &r.Config
  107. }
  108. func HashedIdentity(identity string) string {
  109. hash := sha256.New()
  110. hash.Write([]byte(identity))
  111. return hex.EncodeToString(hash.Sum(nil))
  112. }
  113. func (r *PluginRuntime) HashedIdentity() (string, error) {
  114. return HashedIdentity(r.Config.Identity()), nil
  115. }
  116. func (r *PluginRuntime) RuntimeState() PluginRuntimeState {
  117. return r.State
  118. }
  119. func (r *PluginRuntime) UpdateScheduledAt(t time.Time) {
  120. r.State.ScheduledAt = &t
  121. }
  122. func (r *PluginRuntime) InitState() {
  123. r.State = PluginRuntimeState{
  124. Restarts: 0,
  125. Status: PLUGIN_RUNTIME_STATUS_PENDING,
  126. ActiveAt: nil,
  127. StoppedAt: nil,
  128. Verified: false,
  129. ScheduledAt: nil,
  130. Logs: []string{},
  131. }
  132. }
  133. func (r *PluginRuntime) SetActive() {
  134. r.State.Status = PLUGIN_RUNTIME_STATUS_ACTIVE
  135. }
  136. func (r *PluginRuntime) SetLaunching() {
  137. r.State.Status = PLUGIN_RUNTIME_STATUS_LAUNCHING
  138. }
  139. func (r *PluginRuntime) SetRestarting() {
  140. r.State.Status = PLUGIN_RUNTIME_STATUS_RESTARTING
  141. }
  142. func (r *PluginRuntime) SetPending() {
  143. r.State.Status = PLUGIN_RUNTIME_STATUS_PENDING
  144. }
  145. func (r *PluginRuntime) SetActiveAt(t time.Time) {
  146. r.State.ActiveAt = &t
  147. }
  148. func (r *PluginRuntime) SetScheduledAt(t time.Time) {
  149. r.State.ScheduledAt = &t
  150. }
  151. func (r *PluginRuntime) AddRestarts() {
  152. r.State.Restarts++
  153. }
  154. func (r *PluginRuntime) OnStop(f func()) {
  155. r.onStopped = append(r.onStopped, f)
  156. }
  157. func (r *PluginRuntime) TriggerStop() {
  158. for _, f := range r.onStopped {
  159. f()
  160. }
  161. }
  162. func (s *PluginRuntime) Log(log string) {
  163. s.State.Logs = append(s.State.Logs, fmt.Sprintf("[Info] %s: %s", time.Now().Format(time.RFC3339), log))
  164. }
  165. func (s *PluginRuntime) Warn(log string) {
  166. s.State.Logs = append(s.State.Logs, fmt.Sprintf("[Warn] %s: %s", time.Now().Format(time.RFC3339), log))
  167. }
  168. func (s *PluginRuntime) Error(log string) {
  169. s.State.Logs = append(s.State.Logs, fmt.Sprintf("[Error] %s: %s", time.Now().Format(time.RFC3339), log))
  170. }
// PluginRuntimeType is the string-valued kind of runtime a plugin reports
// through PluginBasicInfoInterface.Type.
type PluginRuntimeType string

// Known plugin runtime types.
const (
	// PLUGIN_RUNTIME_TYPE_LOCAL is the "local" runtime type.
	PLUGIN_RUNTIME_TYPE_LOCAL      PluginRuntimeType = "local"
	// PLUGIN_RUNTIME_TYPE_REMOTE is the "remote" runtime type.
	PLUGIN_RUNTIME_TYPE_REMOTE     PluginRuntimeType = "remote"
	// PLUGIN_RUNTIME_TYPE_SERVERLESS is the "serverless" runtime type.
	PLUGIN_RUNTIME_TYPE_SERVERLESS PluginRuntimeType = "serverless"
)
// PluginRuntimeState is the JSON-serializable runtime state of a plugin:
// status, restart count, timestamps and accumulated log entries.
type PluginRuntimeState struct {
	// Restarts is the restart counter, incremented by PluginRuntime.AddRestarts.
	Restarts    int        `json:"restarts"`
	// Status holds one of the PLUGIN_RUNTIME_STATUS_* values.
	Status      string     `json:"status"`
	// WorkingPath is not written anywhere in this file.
	// NOTE(review): presumably the plugin's working directory; confirm.
	WorkingPath string     `json:"working_path"`
	// ActiveAt is set by PluginRuntime.SetActiveAt; nil until then.
	ActiveAt    *time.Time `json:"active_at"`
	// StoppedAt is not written anywhere in this file; InitState leaves it nil.
	StoppedAt   *time.Time `json:"stopped_at"`
	// Verified is false after InitState and not written elsewhere in this file.
	Verified    bool       `json:"verified"`
	// ScheduledAt is set by PluginRuntime.SetScheduledAt and
	// PluginRuntime.UpdateScheduledAt; nil until then.
	ScheduledAt *time.Time `json:"scheduled_at"`
	// Logs accumulates the entries appended by PluginRuntime.Log, Warn and
	// Error.
	Logs        []string   `json:"logs"`
}
  187. func (s *PluginRuntimeState) Hash() (uint64, error) {
  188. buf := bytes.Buffer{}
  189. enc := gob.NewEncoder(&buf)
  190. err := enc.Encode(s)
  191. if err != nil {
  192. return 0, err
  193. }
  194. j := fnv.New64a()
  195. _, err = j.Write(buf.Bytes())
  196. if err != nil {
  197. return 0, err
  198. }
  199. return j.Sum64(), nil
  200. }
// Values stored in PluginRuntimeState.Status.
const (
	// PLUGIN_RUNTIME_STATUS_ACTIVE is set by PluginRuntime.SetActive.
	PLUGIN_RUNTIME_STATUS_ACTIVE     = "active"
	// PLUGIN_RUNTIME_STATUS_LAUNCHING is set by PluginRuntime.SetLaunching.
	PLUGIN_RUNTIME_STATUS_LAUNCHING  = "launching"
	// PLUGIN_RUNTIME_STATUS_STOPPED is set by PluginRuntime.Stop and checked
	// by PluginRuntime.Stopped.
	PLUGIN_RUNTIME_STATUS_STOPPED    = "stopped"
	// PLUGIN_RUNTIME_STATUS_RESTARTING is set by PluginRuntime.SetRestarting.
	PLUGIN_RUNTIME_STATUS_RESTARTING = "restarting"
	// PLUGIN_RUNTIME_STATUS_PENDING is set by PluginRuntime.SetPending and is
	// the initial status after PluginRuntime.InitState.
	PLUGIN_RUNTIME_STATUS_PENDING    = "pending"
)