Browse Source

[fix] cache using sync.Map

dev v0.1.16
eldar.sahipov 3 months ago
parent
commit
4055df04cf
  1. 20
      README.md
  2. 94
      cache.go
  3. 158
      cache_test.go

20
README.md

@ -31,4 +31,22 @@ value, err = cache.Cache().Get(key)
if err != nil {
err = fmt.Errorf("get value is fail. err: %s", err)
}
```
```
## v0.1.16 dev
Переписан пакет с использованием sync.Map для избежания паник
Результаты сравнения тестов Map и sync.Map в контексте данного пакета
## Результаты тестирования производительности
В таблице ниже представлены результаты тестирования производительности с использованием `Map` и `sync.Map`:
| Тест | Время (ns/op) с Map | Время (ns/op) с sync.Map | Время, быстрее в раз | B/op с Map | B/op с sync.Map | Сокращение B/op | allocs/op с Map | allocs/op с sync.Map | Сокращение allocs/op |
|------------------------------------------|---------------------|--------------------------|----------------------|------------|-----------------|------------------|-----------------|----------------------|-----------------------|
| BenchmarkCache_ConcurrentUpsertAndGet | 8,298,061,621 | 509,060,644 | 16.3 | 400,676,440| 212,454,344 | 47% | 4,802,734 | 2,904,396 | 39.5% |
| BenchmarkCache_ConcurrentUpsert | 80,735 | 10,448 | 7.7 | 3,796 | 4,172 | -9.9% | 43 | 54 | -25.6% |
| BenchmarkCache_ConcurrentGet2 | 607.5 | 605.9 | 1.002 | 15 | 15 | 0% | 1 | 1 | 0% |
| BenchmarkCache_ProductionLike | 94,983 | 63,739 | 1.49 | 917 | 1,001 | -9.2% | 25 | 33 | -32% |
Использование `sync.Map` приводит к значительному увеличению производительности по сравнению с использованием обычного `Map` в большинстве тестов. Особенно это заметно в тесте `BenchmarkCache_ConcurrentUpsertAndGet`, где время выполнения уменьшилось в 16.3 раза. Наблюдается сокращение использования памяти (B/op) и количество аллокаций (allocs/op) в некоторых тестах, хотя в одном случае количество аллокаций увеличилось.

94
cache.go

