session.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package session_manager
  2. import (
  3. "errors"
  4. "sync"
  5. "github.com/google/uuid"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  8. )
  9. var (
  10. sessions map[string]*Session = map[string]*Session{}
  11. session_lock sync.RWMutex
  12. )
  13. type Session struct {
  14. id string
  15. runtime entities.PluginRuntimeSessionIOInterface
  16. tenant_id string
  17. user_id string
  18. plugin_identity string
  19. }
  20. func NewSession(tenant_id string, user_id string, plugin_identity string) *Session {
  21. s := &Session{
  22. id: uuid.New().String(),
  23. tenant_id: tenant_id,
  24. user_id: user_id,
  25. plugin_identity: plugin_identity,
  26. }
  27. session_lock.Lock()
  28. defer session_lock.Unlock()
  29. sessions[s.id] = s
  30. return s
  31. }
  32. func GetSession(id string) *Session {
  33. session_lock.RLock()
  34. defer session_lock.RUnlock()
  35. return sessions[id]
  36. }
  37. func DeleteSession(id string) {
  38. session_lock.Lock()
  39. defer session_lock.Unlock()
  40. delete(sessions, id)
  41. }
  42. func (s *Session) Close() {
  43. session_lock.Lock()
  44. defer session_lock.Unlock()
  45. delete(sessions, s.id)
  46. }
  47. func (s *Session) ID() string {
  48. return s.id
  49. }
  50. func (s *Session) TenantID() string {
  51. return s.tenant_id
  52. }
  53. func (s *Session) UserID() string {
  54. return s.user_id
  55. }
  56. func (s *Session) PluginIdentity() string {
  57. return s.plugin_identity
  58. }
  59. func (s *Session) BindRuntime(runtime entities.PluginRuntimeSessionIOInterface) {
  60. s.runtime = runtime
  61. }
  62. type PLUGIN_IN_STREAM_EVENT string
  63. const (
  64. PLUGIN_IN_STREAM_EVENT_REQUEST PLUGIN_IN_STREAM_EVENT = "request"
  65. PLUGIN_IN_STREAM_EVENT_RESPONSE PLUGIN_IN_STREAM_EVENT = "backwards_response"
  66. )
  67. func (s *Session) Write(event PLUGIN_IN_STREAM_EVENT, data any) error {
  68. if s.runtime == nil {
  69. return errors.New("runtime not bound")
  70. }
  71. s.runtime.Write(s.id, parser.MarshalJsonBytes(map[string]any{
  72. "session_id": s.id,
  73. "event": event,
  74. "data": data,
  75. }))
  76. return nil
  77. }