run.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package remote_manager
  2. import (
  3. "time"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/plugin_errors"
  5. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  10. )
  11. func (r *RemotePluginRuntime) InitEnvironment() error {
  12. return nil
  13. }
  14. func (r *RemotePluginRuntime) Stopped() bool {
  15. return !r.alive
  16. }
  17. func (r *RemotePluginRuntime) Stop() {
  18. r.alive = false
  19. if r.conn == nil {
  20. return
  21. }
  22. r.conn.Close()
  23. }
  24. func (r *RemotePluginRuntime) Type() entities.PluginRuntimeType {
  25. return entities.PLUGIN_RUNTIME_TYPE_REMOTE
  26. }
  27. func (r *RemotePluginRuntime) StartPlugin() error {
  28. var exit_error error
  29. // handle heartbeat
  30. routine.Submit(func() {
  31. r.last_active_at = time.Now()
  32. ticker := time.NewTicker(5 * time.Second)
  33. defer ticker.Stop()
  34. for {
  35. select {
  36. case <-ticker.C:
  37. if time.Since(r.last_active_at) > 20*time.Second {
  38. // kill this connection
  39. r.conn.Close()
  40. exit_error = plugin_errors.ErrPluginNotActive
  41. return
  42. }
  43. case <-r.shutdown_chan:
  44. return
  45. }
  46. }
  47. })
  48. for r.response.Next() {
  49. data, err := r.response.Read()
  50. if err != nil {
  51. return err
  52. }
  53. // handle event
  54. event, err := parser.UnmarshalJsonBytes[plugin_entities.PluginUniversalEvent](data)
  55. if err != nil {
  56. continue
  57. }
  58. session_id := event.SessionId
  59. switch event.Event {
  60. case plugin_entities.PLUGIN_EVENT_LOG:
  61. if event.Event == plugin_entities.PLUGIN_EVENT_LOG {
  62. log_event, err := parser.UnmarshalJsonBytes[plugin_entities.PluginLogEvent](
  63. event.Data,
  64. )
  65. if err != nil {
  66. log.Error("unmarshal json failed: %s", err.Error())
  67. continue
  68. }
  69. log.Info("plugin %s: %s", r.Configuration().Identity(), log_event.Message)
  70. }
  71. case plugin_entities.PLUGIN_EVENT_SESSION:
  72. r.callbacks_lock.RLock()
  73. listeners := r.callbacks[session_id][:]
  74. r.callbacks_lock.RUnlock()
  75. // handle session event
  76. for _, listener := range listeners {
  77. listener(event.Data)
  78. }
  79. case plugin_entities.PLUGIN_EVENT_ERROR:
  80. log.Error("plugin %s: %s", r.Configuration().Identity(), event.Data)
  81. case plugin_entities.PLUGIN_EVENT_HEARTBEAT:
  82. r.last_active_at = time.Now()
  83. }
  84. }
  85. return exit_error
  86. }
  87. func (r *RemotePluginRuntime) Wait() (<-chan bool, error) {
  88. return r.shutdown_chan, nil
  89. }