@ -27,10 +27,9 @@ var (
type cache struct {
ctx context.Context
items map[string]*cacheItem
mx *sync.RWMutex
expiredInterval time.Duration // интервал, через который GС удалит запись
runGCInterval time.Duration // интервал запуска GC
items sync.Map
expiredInterval time.Duration // Интервал, через который GС удалит запись
runGCInterval time.Duration // Интервал запуска GC
}
type cacheItem struct {
@ -39,8 +38,8 @@ type cacheItem struct {
cache *ttlcache.Cache
persistentCache *ttlcache.Cache
locks locks
refreshInterval time.Duration // интервал обновления кеша
expiredTime time.Time // время протухания кеша (когда GС удалит запись)
refreshInterval time.Duration // Интервал обновления кеша
expiredTime time.Time // Время протухания кеша (когда GС удалит запись)
}
func Cache() *cache {
@ -51,17 +50,11 @@ func Cache() *cache {
return &cacheCollection
}
// Upsert регистрируем новый/обновляем кеш (указываем фукнцию, кр будет возвращать нужное значение)
// Upsert регистрируем новый/обновляем кеш (указываем функцию, кр будет возвращать нужное значение)
func (c *cache) Upsert(key string, source func() (res interface{}, err error), refreshInterval time.Duration) (result interface{}, err error) {
c.mx.Lock()
defer c.mx.Unlock()
if c.items == nil {
c.items = map[string]*cacheItem{}
}
item, found := c.items[key]
if found {
actual, loaded := c.items.Load(key)
if loaded {
item := actual.(*cacheItem)
item.reader = source
item.refreshInterval = refreshInterval
result, err = c.updateCacheValue(key, item)
@ -79,43 +72,35 @@ func (c *cache) Upsert(key string, source func() (res interface{}, err error), r
cache: cache,
persistentCache: ttlcache.NewCache(),
locks: locks{
keys: map[string]bool{},
mx: &sync.RWMutex{},
keys: sync.Map{},
},
reader: source,
refreshInterval: refreshInterval,
expiredTime: expiredTime,
}
c.items[key] = &ci
c.items.Store(key, &ci)
result, err = c.updateCacheValue(key, &ci)
return result, err
}
// Delete удаляем значение из кеша
func (c *cache) Delete(key string) (err error) {
c.mx.Lock()
defer c.mx.Unlock()
delete(c.items, key)
return err
func (c *cache) Delete(key string) {
c.items.Delete(key)
}
// Get возвращает текущее значение параметра в сервисе keeper.
// Нужно учитывать, что значения на время кешируются и обновляются с заданной периодичностью.
func (c *cache) Get(key string) (value interface{}, err error) {
var item *cacheItem
var found bool
c.mx.RLock()
item, found = c.items[key]
c.mx.RUnlock()
if !found {
actual, loaded := c.items.Load(key)
if !loaded {
return nil, ErrorKeyNotFound
}
item, _ = actual.(*cacheItem)
if item.cache == nil {
return nil, ErrorCacheNotInit
}
@ -164,16 +149,14 @@ func (c *cache) updateCacheValue(key string, item *cacheItem) (result interface{
// tryToGetOldValue пытается получить старое значение, если в момент запроса на актуальном стоит блокировка.
func (c *cache) tryToGetOldValue(key string) (interface{}, error) {
var item *cacheItem
var found bool
c.mx.RLock()
item, found = c.items[key]
c.mx.RUnlock()
if !found {
actual, loaded := c.items.Load(key)
if !loaded {
return nil, fmt.Errorf("error. key is not found")
}
item = actual.(*cacheItem)
fnGetPersistentCacheValue := func() (interface{}, error) {
if cachedValue, ok := item.persistentCache.Get(key); ok {
return cachedValue, nil
@ -200,8 +183,7 @@ func (c *cache) tryToGetOldValue(key string) (interface{}, error) {
func Init(ctx context.Context, expiredInterval, runGCInterval time.Duration) {
d := cache{
ctx: ctx,
items: map[string]*cacheItem{},
mx: &sync.RWMutex{},
items: sync.Map{},
runGCInterval: runGCInterval,
expiredInterval: expiredInterval,
}
@ -214,23 +196,22 @@ func Init(ctx context.Context, expiredInterval, runGCInterval time.Duration) {
type locks struct {
// keys хранит информацию о локах по каждому отдельному ключу.
// Если значение установлено в true, в данный момент обновление кеша захвачено одной из горутин.
keys map[string]bool
mx *sync.RWMutex
keys sync.Map
}
// Get возвращает информацию о том идет ли в данный момент обновление конкретного ключа.
func (l *locks) Get(key string) bool {
l.mx.RLock()
defer l.mx.RUnlock()
value, ok := l.keys.Load(key)
if !ok {
return false
}
return l.keys[key]
return value.(bool)
}
// Set устанавливает блокировку на обновление конкретного ключа другими горутинами.
func (l *locks) Set(key string, value bool) {
l.mx.Lock()
l.keys[key] = value
l.mx.Unlock()
l.keys.Store(key, value)
}
// Balancer запускаем балансировщик реплик для сервисов
@ -262,18 +243,17 @@ func (c *cache) gc() {
}
func (c *cache) cleaner() (err error) {
c.mx.Lock()
defer c.mx.Unlock()
c.items.Range(func(key, value any) bool {
item, ok := value.(*cacheItem)
if !ok {
return true
}
// удаляем значение ключа из хранилища (чтобы не копились старые хеши)
for key, item := range c.items {
if item.expiredTime != time.UnixMicro(0) && !item.expiredTime.After(time.Now()) {
err = c.Delete(key)
if err != nil {
return err
}
c.items.Delete(key)
}
}
return err
return true
})
return nil
}

158
cache_test.go

@ -0,0 +1,158 @@
package cache
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"testing"
"time"
)
func BenchmarkCache_ConcurrentUpsertAndGet(b *testing.B) {
ctx := context.Background()
expiredInterval := 10 * time.Minute // Произвольно выбранный интервал истечения срока действия
runGCInterval := 1 * time.Minute // Произвольно выбранный интервал запуска сборщика мусора
// Инициализация кэша
Init(ctx, expiredInterval, runGCInterval)
c := Cache()
var wg sync.WaitGroup
// Количество операций для каждого типа (запись и чтение)
operationsCount := 100000
b.ResetTimer()
// Запуск горутин для записи
for i := 0; i < operationsCount; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
key := fmt.Sprintf("key%d", i)
_, err := c.Upsert(key, func() (interface{}, error) {
return fmt.Sprintf("value%d", i), nil
}, 5*time.Minute) // Используем 5 минут как интервал обновления
if err != nil {
b.Errorf("Failed to upsert item: %s", err)
}
}(i)
}
// Запуск горутин для чтения
for i := 0; i < operationsCount; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
key := fmt.Sprintf("key%d", i)
_, err := c.Get(key)
if err != nil && !errors.Is(err, ErrorKeyNotFound) {
b.Errorf("Failed to get item: %s", err)
}
}(i)
}
wg.Wait() // Ожидание завершения всех горутин
}
// BenchmarkCache_ConcurrentUpsert бенчмарк для многопоточной операции Upsert.
func BenchmarkCache_ConcurrentUpsert(b *testing.B) {
ctx := context.Background()
Init(ctx, 10*time.Minute, 1*time.Minute)
c := Cache()
b.ResetTimer()
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
_, err := c.Upsert(fmt.Sprintf("key%d", i), func() (interface{}, error) {
return fmt.Sprintf("value%d", i), nil
}, 5*time.Minute)
if err != nil {
b.Errorf("Failed to upsert item: %s", err)
}
}(i)
}
wg.Wait()
}
func BenchmarkCache_ConcurrentGet2(b *testing.B) {
ctx := context.Background()
Init(ctx, 10*time.Minute, 1*time.Minute)
c := Cache()
// Предварительно заполняем кэш данными для чтения
for i := 0; i < 100000; i++ {
key := fmt.Sprintf("key%d", i)
c.Upsert(key, func() (interface{}, error) {
return fmt.Sprintf("value%d", i), nil
}, 5*time.Minute)
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := c.Get(fmt.Sprintf("key%d", rand.Intn(10000)))
if err != nil {
b.Errorf("Failed to get item: %s", err)
}
}
})
}
func BenchmarkCache_ProductionLike(b *testing.B) {
ctx := context.Background()
expiredInterval := 1 * time.Minute // Более короткий интервал истечения для тестирования GC
runGCInterval := 10 * time.Second // Частый запуск GC для имитации работы
Init(ctx, expiredInterval, runGCInterval)
c := Cache()
b.ResetTimer()
for i := 0; i < b.N; i++ {
var wg sync.WaitGroup
// Чтение данных
wg.Add(1)
go func() {
defer wg.Done()
key := fmt.Sprintf("key%d", rand.Intn(1000))
_, _ = c.Get(key)
}()
// Запись данных
wg.Add(1)
go func(i int) {
defer wg.Done()
key := fmt.Sprintf("key%d", i%1000) // Ограничение количества уникальных ключей для имитации повторных записей
_, _ = c.Upsert(key, func() (interface{}, error) {
return fmt.Sprintf("value%d", i), nil
}, time.Duration(rand.Intn(5)+1)*time.Minute) // Рандомный интервал обновления
}(i)
// Обновление данных
wg.Add(1)
go func(i int) {
defer wg.Done()
key := fmt.Sprintf("key%d", rand.Intn(1000))
_, _ = c.Upsert(key, func() (interface{}, error) {
return fmt.Sprintf("newValue%d", i), nil
}, time.Duration(rand.Intn(5)+1)*time.Minute)
}(i)
// Удаление данных
if i%10 == 0 { // Не на каждой итерации, чтобы не очищать все сразу
wg.Add(1)
go func(i int) {
defer wg.Done()
key := fmt.Sprintf("key%d", rand.Intn(1000))
c.Delete(key)
}(i)
}
wg.Wait()
}
}
Loading…
Cancel
Save