run.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package remote_manager
  2. import (
  3. "time"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/plugin_errors"
  5. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  8. )
  9. func (r *RemotePluginRuntime) InitEnvironment() error {
  10. return nil
  11. }
  12. func (r *RemotePluginRuntime) Stopped() bool {
  13. return !r.alive
  14. }
  15. func (r *RemotePluginRuntime) Stop() {
  16. r.alive = false
  17. if r.conn == nil {
  18. return
  19. }
  20. r.conn.Close()
  21. }
  22. func (r *RemotePluginRuntime) Type() plugin_entities.PluginRuntimeType {
  23. return plugin_entities.PLUGIN_RUNTIME_TYPE_REMOTE
  24. }
  25. func (r *RemotePluginRuntime) StartPlugin() error {
  26. var exitError error
  27. // handle heartbeat
  28. routine.Submit(func() {
  29. r.lastActiveAt = time.Now()
  30. ticker := time.NewTicker(5 * time.Second)
  31. defer ticker.Stop()
  32. for {
  33. select {
  34. case <-ticker.C:
  35. if time.Since(r.lastActiveAt) > 60*time.Second {
  36. // kill this connection if it's not active for a long time
  37. r.conn.Close()
  38. exitError = plugin_errors.ErrPluginNotActive
  39. return
  40. }
  41. case <-r.shutdownChan:
  42. return
  43. }
  44. }
  45. })
  46. r.response.Async(func(data []byte) {
  47. plugin_entities.ParsePluginUniversalEvent(
  48. data,
  49. func(session_id string, data []byte) {
  50. r.callbacksLock.RLock()
  51. listeners := r.callbacks[session_id][:]
  52. r.callbacksLock.RUnlock()
  53. // handle session event
  54. for _, listener := range listeners {
  55. listener(data)
  56. }
  57. },
  58. func() {
  59. r.lastActiveAt = time.Now()
  60. },
  61. func(err string) {
  62. log.Error("plugin %s: %s", r.Configuration().Identity(), err)
  63. },
  64. func(message string) {
  65. log.Info("plugin %s: %s", r.Configuration().Identity(), message)
  66. },
  67. )
  68. })
  69. return exitError
  70. }
  71. func (r *RemotePluginRuntime) Wait() (<-chan bool, error) {
  72. return r.shutdownChan, nil
  73. }
  74. func (r *RemotePluginRuntime) Checksum() (string, error) {
  75. return r.checksum, nil
  76. }