run.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package debugging_runtime
  2. import (
  3. "time"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/plugin_errors"
  5. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  7. "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
  8. )
  9. func (r *RemotePluginRuntime) InitEnvironment() error {
  10. return nil
  11. }
  12. func (r *RemotePluginRuntime) Stopped() bool {
  13. return !r.alive
  14. }
  15. func (r *RemotePluginRuntime) Stop() {
  16. r.alive = false
  17. if r.conn == nil {
  18. return
  19. }
  20. r.conn.Close()
  21. }
  22. func (r *RemotePluginRuntime) Type() plugin_entities.PluginRuntimeType {
  23. return plugin_entities.PLUGIN_RUNTIME_TYPE_REMOTE
  24. }
  25. func (r *RemotePluginRuntime) StartPlugin() error {
  26. var exitError error
  27. identity, err := r.Identity()
  28. if err != nil {
  29. return err
  30. }
  31. // handle heartbeat
  32. routine.Submit(map[string]string{
  33. "module": "debugging_runtime",
  34. "function": "StartPlugin",
  35. "plugin_id": identity.String(),
  36. }, func() {
  37. r.lastActiveAt = time.Now()
  38. ticker := time.NewTicker(5 * time.Second)
  39. defer ticker.Stop()
  40. for {
  41. select {
  42. case <-ticker.C:
  43. if time.Since(r.lastActiveAt) > 60*time.Second {
  44. // kill this connection if it's not active for a long time
  45. r.conn.Close()
  46. exitError = plugin_errors.ErrPluginNotActive
  47. return
  48. }
  49. case <-r.shutdownChan:
  50. return
  51. }
  52. }
  53. })
  54. r.response.Async(func(data []byte) {
  55. plugin_entities.ParsePluginUniversalEvent(
  56. data,
  57. "",
  58. func(session_id string, data []byte) {
  59. r.messageCallbacksLock.RLock()
  60. listeners := r.messageCallbacks[session_id][:]
  61. r.messageCallbacksLock.RUnlock()
  62. // handle session event
  63. for _, listener := range listeners {
  64. listener(data)
  65. }
  66. },
  67. func() {
  68. r.lastActiveAt = time.Now()
  69. },
  70. func(err string) {
  71. log.Error("plugin %s: %s", r.Configuration().Identity(), err)
  72. },
  73. func(message string) {
  74. log.Info("plugin %s: %s", r.Configuration().Identity(), message)
  75. },
  76. )
  77. })
  78. return exitError
  79. }
  80. func (r *RemotePluginRuntime) Wait() (<-chan bool, error) {
  81. return r.shutdownChan, nil
  82. }
  83. func (r *RemotePluginRuntime) Checksum() (string, error) {
  84. return r.checksum, nil
  85. }