run.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package remote_manager
  2. import (
  3. "time"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/plugin_errors"
  5. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  9. )
  10. func (r *RemotePluginRuntime) InitEnvironment() error {
  11. return nil
  12. }
  13. func (r *RemotePluginRuntime) Stopped() bool {
  14. return !r.alive
  15. }
  16. func (r *RemotePluginRuntime) Stop() {
  17. r.alive = false
  18. if r.conn == nil {
  19. return
  20. }
  21. r.conn.Close()
  22. }
  23. func (r *RemotePluginRuntime) StartPlugin() error {
  24. var exit_error error
  25. // handle heartbeat
  26. routine.Submit(func() {
  27. r.last_active_at = time.Now()
  28. ticker := time.NewTicker(5 * time.Second)
  29. defer ticker.Stop()
  30. for {
  31. select {
  32. case <-ticker.C:
  33. if time.Since(r.last_active_at) > 20*time.Second {
  34. // kill this connection
  35. r.conn.Close()
  36. exit_error = plugin_errors.ErrPluginNotActive
  37. return
  38. }
  39. case <-r.shutdown_chan:
  40. return
  41. }
  42. }
  43. })
  44. for r.response.Next() {
  45. data, err := r.response.Read()
  46. if err != nil {
  47. return err
  48. }
  49. // handle event
  50. event, err := parser.UnmarshalJsonBytes[plugin_entities.PluginUniversalEvent](data)
  51. if err != nil {
  52. continue
  53. }
  54. session_id := event.SessionId
  55. switch event.Event {
  56. case plugin_entities.PLUGIN_EVENT_LOG:
  57. if event.Event == plugin_entities.PLUGIN_EVENT_LOG {
  58. log_event, err := parser.UnmarshalJsonBytes[plugin_entities.PluginLogEvent](
  59. event.Data,
  60. )
  61. if err != nil {
  62. log.Error("unmarshal json failed: %s", err.Error())
  63. continue
  64. }
  65. log.Info("plugin %s: %s", r.Configuration().Identity(), log_event.Message)
  66. }
  67. case plugin_entities.PLUGIN_EVENT_SESSION:
  68. r.callbacks_lock.RLock()
  69. listeners := r.callbacks[session_id][:]
  70. r.callbacks_lock.RUnlock()
  71. // handle session event
  72. for _, listener := range listeners {
  73. listener(event.Data)
  74. }
  75. case plugin_entities.PLUGIN_EVENT_ERROR:
  76. log.Error("plugin %s: %s", r.Configuration().Identity(), event.Data)
  77. case plugin_entities.PLUGIN_EVENT_HEARTBEAT:
  78. r.last_active_at = time.Now()
  79. }
  80. }
  81. return exit_error
  82. }
  83. func (r *RemotePluginRuntime) Wait() (<-chan bool, error) {
  84. return r.shutdown_chan, nil
  85. }