combined.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package helper
  2. import (
  3. "errors"
  4. "strings"
  5. "sync"
  6. "time"
  7. "github.com/langgenius/dify-plugin-daemon/internal/db"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  10. "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
  11. )
  12. var (
  13. ErrPluginNotFound = errors.New("plugin not found")
  14. )
  15. type memCacheItem struct {
  16. declaration *plugin_entities.PluginDeclaration
  17. accessCount int64
  18. lastAccess time.Time
  19. }
  20. type memCache struct {
  21. sync.RWMutex
  22. items map[string]*memCacheItem
  23. itemSize int64
  24. }
  25. var (
  26. // 500MB memory cache
  27. maxMemCacheSize = int64(1024)
  28. // 600s TTL
  29. maxTTL = 600 * time.Second
  30. pluginCache = &memCache{
  31. items: make(map[string]*memCacheItem),
  32. itemSize: 0,
  33. }
  34. )
  35. func (c *memCache) get(key string) *plugin_entities.PluginDeclaration {
  36. c.RLock()
  37. item, exists := c.items[key]
  38. c.RUnlock()
  39. if !exists {
  40. return nil
  41. }
  42. // Check TTL with a read lock first
  43. if time.Since(item.lastAccess) > maxTTL {
  44. c.Lock()
  45. // Double check after acquiring write lock
  46. if item, exists = c.items[key]; exists {
  47. if time.Since(item.lastAccess) > maxTTL {
  48. c.itemSize--
  49. delete(c.items, key)
  50. }
  51. }
  52. c.Unlock()
  53. return nil
  54. }
  55. // Update access count and time atomically
  56. c.Lock()
  57. if item, exists = c.items[key]; exists {
  58. item.accessCount++
  59. item.lastAccess = time.Now()
  60. }
  61. c.Unlock()
  62. if exists {
  63. return item.declaration
  64. }
  65. return nil
  66. }
  67. func (c *memCache) set(key string, declaration *plugin_entities.PluginDeclaration) {
  68. c.Lock()
  69. defer c.Unlock()
  70. // Clean expired items first
  71. now := time.Now()
  72. for k, v := range c.items {
  73. if now.Sub(v.lastAccess) > maxTTL {
  74. c.itemSize--
  75. delete(c.items, k)
  76. }
  77. }
  78. // Remove least accessed items if cache is full
  79. for c.itemSize >= maxMemCacheSize {
  80. var leastKey string
  81. var leastCount int64 = -1
  82. var oldestAccess = time.Now()
  83. for k, v := range c.items {
  84. // Prioritize by access count, then by age
  85. if leastCount == -1 || v.accessCount < leastCount ||
  86. (v.accessCount == leastCount && v.lastAccess.Before(oldestAccess)) {
  87. leastCount = v.accessCount
  88. oldestAccess = v.lastAccess
  89. leastKey = k
  90. }
  91. }
  92. if leastKey != "" {
  93. c.itemSize--
  94. delete(c.items, leastKey)
  95. }
  96. }
  97. // Add new item
  98. c.items[key] = &memCacheItem{
  99. declaration: declaration,
  100. accessCount: 1,
  101. lastAccess: now,
  102. }
  103. c.itemSize++
  104. }
  105. func CombinedGetPluginDeclaration(
  106. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  107. runtimeType plugin_entities.PluginRuntimeType,
  108. ) (*plugin_entities.PluginDeclaration, error) {
  109. cacheKey := strings.Join(
  110. []string{
  111. "declaration_cache",
  112. string(runtimeType),
  113. pluginUniqueIdentifier.String(),
  114. },
  115. ":",
  116. )
  117. // Try memory cache first
  118. if declaration := pluginCache.get(cacheKey); declaration != nil {
  119. return declaration, nil
  120. }
  121. // Try Redis cache next
  122. declaration, err := cache.AutoGetWithGetter(
  123. cacheKey,
  124. func() (*plugin_entities.PluginDeclaration, error) {
  125. if runtimeType != plugin_entities.PLUGIN_RUNTIME_TYPE_REMOTE {
  126. declaration, err := db.GetOne[models.PluginDeclaration](
  127. db.Equal("plugin_unique_identifier", pluginUniqueIdentifier.String()),
  128. )
  129. if err == db.ErrDatabaseNotFound {
  130. return nil, ErrPluginNotFound
  131. }
  132. if err != nil {
  133. return nil, err
  134. }
  135. return &declaration.Declaration, nil
  136. } else {
  137. // try to fetch the declaration from plugin if it's remote
  138. plugin, err := db.GetOne[models.Plugin](
  139. db.Equal("plugin_unique_identifier", pluginUniqueIdentifier.String()),
  140. db.Equal("install_type", string(plugin_entities.PLUGIN_RUNTIME_TYPE_REMOTE)),
  141. )
  142. if err == db.ErrDatabaseNotFound {
  143. return nil, ErrPluginNotFound
  144. }
  145. if err != nil {
  146. return nil, err
  147. }
  148. return &plugin.RemoteDeclaration, nil
  149. }
  150. },
  151. )
  152. if err == nil {
  153. // Store successful result in memory cache
  154. pluginCache.set(cacheKey, declaration)
  155. }
  156. return declaration, err
  157. }