session.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package session_manager
  2. import (
  3. "errors"
  4. "sync"
  5. "github.com/google/uuid"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  8. )
  9. var (
  10. sessions map[string]*Session = map[string]*Session{}
  11. session_lock sync.RWMutex
  12. )
  13. // session need to implement the backwards_invocation.BackwardsInvocationWriter interface
  14. type Session struct {
  15. id string
  16. runtime plugin_entities.PluginRuntimeSessionIOInterface
  17. tenant_id string
  18. user_id string
  19. plugin_identity string
  20. }
  21. func NewSession(tenant_id string, user_id string, plugin_identity string) *Session {
  22. s := &Session{
  23. id: uuid.New().String(),
  24. tenant_id: tenant_id,
  25. user_id: user_id,
  26. plugin_identity: plugin_identity,
  27. }
  28. session_lock.Lock()
  29. defer session_lock.Unlock()
  30. sessions[s.id] = s
  31. return s
  32. }
  33. func GetSession(id string) *Session {
  34. session_lock.RLock()
  35. defer session_lock.RUnlock()
  36. return sessions[id]
  37. }
  38. func DeleteSession(id string) {
  39. session_lock.Lock()
  40. defer session_lock.Unlock()
  41. delete(sessions, id)
  42. }
  43. func (s *Session) Close() {
  44. session_lock.Lock()
  45. defer session_lock.Unlock()
  46. delete(sessions, s.id)
  47. }
  48. func (s *Session) ID() string {
  49. return s.id
  50. }
  51. func (s *Session) TenantID() string {
  52. return s.tenant_id
  53. }
  54. func (s *Session) UserID() string {
  55. return s.user_id
  56. }
  57. func (s *Session) PluginIdentity() string {
  58. return s.plugin_identity
  59. }
  60. func (s *Session) BindRuntime(runtime plugin_entities.PluginRuntimeSessionIOInterface) {
  61. s.runtime = runtime
  62. }
  63. type PLUGIN_IN_STREAM_EVENT string
  64. const (
  65. PLUGIN_IN_STREAM_EVENT_REQUEST PLUGIN_IN_STREAM_EVENT = "request"
  66. PLUGIN_IN_STREAM_EVENT_RESPONSE PLUGIN_IN_STREAM_EVENT = "backwards_response"
  67. )
  68. func (s *Session) Message(event PLUGIN_IN_STREAM_EVENT, data any) []byte {
  69. return parser.MarshalJsonBytes(map[string]any{
  70. "session_id": s.id,
  71. "event": event,
  72. "data": data,
  73. })
  74. }
  75. func (s *Session) Write(event PLUGIN_IN_STREAM_EVENT, data any) error {
  76. if s.runtime == nil {
  77. return errors.New("runtime not bound")
  78. }
  79. s.runtime.Write(s.id, s.Message(event, data))
  80. return nil
  81. }