runtime.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package entities
  2. import (
  3. "bytes"
  4. "crypto/sha256"
  5. "encoding/binary"
  6. "encoding/gob"
  7. "encoding/hex"
  8. "hash/fnv"
  9. "time"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  12. )
  13. type (
  14. PluginRuntime struct {
  15. State PluginRuntimeState `json:"state"`
  16. Config plugin_entities.PluginDeclaration `json:"config"`
  17. onStopped []func() `json:"-"`
  18. }
  19. PluginRuntimeInterface interface {
  20. PluginRuntimeTimeLifeInterface
  21. PluginRuntimeSessionIOInterface
  22. }
  23. PluginRuntimeTimeLifeInterface interface {
  24. // returns the plugin configuration
  25. Configuration() *plugin_entities.PluginDeclaration
  26. // unique identity of the plugin
  27. Identity() (string, error)
  28. // hashed identity of the plugin
  29. HashedIdentity() (string, error)
  30. // before the plugin starts, it will call this method to initialize the environment
  31. InitEnvironment() error
  32. // start the plugin, returns errors if the plugin fails to start and hangs until the plugin stops
  33. StartPlugin() error
  34. // returns true if the plugin is stopped
  35. Stopped() bool
  36. // stop the plugin
  37. Stop()
  38. // add a function to be called when the plugin stops
  39. OnStop(func())
  40. // trigger the stop event
  41. TriggerStop()
  42. // returns the runtime state of the plugin
  43. RuntimeState() PluginRuntimeState
  44. // Update the runtime state of the plugin
  45. UpdateScheduledAt(t time.Time)
  46. // returns the checksum of the plugin
  47. Checksum() string
  48. // wait for the plugin to stop
  49. Wait() (<-chan bool, error)
  50. // returns the runtime type of the plugin
  51. Type() PluginRuntimeType
  52. // set the plugin to active
  53. SetActive()
  54. // set the plugin to launching
  55. SetLaunching()
  56. // set the plugin to restarting
  57. SetRestarting()
  58. // set the plugin to pending
  59. SetPending()
  60. // set the active time of the plugin
  61. SetActiveAt(t time.Time)
  62. // set the scheduled time of the plugin
  63. SetScheduledAt(t time.Time)
  64. // add restarts to the plugin
  65. AddRestarts()
  66. }
  67. PluginRuntimeSessionIOInterface interface {
  68. Listen(session_id string) *BytesIOListener
  69. Write(session_id string, data []byte)
  70. }
  71. )
  72. func (r *PluginRuntime) Stopped() bool {
  73. return r.State.Status == PLUGIN_RUNTIME_STATUS_STOPPED
  74. }
  75. func (r *PluginRuntime) Stop() {
  76. r.State.Status = PLUGIN_RUNTIME_STATUS_STOPPED
  77. }
  78. func (r *PluginRuntime) Configuration() *plugin_entities.PluginDeclaration {
  79. return &r.Config
  80. }
  81. func (r *PluginRuntime) Identity() (string, error) {
  82. return r.Config.Identity(), nil
  83. }
  84. func HashedIdentity(identity string) string {
  85. hash := sha256.New()
  86. hash.Write([]byte(identity))
  87. return hex.EncodeToString(hash.Sum(nil))
  88. }
  89. func (r *PluginRuntime) HashedIdentity() (string, error) {
  90. return HashedIdentity(r.Config.Identity()), nil
  91. }
  92. func (r *PluginRuntime) RuntimeState() PluginRuntimeState {
  93. return r.State
  94. }
  95. func (r *PluginRuntime) UpdateScheduledAt(t time.Time) {
  96. r.State.ScheduledAt = &t
  97. }
  98. func (r *PluginRuntime) InitState() {
  99. r.State = PluginRuntimeState{
  100. Restarts: 0,
  101. Status: PLUGIN_RUNTIME_STATUS_PENDING,
  102. ActiveAt: nil,
  103. StoppedAt: nil,
  104. Verified: false,
  105. ScheduledAt: nil,
  106. Logs: []string{},
  107. }
  108. }
  109. func (r *PluginRuntime) SetActive() {
  110. r.State.Status = PLUGIN_RUNTIME_STATUS_ACTIVE
  111. }
  112. func (r *PluginRuntime) SetLaunching() {
  113. r.State.Status = PLUGIN_RUNTIME_STATUS_LAUNCHING
  114. }
  115. func (r *PluginRuntime) SetRestarting() {
  116. r.State.Status = PLUGIN_RUNTIME_STATUS_RESTARTING
  117. }
  118. func (r *PluginRuntime) SetPending() {
  119. r.State.Status = PLUGIN_RUNTIME_STATUS_PENDING
  120. }
  121. func (r *PluginRuntime) SetActiveAt(t time.Time) {
  122. r.State.ActiveAt = &t
  123. }
  124. func (r *PluginRuntime) SetScheduledAt(t time.Time) {
  125. r.State.ScheduledAt = &t
  126. }
  127. func (r *PluginRuntime) AddRestarts() {
  128. r.State.Restarts++
  129. }
  130. func (r *PluginRuntime) Checksum() string {
  131. buf := bytes.Buffer{}
  132. binary.Write(&buf, binary.BigEndian, parser.MarshalJsonBytes(r.Config))
  133. hash := sha256.New()
  134. hash.Write(buf.Bytes())
  135. return hex.EncodeToString(hash.Sum(nil))
  136. }
  137. func (r *PluginRuntime) OnStop(f func()) {
  138. r.onStopped = append(r.onStopped, f)
  139. }
  140. func (r *PluginRuntime) TriggerStop() {
  141. for _, f := range r.onStopped {
  142. f()
  143. }
  144. }
  145. type PluginRuntimeType string
  146. const (
  147. PLUGIN_RUNTIME_TYPE_LOCAL PluginRuntimeType = "local"
  148. PLUGIN_RUNTIME_TYPE_REMOTE PluginRuntimeType = "remote"
  149. PLUGIN_RUNTIME_TYPE_AWS PluginRuntimeType = "aws"
  150. )
  151. type PluginRuntimeState struct {
  152. Restarts int `json:"restarts"`
  153. Status string `json:"status"`
  154. RelativePath string `json:"relative_path"`
  155. ActiveAt *time.Time `json:"active_at"`
  156. StoppedAt *time.Time `json:"stopped_at"`
  157. Verified bool `json:"verified"`
  158. ScheduledAt *time.Time `json:"scheduled_at"`
  159. Logs []string `json:"logs"`
  160. }
  161. func (s *PluginRuntimeState) Hash() (uint64, error) {
  162. buf := bytes.Buffer{}
  163. enc := gob.NewEncoder(&buf)
  164. err := enc.Encode(s)
  165. if err != nil {
  166. return 0, err
  167. }
  168. j := fnv.New64a()
  169. _, err = j.Write(buf.Bytes())
  170. if err != nil {
  171. return 0, err
  172. }
  173. return j.Sum64(), nil
  174. }
  175. const (
  176. PLUGIN_RUNTIME_STATUS_ACTIVE = "active"
  177. PLUGIN_RUNTIME_STATUS_LAUNCHING = "launching"
  178. PLUGIN_RUNTIME_STATUS_STOPPED = "stopped"
  179. PLUGIN_RUNTIME_STATUS_RESTARTING = "restarting"
  180. PLUGIN_RUNTIME_STATUS_PENDING = "pending"
  181. )