You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

286 lines
8.7 KiB

package cache
import (
"context"
"fmt"
"sync"
"time"
"git.lowcodeplatform.net/fabric/lib"
"git.lowcodeplatform.net/packages/logger"
"github.com/ReneKroon/ttlcache"
"go.uber.org/zap"
"github.com/pkg/errors"
)
const cacheKeyPrefix = "cache."
var ErrorCacheNotInit = errors.New("cache is not initialized")
var ErrorCachePersistentNotInit = errors.New("persistent cache is not initialized")
var ErrorKeyNotFound = errors.New("key is not found")
var ErrorValueNotCase = errors.New("failed to cast value")
var ErrorItemExpired = errors.New("item expired")
var (
cacheCollection cache
isCacheInit bool // Добавляем флаг инициализации
)
type cache struct {
ctx context.Context
items *sync.Map
expiredInterval time.Duration // Интервал, через который GС удалит запись
runGCInterval time.Duration // Интервал запуска GC
ttlcache *ttlcache.Cache
ttlPersistentCache *ttlcache.Cache
}
type cacheItem struct {
// Getter определяет механизм получения данных от любого источника к/р поддерживает интерфейс
reader func() (res interface{}, err error)
cache *ttlcache.Cache
persistentCache *ttlcache.Cache
locks locks
refreshInterval time.Duration // Интервал обновления кеша
expiredTime time.Time // Время протухания кеша (когда GС удалит запись)
}
// Init инициализировали глобальную переменную defaultCache
// expiredInterval - время жизни записи в кеше, после удаляется с GC (0 - не удаляется никогда)
func Init(ctx context.Context, expiredInterval, runGCInterval time.Duration) {
ttlcacheNonPer := ttlcache.NewCache()
ttlcacheNonPer.SkipTtlExtensionOnHit(true)
d := cache{
ctx: ctx,
items: &sync.Map{},
runGCInterval: runGCInterval,
expiredInterval: expiredInterval,
ttlcache: ttlcacheNonPer,
ttlPersistentCache: ttlcache.NewCache(),
}
cacheCollection = d
isCacheInit = true // Устанавливаем флаг при инициализации
go d.gc()
}
func Cache() *cache {
if !isCacheInit { // Используем флаг для проверки инициализации
return nil
}
return &cacheCollection
}
// Upsert регистрируем новый/обновляем кеш (указываем функцию, кр будет возвращать нужное значение)
func (c *cache) Upsert(key string, source func() (res interface{}, err error), refreshInterval time.Duration) (result interface{}, err error) {
actual, loaded := c.items.Load(key)
if loaded {
item, ok := actual.(*cacheItem)
if !ok {
return nil, ErrorValueNotCase
}
item.reader = source
item.refreshInterval = refreshInterval
result, err = c.updateCacheValue(key, item)
return result, err
}
expiredTime := time.Now().Add(c.expiredInterval)
if c.expiredInterval == 0 {
expiredTime = time.UnixMicro(0)
}
ci := cacheItem{
cache: c.ttlcache,
persistentCache: c.ttlPersistentCache,
locks: locks{
keys: sync.Map{},
},
reader: source,
refreshInterval: refreshInterval,
expiredTime: expiredTime,
}
c.items.Store(key, &ci)
result, err = c.updateCacheValue(key, &ci)
return result, err
}
// Delete удаляем значение из кеша
func (c *cache) Delete(key string) {
c.items.Delete(key)
}
// Get возвращает текущее значение параметра в сервисе keeper.
// Нужно учитывать, что значения на время кешируются и обновляются с заданной периодичностью.
func (c *cache) Get(key string) (value interface{}, err error) {
var item *cacheItem
actual, loaded := c.items.Load(key)
if !loaded {
return nil, ErrorKeyNotFound
}
item, ok := actual.(*cacheItem)
if !ok {
return nil, ErrorValueNotCase
}
if item.cache == nil {
return nil, ErrorCacheNotInit
}
if item.persistentCache == nil {
return nil, ErrorCachePersistentNotInit
}
if cachedValue, ok := item.cache.Get(key); ok {
return cachedValue, nil
}
// Если стоит блокировка, значит кто-то уже обновляет кеш. В этом случае
// пытаемся отдать предыдущее значение.
if item.locks.Get(key) {
return c.tryToGetOldValue(key)
}
// запускаем обновление фоном
go c.updateCacheValue(key, item)
// отдаем старое значение
return c.tryToGetOldValue(key)
}
// updateCacheValue обновление значений в кеше
// вариант 1 - значение не найдено. Первый из запросов блокирует за собой обновление (на самом деле
// может возникнуть ситуация когда несколько запросов поставят блокировку и начнут
// обновлять кеш - пока считаем это некритичным).
func (c *cache) updateCacheValue(key string, item *cacheItem) (result interface{}, err error) {
item.locks.Set(key, true)
defer item.locks.Set(key, false)
result, err = item.reader()
if err != nil {
return nil, errors.Wrap(err, "could not get value from getter")
}
item.cache.SetWithTTL(key, result, item.refreshInterval)
item.persistentCache.Set(key, result)
item.expiredTime = time.Now().Add(c.expiredInterval)
return result, nil
}
// tryToGetOldValue пытается получить старое значение, если в момент запроса на актуальном стоит блокировка.
func (c *cache) tryToGetOldValue(key string) (interface{}, error) {
var item *cacheItem
actual, loaded := c.items.Load(key)
if !loaded {
return nil, ErrorKeyNotFound
}
item, ok := actual.(*cacheItem)
if !ok {
return nil, ErrorValueNotCase
}
fnGetPersistentCacheValue := func() (interface{}, error) {
if cachedValue, ok := item.persistentCache.Get(key); ok {
return cachedValue, nil
}
return nil, fmt.Errorf("persinstent cache is empty")
}
oldValue, err := fnGetPersistentCacheValue()
// Повторяем попытку получить значение. При старте сервиса может возникнуть блокировка
// обновления ключа, но при этом в постоянном кеше еще может не быть значения.
if err != nil {
time.Sleep(100 * time.Millisecond)
oldValue, err = fnGetPersistentCacheValue()
}
return oldValue, err
}
// locks выполняет функции блокировки при одновременном обновлении значений в кеше.
type locks struct {
// keys хранит информацию о локах по каждому отдельному ключу.
// Если значение установлено в true, в данный момент обновление кеша захвачено одной из горутин.
keys sync.Map
}
// Get возвращает информацию о том идет ли в данный момент обновление конкретного ключа.
func (l *locks) Get(key string) bool {
value, ok := l.keys.Load(key)
if !ok {
return false
}
valueBool, ok := value.(bool)
if !ok {
return false
}
return valueBool
}
// Set устанавливает блокировку на обновление конкретного ключа другими горутинами.
func (l *locks) Set(key string, value bool) {
l.keys.Store(key, value)
}
// Balancer запускаем балансировщик реплик для сервисов
// опрашивает список реплик из пингов и если запущено меньше, добавляет реплику.
func (c *cache) gc() {
var err error
ticker := time.NewTicker(c.runGCInterval)
defer ticker.Stop()
defer func() {
lib.Recover(c.ctx)
if err != nil {
logger.Error(c.ctx, "error gc", zap.Error(err))
}
}()
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
err = c.cleaner()
if err != nil {
logger.Error(c.ctx, "error deleted item (expired time)", zap.Error(err))
}
ticker.Reset(c.runGCInterval)
}
}
}
func (c *cache) cleaner() (err error) {
keys := []any{}
c.items.Range(func(key, value any) bool {
item, ok := value.(*cacheItem)
if !ok {
return false
}
if item.expiredTime.Before(time.Now()) {
keys = append(keys, key)
}
return true
})
for _, key := range keys {
c.items.Delete(key)
}
return nil
}