session.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package session_manager
  2. import (
  3. "errors"
  4. "sync"
  5. "github.com/google/uuid"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  10. )
  11. var (
  12. sessions map[string]*Session = map[string]*Session{}
  13. session_lock sync.RWMutex
  14. )
  15. // session need to implement the backwards_invocation.BackwardsInvocationWriter interface
  16. type Session struct {
  17. id string
  18. runtime plugin_entities.PluginRuntimeInterface
  19. tenant_id string
  20. user_id string
  21. plugin_identity string
  22. cluster_id string
  23. }
  24. type SessionInfo struct {
  25. TenantID string `json:"tenant_id"`
  26. UserID string `json:"user_id"`
  27. PluginIdentity string `json:"plugin_identity"`
  28. ClusterID string `json:"cluster_id"`
  29. }
  30. const (
  31. SESSION_INFO_MAP_KEY = "session_info"
  32. )
  33. func NewSession(tenant_id string, user_id string, plugin_identity string, cluster_id string) *Session {
  34. s := &Session{
  35. id: uuid.New().String(),
  36. tenant_id: tenant_id,
  37. user_id: user_id,
  38. plugin_identity: plugin_identity,
  39. cluster_id: cluster_id,
  40. }
  41. session_lock.Lock()
  42. sessions[s.id] = s
  43. session_lock.Unlock()
  44. session_info := &SessionInfo{
  45. TenantID: tenant_id,
  46. UserID: user_id,
  47. PluginIdentity: plugin_identity,
  48. ClusterID: cluster_id,
  49. }
  50. if err := cache.SetMapOneField(SESSION_INFO_MAP_KEY, s.id, session_info); err != nil {
  51. log.Error("set session info to cache failed, %s", err)
  52. }
  53. return s
  54. }
  55. func GetSession(id string) *Session {
  56. session_lock.RLock()
  57. defer session_lock.RUnlock()
  58. return sessions[id]
  59. }
  60. func DeleteSession(id string) {
  61. session_lock.Lock()
  62. delete(sessions, id)
  63. session_lock.Unlock()
  64. if err := cache.DelMapField(SESSION_INFO_MAP_KEY, id); err != nil {
  65. log.Error("delete session info from cache failed, %s", err)
  66. }
  67. }
  68. func (s *Session) Close() {
  69. session_lock.Lock()
  70. delete(sessions, s.id)
  71. session_lock.Unlock()
  72. if err := cache.DelMapField(SESSION_INFO_MAP_KEY, s.id); err != nil {
  73. log.Error("delete session info from cache failed, %s", err)
  74. }
  75. }
  76. func (s *Session) ID() string {
  77. return s.id
  78. }
  79. func (s *Session) TenantID() string {
  80. return s.tenant_id
  81. }
  82. func (s *Session) UserID() string {
  83. return s.user_id
  84. }
  85. func (s *Session) PluginIdentity() string {
  86. return s.plugin_identity
  87. }
  88. func (s *Session) BindRuntime(runtime plugin_entities.PluginRuntimeInterface) {
  89. s.runtime = runtime
  90. }
  91. func (s *Session) Runtime() plugin_entities.PluginRuntimeInterface {
  92. return s.runtime
  93. }
  94. type PLUGIN_IN_STREAM_EVENT string
  95. const (
  96. PLUGIN_IN_STREAM_EVENT_REQUEST PLUGIN_IN_STREAM_EVENT = "request"
  97. PLUGIN_IN_STREAM_EVENT_RESPONSE PLUGIN_IN_STREAM_EVENT = "backwards_response"
  98. )
  99. func (s *Session) Message(event PLUGIN_IN_STREAM_EVENT, data any) []byte {
  100. return parser.MarshalJsonBytes(map[string]any{
  101. "session_id": s.id,
  102. "event": event,
  103. "data": data,
  104. })
  105. }
  106. func (s *Session) Write(event PLUGIN_IN_STREAM_EVENT, data any) error {
  107. if s.runtime == nil {
  108. return errors.New("runtime not bound")
  109. }
  110. s.runtime.Write(s.id, s.Message(event, data))
  111. return nil
  112. }