Files
cache_manager/manager/cache.go
T
2026-05-16 19:54:12 +08:00

711 lines
19 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package manager
import (
"context"
"encoding/json"
"errors"
"fmt"
"slices"
"sync"
"time"
"github.com/bytedance/sonic"
"github.com/redis/go-redis/v9"
)
// 缓存适配层
// ICache 定义缓存的标准行为接口
// Get 方法的 value 参数必须为指针类型,用于反序列化结果。
// Set/SetLocal/SetRedis 方法 ttl <= 0 时,默认过期时间为 24 小时。
type ICache interface {
Set(ctx context.Context, key string, value any, ttl time.Duration, ids []int64) error
Get(ctx context.Context, key string, value any) error // value 必须为指针类型
Del(ctx context.Context, key string) error
Remove(ctx context.Context, ids []int64) error
BatchDel(ctx context.Context, keys []string) error
}
var (
ErrCacheNil = errors.New("cache value is nil")
)
// CacheRedisHash 以 Redis Hash 结构实现的缓存方案
// 实现原理:
// - 所有缓存数据存储在同一个 Redis HashkeyName)下,不同业务 key 作为 field。
// - 只支持整个 hash 的过期时间(通过 Expire 设置),不支持 field 级别的过期。
// - 适合 field 数量有限、生命周期一致的场景。
// - 若 field 很多且生命周期不一致,建议使用 CacheRedis。
// - 频繁访问某个 field 会导致整个 hash 过期时间被刷新,其他 field 也不会过期,需注意内存风险。
//
// 适用场景:
// - 业务 key 数量有限,生命周期一致,且需要批量操作 hash 的场景。
type CacheRedisHash struct {
options *managerOptions
redisClient redis.UniversalClient
keyName string
hashTableName string // 记录Key=>ids 映射关系的Hash表,field为keyvalue为ids的json字符串
ttl time.Duration
}
// NewCacheRedisHash 创建新的 Redis Hash 缓存实例
// ttl <= 0 时,默认过期时间为 24 小时
// keyName 是 Redis Hash 的 key,实际缓存项存储在该 Hash 中,field 为具体的缓存 key
// 适用于需要在 Redis 中存储大量相关缓存项的场景,避免过多的 Redis key 导致性能问题
// 这种 hash 结构适合 field 数量有限、生命周期一致的场景。如果 field 很多且生命周期不一致,建议直接用 string 类型的 key-value。
// 例如,可以存储配置项等,一直不需要过期的数据。
func NewCacheRedisHash(redis redis.UniversalClient, keyName string, ttl time.Duration, ops ...OptionFunc) *CacheRedisHash {
options := defaultManagerOptions()
for _, op := range ops {
op(options)
}
if ttl <= 0 {
ttl = 24 * time.Hour
}
return &CacheRedisHash{
options: options,
redisClient: redis,
keyName: keyName,
hashTableName: keyName + ":hashTable",
ttl: ttl,
}
}
func (l *CacheRedisHash) Get(ctx context.Context, key string, value any) error {
val, err := l.redisClient.HGet(ctx, l.keyName, key).Result()
if err != nil {
if err == redis.Nil {
return ErrCacheNil // 明确区分缓存未命中和其他错误
}
l.options.logger.Errorf(ctx, "cache redis get error: %v", err)
return err
}
err = sonic.Unmarshal([]byte(val), value)
if err != nil {
l.options.logger.Errorf(ctx, "cache redis unmarshal error: %v", err)
return err
}
l.redisClient.Expire(ctx, l.keyName, l.ttl)
return nil
}
func (r *CacheRedisHash) Set(ctx context.Context, key string, value any, ttl time.Duration, ids []int64) error {
jsonBy, err := sonic.Marshal(value)
if err != nil {
r.options.logger.Errorf(ctx, "cache redis marshal error: %v", err)
return err
}
if ttl <= 0 {
ttl = r.ttl // 默认过期时间
}
idsBy, err := sonic.Marshal(ids)
if err != nil {
r.options.logger.Errorf(ctx, "CacheRedisHash Set redis marshal ids key:%s error: %v", key, err)
return err
}
p := r.redisClient.Pipeline()
p.HSet(ctx, r.keyName, key, jsonBy)
p.Expire(ctx, r.keyName, ttl) // 设置整个 hash 的过期时间,注意这会影响到 hash 中的所有 field
p.HSet(ctx, r.hashTableName, key, idsBy)
p.Expire(ctx, r.hashTableName, ttl)
cmders, err := p.Exec(ctx)
if err != nil {
r.options.logger.Errorf(ctx, "CacheRedisHash Set redis pipeline exec key:%s error: %v", key, err)
return err
}
// 检查 pipeline 中每个命令的错误
for i, cmd := range cmders {
if err := cmd.Err(); err != nil {
r.options.logger.Errorf(ctx, "CacheRedisHash Set redis pipeline cmd[%d] error: %v", i, err)
return err
}
}
return nil
}
func (r *CacheRedisHash) Del(ctx context.Context, key string) error {
p := r.redisClient.Pipeline()
p.HDel(ctx, r.keyName, key)
p.HDel(ctx, r.hashTableName, key)
cmders, err := p.Exec(ctx)
if err != nil {
r.options.logger.Errorf(ctx, "CacheRedisHash Del redis pipeline exec key:%s error: %v", key, err)
return err
}
// 检查 pipeline 中每个命令的错误
for i, cmd := range cmders {
if err := cmd.Err(); err != nil {
r.options.logger.Errorf(ctx, "CacheRedisHash Del redis pipeline cmd[%d] error: %v", i, err)
return err
}
}
return nil
}
func (r *CacheRedisHash) Remove(ctx context.Context, ids []int64) error {
// 获取所有 field=>ids 映射关系
hashTable, err := r.redisClient.HGetAll(ctx, r.hashTableName).Result()
if err != nil {
r.options.logger.Errorf(ctx, "CacheRedisHash Remove redis get keyName:%s error: %v", r.hashTableName, err)
return err
}
for k, v := range hashTable {
var inIds []int64
err = sonic.Unmarshal([]byte(v), &inIds)
if err != nil {
continue
}
for _, id := range ids {
if slices.Contains(inIds, id) {
if err := r.Del(ctx, k); err != nil {
r.options.logger.Errorf(ctx, "CacheRedisHash Remove Del key:%s error: %v", k, err)
}
break
}
}
}
return nil
}
func (r *CacheRedisHash) BatchDel(ctx context.Context, keys []string) error {
if len(keys) == 0 {
return nil
}
err := r.redisClient.HDel(ctx, r.keyName, keys...).Err()
if err != nil {
r.options.logger.Errorf(ctx, "cache redis batch del error: %v", err)
return err
}
return nil
}
func BatchGetRedisHash[T any](r *CacheRedisHash, ctx context.Context, keys []string) (map[string]*T, error) {
if len(keys) == 0 { // 批量获取的key为空时,返回空map
return map[string]*T{}, nil
}
res, err := r.redisClient.HMGet(ctx, r.keyName, keys...).Result()
if err != nil && !errors.Is(err, redis.Nil) {
r.options.logger.Errorf(ctx, "cache redis get error: %v", err)
return nil, err
}
r.options.logger.Infof(ctx, "cache redis get keyName:%s keys: %v len:%d", r.keyName, keys, len(res))
result := make(map[string]*T)
for i, v := range res {
if v == nil {
continue
}
var t T
err := sonic.Unmarshal([]byte(v.(string)), &t)
if err != nil {
r.options.logger.Errorf(ctx, "cache redis unmarshal error: %v", err)
return nil, err
}
result[keys[i]] = &t
}
return result, nil
}
// CacheRedis 以 Redis String 结构实现的缓存方案
// 实现原理:
// - 每个缓存 key 独立存储为一个 Redis String,支持单独设置过期时间。
// - 通过 prefix 区分不同业务,避免 key 冲突。
// - 支持泛型批量获取(BatchGetRedis),底层用 pipeline 提高性能。
// - 适合 key 数量较多、生命周期不一致的场景。
//
// 适用场景:
// - 业务 key 数量较多,生命周期不一致,或需要单独控制每个 key 过期时间。
type CacheRedis struct {
options *managerOptions
redisClient redis.UniversalClient
prefix string // key前缀,避免不同业务之间的key冲突
hashTableName string // 记录Key=>ids 映射关系的Hash表,field为keyvalue为ids的json字符串
sortSetName string // 记录key的过期时间的有序集合,number为过期的毫秒时间戳,value为key
}
// NewCacheRedis 创建新的redis缓存实例
func NewCacheRedis(ctx context.Context, redis redis.UniversalClient, prefix string, ops ...OptionFunc) *CacheRedis {
options := defaultManagerOptions()
for _, op := range ops {
op(options)
}
r := &CacheRedis{
redisClient: redis,
prefix: prefix,
hashTableName: fmt.Sprintf("%s:hashTable", prefix),
sortSetName: fmt.Sprintf("%s:sortSet", prefix),
options: options,
}
go func() {
ticker := time.NewTicker(time.Hour) // 定时检查过期key
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
r.clean(ctx)
}
}
}()
return r
}
func (r *CacheRedis) clean(ctx context.Context) error {
// 从sortSet查询已过期的key,并删除
now := time.Now().UnixMilli()
expiredKeys, err := r.redisClient.ZRangeByScore(ctx, r.sortSetName, &redis.ZRangeBy{
Min: "0",
Max: fmt.Sprintf("%d", now),
}).Result()
if err != nil {
r.options.logger.Errorf(ctx, "CacheRedis clean redis ZRangeByScore keyName:%s error: %v", r.sortSetName, err)
return err
}
for _, key := range expiredKeys {
r.Del(ctx, key)
}
return nil
}
func (r *CacheRedis) Set(ctx context.Context, key string, value any, ttl time.Duration, ids []int64) error {
key = fmt.Sprintf("%s:%s", r.prefix, key)
// global.Logger.Infof(ctx, "cache redis set key: %s", key)
jsonBy, err := sonic.Marshal(value)
if err != nil {
r.options.logger.Errorf(ctx, "cache redis marshal error: %v", err)
return err
}
if ttl <= 0 {
ttl = time.Hour * 24 // 默认过期时间
}
idsBy, err := sonic.Marshal(ids)
if err != nil {
r.options.logger.Errorf(ctx, "CacheRedis Set redis marshal ids key:%s error: %v", key, err)
return err
}
p := r.redisClient.Pipeline()
p.Set(ctx, key, jsonBy, ttl)
p.ZAdd(ctx, r.sortSetName, redis.Z{
Score: float64(time.Now().Add(ttl).UnixMilli()),
Member: key,
})
p.HSet(ctx, r.hashTableName, key, idsBy)
p.Expire(ctx, r.hashTableName, time.Hour*24*30) // 一个月
cmders, err := p.Exec(ctx)
if err != nil {
r.options.logger.Errorf(ctx, "CacheRedis Set redis pipeline exec key:%s error: %v", key, err)
return err
}
// 检查 pipeline 中每个命令的错误
for i, cmd := range cmders {
if err := cmd.Err(); err != nil {
r.options.logger.Errorf(ctx, "CacheRedis Set redis pipeline cmd[%d] error: %v", i, err)
return err
}
}
return nil
}
func (r *CacheRedis) Get(ctx context.Context, key string, value any) error {
key = fmt.Sprintf("%s:%s", r.prefix, key)
val, err := r.redisClient.Get(ctx, key).Result()
if err != nil {
if err == redis.Nil {
return ErrCacheNil // 明确区分缓存未命中和其他错误
}
r.options.logger.Errorf(ctx, "cache redis get error: %v", err)
return err
}
err = sonic.Unmarshal([]byte(val), value)
if err != nil {
r.options.logger.Errorf(ctx, "cache redis unmarshal error: %v", err)
return err
}
return nil
}
func (l *CacheRedis) Remove(ctx context.Context, ids []int64) error {
// 获取所有 field=>ids 映射关系
hashTable, err := l.redisClient.HGetAll(ctx, l.hashTableName).Result()
if err != nil {
l.options.logger.Errorf(ctx, "CacheRedis Remove redis get keyName:%s error: %v", l.hashTableName, err)
return err
}
for k, v := range hashTable {
var inIds []int64
err = json.Unmarshal([]byte(v), &inIds)
if err != nil {
continue
}
for _, id := range ids {
if slices.Contains(inIds, id) {
if err := l.Del(ctx, k); err != nil {
l.options.logger.Errorf(ctx, "CacheRedis Remove Del key:%s error: %v", k, err)
}
break
}
}
}
return nil
}
// BatchGetRedis 批量获取Redis缓存,支持泛型
func BatchGetRedis[T any](r *CacheRedis, ctx context.Context, keys []string) (map[string]*T, error) {
result := make(map[string]*T)
if len(keys) == 0 {
return result, nil
}
keyMap := make(map[string]string, len(keys))
redisKeys := make([]string, 0, len(keys))
cmder, err := r.redisClient.Pipeline().Pipelined(ctx, func(pipe redis.Pipeliner) error {
for _, key := range keys {
rediskey := fmt.Sprintf("%s:%s", r.prefix, key)
redisKeys = append(redisKeys, rediskey)
keyMap[rediskey] = key
// global.Logger.Infof(ctx, "cache redis get key: %s", key)
pipe.Get(ctx, rediskey)
}
return nil
})
if err != nil && !errors.Is(err, redis.Nil) {
r.options.logger.Errorf(ctx, "BatchGetRedis pipeline exec error: %v", err)
return nil, err
}
for i, cmd := range cmder {
stringCom, ok := cmd.(*redis.StringCmd)
// r.options.logger.Infof(ctx, "cache redis get key: %s ok:%+v val:%+v", redisKeys[i], ok, stringCom)
if !ok {
continue
}
if stringCom.Err() != nil {
continue
}
val := stringCom.Val()
key := keyMap[redisKeys[i]]
var t T
err := sonic.Unmarshal([]byte(val), &t)
if err != nil {
// 反序列化失败时跳过该项,整体不报错
continue
}
result[key] = &t
}
return result, nil
}
// CacheRedis 删除缓存 保证Key删除的幂等性和一致性
func (r *CacheRedis) Del(ctx context.Context, key string) error {
key = fmt.Sprintf("%s:%s", r.prefix, key)
p := r.redisClient.Pipeline()
// 删除key
p.Del(ctx, key)
// 从过期时间的有序集合中删除
p.ZRem(ctx, r.sortSetName, key)
// 从hashTable中删除
p.HDel(ctx, r.hashTableName, key)
cmders, err := p.Exec(ctx)
if err != nil {
r.options.logger.Errorf(ctx, "CacheRedis Del redis pipeline exec key:%s error: %v", key, err)
return err
}
// 检查 pipeline 中每个命令的错误
for i, cmd := range cmders {
if err := cmd.Err(); err != nil {
r.options.logger.Errorf(ctx, "CacheRedis Del redis pipeline cmd[%d] error: %v", i, err)
return err
}
}
r.options.logger.Infof(ctx, "CacheRedis Del redis del key:%s", key)
return nil
}
func (r *CacheRedis) BatchDel(ctx context.Context, keys []string) error {
if len(keys) == 0 {
return nil
}
redisKeys := make([]string, 0, len(keys))
for _, key := range keys {
redisKeys = append(redisKeys, fmt.Sprintf("%s:%s", r.prefix, key))
}
//global.Logger.Infof(ctx, "CacheRedis BatchDel redis del keys:%+v", redisKeys)
err := r.redisClient.Del(ctx, redisKeys...).Err()
if err != nil {
r.options.logger.Errorf(ctx, "cache redis batch del error: %v", err)
return err
}
return nil
}
// CacheLocal 本地内存缓存实现
// 实现原理:
// - 使用 map[string]item 存储所有缓存,item 结构体包含值和过期时间。
// - 通过互斥锁保证并发安全。
// - 启动后台协程定期清理过期 key。
// - 适合单机场景,缓存容量受限于本地内存。
//
// 适用场景:
// - 适用于高性能、低延迟、单机进程内缓存需求,如热点数据、本地会话等。
// - 不适合分布式场景或大容量缓存。
type CacheLocal struct {
options *managerOptions
cacheMap sync.Map
keyTableMap sync.Map // 记录Key=>ids 映射关系的Mapfield为keyvalue为ids的数组
quit chan struct{} // 用于停止后台清理协程
}
// item 内部存储单元,包含值和过期时间
type item struct {
value any
expiry int64 // 过期时间戳 (Unix Nano)
}
func NewCacheLocal(cleanupInterval time.Duration, ops ...OptionFunc) *CacheLocal {
options := defaultManagerOptions()
for _, op := range ops {
op(options)
}
cache := &CacheLocal{
options: options,
cacheMap: sync.Map{},
keyTableMap: sync.Map{},
quit: make(chan struct{}),
}
// 启动后台清理协程
go func() {
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
cache.cleanup(context.Background())
case <-cache.quit:
return
}
}
}()
return cache
}
// cleanup 定期清理过期的 Key
func (c *CacheLocal) cleanup(ctx context.Context) {
now := time.Now().UnixNano()
c.cacheMap.Range(func(key, value any) bool {
it, ok := value.(item)
if !ok {
return true
}
if it.expiry > 0 && now > it.expiry {
c.cacheMap.Delete(key) // 删除过时的缓存项
}
return true
})
c.options.logger.Infof(ctx, "CacheLocal cleanup")
return
}
// Stop 停止后台清理协程(通常在程序退出时调用)
func (c *CacheLocal) Stop() {
select {
case <-c.quit:
// 已关闭
return
default:
close(c.quit)
}
}
// Set 将 value 存储为 interface{},建议 value 为可安全复制的类型
func (l *CacheLocal) Set(ctx context.Context, key string, value any, ttl time.Duration, ids []int64) error {
if ttl <= 0 {
ttl = time.Hour * 24 // 默认过期时间
}
// 存储深拷贝,防止外部修改
var v any
switch vv := value.(type) {
case nil:
v = nil
case string, int, int64, float64, bool:
v = vv
default:
// 对于结构体、切片、map等,序列化再反序列化实现深拷贝
by, err := sonic.Marshal(vv)
if err != nil {
return err
}
err = sonic.Unmarshal(by, &v)
if err != nil {
return err
}
}
l.cacheMap.Store(key, item{v, time.Now().Add(ttl).UnixNano()})
l.keyTableMap.Store(key, ids)
return nil
}
// Get 直接类型断言赋值,提升类型安全
func (l *CacheLocal) Get(ctx context.Context, key string, value any) error {
itemData, exists := l.cacheMap.Load(key)
if !exists {
return ErrCacheNil // 不存在,返回nil表示缓存未命中
}
item, ok := itemData.(item)
if !ok {
return ErrCacheNil // 类型断言失败,返回nil表示缓存未命中
}
if time.Now().UnixNano() > item.expiry {
l.cacheMap.Delete(key) // 删除过期的缓存
return ErrCacheNil
}
// 直接类型断言赋值
switch v := value.(type) {
case *string:
vv, ok := item.value.(string)
if !ok {
return errors.New("cache type mismatch: expect string")
}
*v = vv
case *int:
vv, ok := item.value.(int)
if !ok {
return errors.New("cache type mismatch: expect int")
}
*v = vv
case *int64:
vv, ok := item.value.(int64)
if !ok {
return errors.New("cache type mismatch: expect int64")
}
*v = vv
case *float64:
vv, ok := item.value.(float64)
if !ok {
return errors.New("cache type mismatch: expect float64")
}
*v = vv
case *bool:
vv, ok := item.value.(bool)
if !ok {
return errors.New("cache type mismatch: expect bool")
}
*v = vv
default:
// 结构体、切片、map等,序列化再反序列化
by, err := sonic.Marshal(item.value)
if err != nil {
return err
}
err = sonic.Unmarshal(by, value)
if err != nil {
return err
}
}
return nil
}
// BatchGetLocal 批量获取本地缓存,支持泛型
func BatchGetLocal[T any](c *CacheLocal, ctx context.Context, keys []string) (map[string]*T, error) {
result := make(map[string]*T)
for _, key := range keys {
var t T
err := c.Get(ctx, key, &t)
if err != nil && !errors.Is(err, ErrCacheNil) {
return nil, err
}
if errors.Is(err, ErrCacheNil) {
continue
}
result[key] = &t
}
return result, nil
}
func (l *CacheLocal) BatchDel(ctx context.Context, keys []string) error {
return nil
}
func (l *CacheLocal) Del(ctx context.Context, key string) error {
l.cacheMap.Delete(key) // 删除指定的缓存
l.keyTableMap.Delete(key)
return nil
}
func (l *CacheLocal) Remove(ctx context.Context, ids []int64) error {
l.keyTableMap.Range(func(key, value any) bool {
idsArr, ok := value.([]int64)
if !ok {
return true
}
keyStr, ok := key.(string)
if !ok {
return true
}
for _, id := range idsArr {
for _, id2 := range ids {
if id == id2 {
l.Del(ctx, keyStr)
}
}
}
return true
})
return nil
}