runtime.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. package entities
  2. import (
  3. "bytes"
  4. "crypto/sha256"
  5. "encoding/gob"
  6. "encoding/hex"
  7. "fmt"
  8. "hash/fnv"
  9. "time"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  11. )
  12. type (
  13. PluginRuntime struct {
  14. State PluginRuntimeState `json:"state"`
  15. Config plugin_entities.PluginDeclaration `json:"config"`
  16. onStopped []func() `json:"-"`
  17. }
  18. PluginRuntimeInterface interface {
  19. PluginRuntimeTimeLifeInterface
  20. PluginRuntimeSessionIOInterface
  21. PluginRuntimeDockerInterface
  22. PluginRuntimeLogInterface
  23. }
  24. PluginRuntimeTimeLifeInterface interface {
  25. // returns the plugin configuration
  26. Configuration() *plugin_entities.PluginDeclaration
  27. // unique identity of the plugin
  28. Identity() (string, error)
  29. // hashed identity of the plugin
  30. HashedIdentity() (string, error)
  31. // before the plugin starts, it will call this method to initialize the environment
  32. InitEnvironment() error
  33. // start the plugin, returns errors if the plugin fails to start and hangs until the plugin stops
  34. StartPlugin() error
  35. // returns true if the plugin is stopped
  36. Stopped() bool
  37. // stop the plugin
  38. Stop()
  39. // add a function to be called when the plugin stops
  40. OnStop(func())
  41. // trigger the stop event
  42. TriggerStop()
  43. // returns the runtime state of the plugin
  44. RuntimeState() PluginRuntimeState
  45. // Update the runtime state of the plugin
  46. UpdateScheduledAt(t time.Time)
  47. // returns the checksum of the plugin
  48. Checksum() string
  49. // wait for the plugin to stop
  50. Wait() (<-chan bool, error)
  51. // returns the runtime type of the plugin
  52. Type() PluginRuntimeType
  53. // Cleanup the plugin runtime
  54. Cleanup()
  55. // set the plugin to active
  56. SetActive()
  57. // set the plugin to launching
  58. SetLaunching()
  59. // set the plugin to restarting
  60. SetRestarting()
  61. // set the plugin to pending
  62. SetPending()
  63. // set the active time of the plugin
  64. SetActiveAt(t time.Time)
  65. // set the scheduled time of the plugin
  66. SetScheduledAt(t time.Time)
  67. // add restarts to the plugin
  68. AddRestarts()
  69. }
  70. PluginRuntimeLogInterface interface {
  71. // Log adds a log to the plugin runtime state
  72. Log(string)
  73. // Warn adds a warning to the plugin runtime state
  74. Warn(string)
  75. // Error adds an error to the plugin runtime state
  76. Error(string)
  77. }
  78. PluginRuntimeSessionIOInterface interface {
  79. Listen(session_id string) *BytesIOListener
  80. Write(session_id string, data []byte)
  81. }
  82. PluginRuntimeDockerInterface interface {
  83. // returns the docker image and the delete function, always call the delete function when the image is no longer needed
  84. }
  85. )
  86. func (r *PluginRuntime) Stopped() bool {
  87. return r.State.Status == PLUGIN_RUNTIME_STATUS_STOPPED
  88. }
  89. func (r *PluginRuntime) Stop() {
  90. r.State.Status = PLUGIN_RUNTIME_STATUS_STOPPED
  91. }
  92. func (r *PluginRuntime) Configuration() *plugin_entities.PluginDeclaration {
  93. return &r.Config
  94. }
  95. func (r *PluginRuntime) Identity() (string, error) {
  96. return r.Config.Identity(), nil
  97. }
  // HashedIdentity returns the lowercase hex encoding of the SHA-256 digest of
  // identity.
  func HashedIdentity(identity string) string {
  	digest := sha256.Sum256([]byte(identity))
  	return hex.EncodeToString(digest[:])
  }
  103. func (r *PluginRuntime) HashedIdentity() (string, error) {
  104. return HashedIdentity(r.Config.Identity()), nil
  105. }
  106. func (r *PluginRuntime) RuntimeState() PluginRuntimeState {
  107. return r.State
  108. }
  109. func (r *PluginRuntime) UpdateScheduledAt(t time.Time) {
  110. r.State.ScheduledAt = &t
  111. }
  112. func (r *PluginRuntime) InitState() {
  113. r.State = PluginRuntimeState{
  114. Restarts: 0,
  115. Status: PLUGIN_RUNTIME_STATUS_PENDING,
  116. ActiveAt: nil,
  117. StoppedAt: nil,
  118. Verified: false,
  119. ScheduledAt: nil,
  120. Logs: []string{},
  121. }
  122. }
  123. func (r *PluginRuntime) SetActive() {
  124. r.State.Status = PLUGIN_RUNTIME_STATUS_ACTIVE
  125. }
  126. func (r *PluginRuntime) SetLaunching() {
  127. r.State.Status = PLUGIN_RUNTIME_STATUS_LAUNCHING
  128. }
  129. func (r *PluginRuntime) SetRestarting() {
  130. r.State.Status = PLUGIN_RUNTIME_STATUS_RESTARTING
  131. }
  132. func (r *PluginRuntime) SetPending() {
  133. r.State.Status = PLUGIN_RUNTIME_STATUS_PENDING
  134. }
  135. func (r *PluginRuntime) SetActiveAt(t time.Time) {
  136. r.State.ActiveAt = &t
  137. }
  138. func (r *PluginRuntime) SetScheduledAt(t time.Time) {
  139. r.State.ScheduledAt = &t
  140. }
  141. func (r *PluginRuntime) AddRestarts() {
  142. r.State.Restarts++
  143. }
  144. func (r *PluginRuntime) OnStop(f func()) {
  145. r.onStopped = append(r.onStopped, f)
  146. }
  147. func (r *PluginRuntime) TriggerStop() {
  148. for _, f := range r.onStopped {
  149. f()
  150. }
  151. }
  152. func (s *PluginRuntime) Log(log string) {
  153. s.State.Logs = append(s.State.Logs, fmt.Sprintf("[Info] %s: %s", time.Now().Format(time.RFC3339), log))
  154. }
  155. func (s *PluginRuntime) Warn(log string) {
  156. s.State.Logs = append(s.State.Logs, fmt.Sprintf("[Warn] %s: %s", time.Now().Format(time.RFC3339), log))
  157. }
  158. func (s *PluginRuntime) Error(log string) {
  159. s.State.Logs = append(s.State.Logs, fmt.Sprintf("[Error] %s: %s", time.Now().Format(time.RFC3339), log))
  160. }
  // PluginRuntimeType names the kind of runtime a plugin runs on; it is the
  // value returned by PluginRuntimeTimeLifeInterface.Type.
  type PluginRuntimeType string

  // Known runtime types.
  const (
  	// PLUGIN_RUNTIME_TYPE_LOCAL is the "local" runtime type.
  	PLUGIN_RUNTIME_TYPE_LOCAL PluginRuntimeType = "local"
  	// PLUGIN_RUNTIME_TYPE_REMOTE is the "remote" runtime type.
  	PLUGIN_RUNTIME_TYPE_REMOTE PluginRuntimeType = "remote"
  	// PLUGIN_RUNTIME_TYPE_AWS is the "aws" runtime type.
  	PLUGIN_RUNTIME_TYPE_AWS PluginRuntimeType = "aws"
  )
  // PluginRuntimeState is the serializable runtime state of a plugin.
  type PluginRuntimeState struct {
  	// Restarts counts restarts; AddRestarts increments it.
  	Restarts int `json:"restarts"`
  	// Status holds one of the PLUGIN_RUNTIME_STATUS_* values.
  	Status string `json:"status"`
  	// AbsolutePath and WorkingPath are not written anywhere in this file.
  	// NOTE(review): presumably filesystem locations of the plugin; confirm
  	// against the runtime implementations.
  	AbsolutePath string `json:"absolute_path"`
  	WorkingPath  string `json:"working_path"`
  	// ActiveAt is the time recorded by SetActiveAt; nil until then.
  	ActiveAt *time.Time `json:"active_at"`
  	// StoppedAt is reset to nil by InitState and not set elsewhere in this file.
  	StoppedAt *time.Time `json:"stopped_at"`
  	// Verified is reset to false by InitState and not set elsewhere in this file.
  	Verified bool `json:"verified"`
  	// ScheduledAt is the time recorded by SetScheduledAt / UpdateScheduledAt.
  	ScheduledAt *time.Time `json:"scheduled_at"`
  	// Logs accumulates the entries appended by Log, Warn and Error.
  	Logs []string `json:"logs"`
  }

  // Hash returns the 64-bit FNV-1a hash of the gob encoding of the state. It
  // returns 0 and the underlying error if encoding or hashing fails.
  func (s *PluginRuntimeState) Hash() (uint64, error) {
  	var encoded bytes.Buffer
  	if err := gob.NewEncoder(&encoded).Encode(s); err != nil {
  		return 0, err
  	}

  	hasher := fnv.New64a()
  	if _, err := hasher.Write(encoded.Bytes()); err != nil {
  		return 0, err
  	}

  	return hasher.Sum64(), nil
  }
  // Values stored in PluginRuntimeState.Status.
  const (
  	// PLUGIN_RUNTIME_STATUS_ACTIVE is the status set by SetActive.
  	PLUGIN_RUNTIME_STATUS_ACTIVE = "active"
  	// PLUGIN_RUNTIME_STATUS_LAUNCHING is the status set by SetLaunching.
  	PLUGIN_RUNTIME_STATUS_LAUNCHING = "launching"
  	// PLUGIN_RUNTIME_STATUS_STOPPED is the status set by Stop and tested by Stopped.
  	PLUGIN_RUNTIME_STATUS_STOPPED = "stopped"
  	// PLUGIN_RUNTIME_STATUS_RESTARTING is the status set by SetRestarting.
  	PLUGIN_RUNTIME_STATUS_RESTARTING = "restarting"
  	// PLUGIN_RUNTIME_STATUS_PENDING is the status set by SetPending and InitState.
  	PLUGIN_RUNTIME_STATUS_PENDING = "pending"
  )