run.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package remote_manager
  2. import (
  3. "time"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/plugin_errors"
  5. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  10. )
  11. func (r *RemotePluginRuntime) InitEnvironment() error {
  12. return nil
  13. }
  14. func (r *RemotePluginRuntime) Stopped() bool {
  15. return !r.alive
  16. }
  17. func (r *RemotePluginRuntime) Stop() {
  18. r.alive = false
  19. if r.conn == nil {
  20. return
  21. }
  22. r.conn.Close()
  23. }
  24. func (r *RemotePluginRuntime) Type() entities.PluginRuntimeType {
  25. return entities.PLUGIN_RUNTIME_TYPE_REMOTE
  26. }
  27. func (r *RemotePluginRuntime) StartPlugin() error {
  28. var exit_error error
  29. // handle heartbeat
  30. routine.Submit(func() {
  31. r.last_active_at = time.Now()
  32. ticker := time.NewTicker(5 * time.Second)
  33. defer ticker.Stop()
  34. for {
  35. select {
  36. case <-ticker.C:
  37. if time.Since(r.last_active_at) > 20*time.Second {
  38. // kill this connection
  39. r.conn.Close()
  40. exit_error = plugin_errors.ErrPluginNotActive
  41. return
  42. }
  43. case <-r.shutdown_chan:
  44. return
  45. }
  46. }
  47. })
  48. r.response.Wrap(func(data []byte) {
  49. // handle event
  50. event, err := parser.UnmarshalJsonBytes[plugin_entities.PluginUniversalEvent](data)
  51. if err != nil {
  52. return
  53. }
  54. session_id := event.SessionId
  55. switch event.Event {
  56. case plugin_entities.PLUGIN_EVENT_LOG:
  57. if event.Event == plugin_entities.PLUGIN_EVENT_LOG {
  58. log_event, err := parser.UnmarshalJsonBytes[plugin_entities.PluginLogEvent](
  59. event.Data,
  60. )
  61. if err != nil {
  62. log.Error("unmarshal json failed: %s", err.Error())
  63. return
  64. }
  65. log.Info("plugin %s: %s", r.Configuration().Identity(), log_event.Message)
  66. }
  67. case plugin_entities.PLUGIN_EVENT_SESSION:
  68. r.callbacks_lock.RLock()
  69. listeners := r.callbacks[session_id][:]
  70. r.callbacks_lock.RUnlock()
  71. // handle session event
  72. for _, listener := range listeners {
  73. listener(event.Data)
  74. }
  75. case plugin_entities.PLUGIN_EVENT_ERROR:
  76. log.Error("plugin %s: %s", r.Configuration().Identity(), event.Data)
  77. case plugin_entities.PLUGIN_EVENT_HEARTBEAT:
  78. r.last_active_at = time.Now()
  79. }
  80. })
  81. return exit_error
  82. }
  83. func (r *RemotePluginRuntime) Wait() (<-chan bool, error) {
  84. return r.shutdown_chan, nil
  85. }
  86. func (r *RemotePluginRuntime) Checksum() string {
  87. return r.checksum
  88. }