run.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package remote_manager
  2. import (
  3. "time"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/plugin_errors"
  5. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  9. )
  10. func (r *RemotePluginRuntime) InitEnvironment() error {
  11. return nil
  12. }
  13. func (r *RemotePluginRuntime) Stopped() bool {
  14. return !r.alive
  15. }
  16. func (r *RemotePluginRuntime) Stop() {
  17. r.alive = false
  18. if r.conn == nil {
  19. return
  20. }
  21. r.conn.Close()
  22. }
  23. func (r *RemotePluginRuntime) Type() plugin_entities.PluginRuntimeType {
  24. return plugin_entities.PLUGIN_RUNTIME_TYPE_REMOTE
  25. }
  26. func (r *RemotePluginRuntime) StartPlugin() error {
  27. var exit_error error
  28. // handle heartbeat
  29. routine.Submit(func() {
  30. r.last_active_at = time.Now()
  31. ticker := time.NewTicker(5 * time.Second)
  32. defer ticker.Stop()
  33. for {
  34. select {
  35. case <-ticker.C:
  36. if time.Since(r.last_active_at) > 20*time.Second {
  37. // kill this connection
  38. r.conn.Close()
  39. exit_error = plugin_errors.ErrPluginNotActive
  40. return
  41. }
  42. case <-r.shutdown_chan:
  43. return
  44. }
  45. }
  46. })
  47. r.response.Async(func(data []byte) {
  48. // handle event
  49. event, err := parser.UnmarshalJsonBytes[plugin_entities.PluginUniversalEvent](data)
  50. if err != nil {
  51. return
  52. }
  53. session_id := event.SessionId
  54. switch event.Event {
  55. case plugin_entities.PLUGIN_EVENT_LOG:
  56. if event.Event == plugin_entities.PLUGIN_EVENT_LOG {
  57. log_event, err := parser.UnmarshalJsonBytes[plugin_entities.PluginLogEvent](
  58. event.Data,
  59. )
  60. if err != nil {
  61. log.Error("unmarshal json failed: %s", err.Error())
  62. return
  63. }
  64. log.Info("plugin %s: %s", r.Configuration().Identity(), log_event.Message)
  65. }
  66. case plugin_entities.PLUGIN_EVENT_SESSION:
  67. r.callbacks_lock.RLock()
  68. listeners := r.callbacks[session_id][:]
  69. r.callbacks_lock.RUnlock()
  70. // handle session event
  71. for _, listener := range listeners {
  72. listener(event.Data)
  73. }
  74. case plugin_entities.PLUGIN_EVENT_ERROR:
  75. log.Error("plugin %s: %s", r.Configuration().Identity(), event.Data)
  76. case plugin_entities.PLUGIN_EVENT_HEARTBEAT:
  77. r.last_active_at = time.Now()
  78. }
  79. })
  80. return exit_error
  81. }
  82. func (r *RemotePluginRuntime) Wait() (<-chan bool, error) {
  83. return r.shutdown_chan, nil
  84. }
  85. func (r *RemotePluginRuntime) Checksum() (string, error) {
  86. return r.checksum, nil
  87. }