在现代分布式应用架构中,缓存已成为提升系统性能和用户体验的关键技术组件。随着业务规模的不断扩大和并发量的持续增长,单一级别的缓存往往无法满足复杂的性能需求。多级缓存架构通过在不同层次构建缓存体系,能够显著提升数据访问效率,降低数据库负载,并提供更好的系统可扩展性。
本文将深入探讨C#环境下多级缓存的架构设计与实现,重点分析内存缓存(Memory Cache)与Redis分布式缓存的协同工作机制,并详细阐述如何通过Redis的发布-订阅(Pub/Sub)模式实现不同节点间的缓存状态同步。
缓存的本质是利用时间局部性(Temporal Locality)和空间局部性(Spatial Locality)原理,将频繁访问的数据存储在更快的存储介质中。在计算机系统中,从CPU缓存到内存,从内存到磁盘,都遵循着这种层次化的存储架构。
CPU Cache (L1/L2/L3) → Memory → Disk Storage
↑ ↑ ↑
快速访问 中等速度 慢速访问
小容量 中等容量 大容量
昂贵 适中 便宜
在应用层面,多级缓存同样遵循类似的原理:
根据CAP定理(Consistency, Availability, Partition tolerance),在分布式缓存系统中,我们无法同时保证:
在实际应用中,我们通常采用最终一致性(Eventually Consistency)模型,通过合理的同步策略和过期机制来平衡性能与一致性。
缓存穿透(Cache Penetration):
缓存击穿(Cache Breakdown):
缓存雪崩(Cache Avalanche):
graph TB
Client[客户端应用]
subgraph "多级缓存系统"
subgraph "应用层"
Controller[Controller层]
Service[Service层]
Manager[多级缓存管理器]
end
subgraph "缓存层"
subgraph "L1缓存 - 内存缓存"
MemCache[IMemoryCache]
AdvMemCache[高级内存缓存]
end
subgraph "L2缓存 - Redis分布式缓存"
RedisConn[Redis连接管理器]
RedisCache[Redis分布式缓存]
RedisCluster[Redis集群]
end
end
subgraph "同步机制"
PubSub[Redis Pub/Sub]
SyncService[缓存同步服务]
EventQueue[事件队列]
end
subgraph "监控系统"
Monitor[监控服务]
HealthCheck[健康检查]
Metrics[性能指标]
end
end
subgraph "数据层"
Database[(数据库)]
External[外部API]
end
Client --> Controller
Controller --> Service
Service --> Manager
Manager --> AdvMemCache
Manager --> RedisCache
AdvMemCache --> MemCache
RedisCache --> RedisConn
RedisConn --> RedisCluster
RedisCache --> PubSub
PubSub --> SyncService
SyncService --> EventQueue
EventQueue --> AdvMemCache
Manager --> Monitor
Monitor --> HealthCheck
Monitor --> Metrics
Manager -.-> Database
Manager -.-> External
style Manager fill:#e1f5fe
style AdvMemCache fill:#f3e5f5
style RedisCache fill:#e8f5e8
style PubSub fill:#fff3e0
sequenceDiagram
participant Client as 客户端
participant Manager as 多级缓存管理器
participant L1 as L1内存缓存
participant L2 as L2 Redis缓存
participant Sync as 同步服务
participant DB as 数据库
Note over Client,DB: 读取操作流程 (GetOrSet)
Client->>Manager: GetOrSet(key, factory)
Manager->>L1: Get(key)
alt L1 缓存命中
L1->>Manager: 返回数据
Manager->>Client: 返回结果 (L1 Hit)
else L1 缓存未命中
L1->>Manager: 返回 null
Manager->>L2: Get(key)
alt L2 缓存命中
L2->>Manager: 返回数据
Manager->>L1: Set(key, data) [异步提升]
Manager->>Client: 返回结果 (L2 Hit)
else L2 缓存未命中
L2->>Manager: 返回 null
Manager->>DB: factory() 执行
DB->>Manager: 返回原始数据
par 并行设置缓存
Manager->>L1: Set(key, data)
Manager->>L2: Set(key, data)
and 发布同步事件
Manager->>Sync: PublishSync(SET, key)
end
Manager->>Client: 返回结果 (DB Hit)
end
end
Note over Client,DB: 更新操作流程
Client->>Manager: Set(key, data)
par 并行更新所有级别
Manager->>L1: Set(key, data)
Manager->>L2: Set(key, data)
end
Manager->>Sync: PublishSync(SET, key)
Sync->>L1: 通知其他节点更新L1缓存
Manager->>Client: 返回结果
Note over Client,DB: 删除操作流程
Client->>Manager: Remove(key)
par 并行删除所有级别
Manager->>L1: Remove(key)
Manager->>L2: Remove(key)
end
Manager->>Sync: PublishSync(REMOVE, key)
Sync->>L1: 通知其他节点删除L1缓存
Manager->>Client: 返回结果
graph TB
subgraph "节点A"
AppA[应用A]
L1A[L1缓存A]
L2A[L2缓存A]
SyncA[同步服务A]
end
subgraph "节点B"
AppB[应用B]
L1B[L1缓存B]
L2B[L2缓存B]
SyncB[同步服务B]
end
subgraph "节点C"
AppC[应用C]
L1C[L1缓存C]
L2C[L2缓存C]
SyncC[同步服务C]
end
subgraph "Redis集群"
RedisPubSub[Redis Pub/Sub频道<br/>cache_sync:events]
end
%% 数据更新流程
AppA -->|1. 更新数据| L1A
AppA -->|2. 更新数据| L2A
SyncA -->|3. 发布同步事件| RedisPubSub
%% 同步通知
RedisPubSub -->|4. 广播事件| SyncB
RedisPubSub -->|4. 广播事件| SyncC
%% 本地缓存更新
SyncB -->|5. 更新本地缓存| L1B
SyncC -->|5. 更新本地缓存| L1C
style RedisPubSub fill:#ff9999
style SyncA fill:#99ccff
style SyncB fill:#99ccff
style SyncC fill:#99ccff
flowchart TD
Start([开始缓存操作]) --> CheckL1{L1缓存<br/>是否可用?}
CheckL1 -->|是| TryL1[尝试L1缓存操作]
TryL1 --> L1Success{L1操作<br/>成功?}
L1Success -->|是| ReturnL1[返回L1结果]
L1Success -->|否| CheckL2
CheckL1 -->|否| CheckL2{L2缓存<br/>是否可用?}
CheckL2 -->|是| TryL2[尝试L2缓存操作]
TryL2 --> L2Success{L2操作<br/>成功?}
L2Success -->|是| ReturnL2[返回L2结果]
L2Success -->|否| CheckStrategy
CheckL2 -->|否| CheckStrategy{降级策略}
CheckStrategy -->|L1Only| L1OnlyMode[仅使用L1缓存]
CheckStrategy -->|DirectAccess| DirectDB[直接访问数据库]
CheckStrategy -->|ThrowException| ThrowError[抛出异常]
L1OnlyMode --> TryL1Only[尝试L1操作]
TryL1Only --> L1OnlySuccess{成功?}
L1OnlySuccess -->|是| ReturnL1Only[返回L1结果]
L1OnlySuccess -->|否| DirectDB
DirectDB --> DBAccess[执行数据库查询]
DBAccess --> ReturnDB[返回数据库结果]
ThrowError --> ErrorReturn[返回错误]
ReturnL1 --> End([结束])
ReturnL2 --> End
ReturnL1Only --> End
ReturnDB --> End
ErrorReturn --> End
style CheckL1 fill:#e3f2fd
style CheckL2 fill:#e3f2fd
style CheckStrategy fill:#fff3e0
style DirectDB fill:#ffebee
style ThrowError fill:#ffcdd2
多级缓存架构采用分层设计模式,每一层都有明确的职责和边界:
┌─────────────────────────────────────────────────────┐
│ 应用层 │
├─────────────────────────────────────────────────────┤
│ 多级缓存管理器 │
├─────────────────┬───────────────────────────────────┤
│ L1内存缓存 │ L2 Redis缓存 │
│ (MemoryCache) │ (StackExchange.Redis) │
├─────────────────┴───────────────────────────────────┤
│ Redis Pub/Sub 同步机制 │
├─────────────────────────────────────────────────────┤
│ 数据持久层 │
└─────────────────────────────────────────────────────┘
Microsoft.Extensions.Caching.Memory:
System.Runtime.Caching.MemoryCache:
StackExchange.Redis:
ServiceStack.Redis:
这是最常用的缓存模式,应用程序负责管理缓存的读取和更新:
读取流程:
1. 应用程序尝试从缓存读取数据
2. 如果缓存命中,直接返回数据
3. 如果缓存未命中,从数据库读取数据
4. 将数据写入缓存,然后返回给应用程序
更新流程:
1. 更新数据库
2. 删除或更新缓存中的对应数据
写入流程:
1. 应用程序写入缓存
2. 缓存服务同步写入数据库
3. 确认写入完成后返回成功
写入流程:
1. 应用程序写入缓存
2. 立即返回成功
3. 缓存服务异步批量写入数据库
Microsoft.Extensions.Caching.Memory.IMemoryCache接口提供了缓存操作的核心方法:
public interface IMemoryCache : IDisposable
{
bool TryGetValue(object key, out object value);
ICacheEntry CreateEntry(object key);
void Remove(object key);
}
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Text.RegularExpressions;
using System.Runtime.Serialization;
/// <summary>
/// 缓存异常基类
/// </summary>
publicabstractclassCacheException : Exception
{
protected CacheException(string message) : base(message) { }
protected CacheException(string message, Exception innerException) : base(message, innerException) { }
}
/// <summary>
/// 缓存连接异常
/// </summary>
publicclassCacheConnectionException : CacheException
{
public CacheConnectionException(string message) : base(message) { }
public CacheConnectionException(string message, Exception innerException) : base(message, innerException) { }
}
/// <summary>
/// 缓存序列化异常
/// </summary>
publicclassCacheSerializationException : CacheException
{
public CacheSerializationException(string message) : base(message) { }
public CacheSerializationException(string message, Exception innerException) : base(message, innerException) { }
}
/// <summary>
/// 缓存超时异常
/// </summary>
publicclassCacheTimeoutException : CacheException
{
public CacheTimeoutException(string message) : base(message) { }
public CacheTimeoutException(string message, Exception innerException) : base(message, innerException) { }
}
/// <summary>
/// 缓存验证异常
/// </summary>
publicclassCacheValidationException : CacheException
{
public CacheValidationException(string message) : base(message) { }
public CacheValidationException(string message, Exception innerException) : base(message, innerException) { }
}
/// <summary>
/// 线程安全的缓存统计追踪器
/// </summary>
publicclassCacheStatisticsTracker
{
privatelong _totalOperations = 0;
privatelong _l1Hits = 0;
privatelong _l2Hits = 0;
privatelong _totalMisses = 0;
privatereadonlyobject _lock = newobject();
public void RecordOperation()
{
Interlocked.Increment(ref _totalOperations);
}
public void RecordHit(CacheLevel level)
{
switch (level)
{
case CacheLevel.L1:
Interlocked.Increment(ref _l1Hits);
break;
case CacheLevel.L2:
Interlocked.Increment(ref _l2Hits);
break;
}
}
public void RecordMiss()
{
Interlocked.Increment(ref _totalMisses);
}
public CacheStatisticsSnapshot GetSnapshot()
{
returnnew CacheStatisticsSnapshot
{
TotalOperations = Interlocked.Read(ref _totalOperations),
L1Hits = Interlocked.Read(ref _l1Hits),
L2Hits = Interlocked.Read(ref _l2Hits),
TotalMisses = Interlocked.Read(ref _totalMisses)
};
}
public void Reset()
{
lock (_lock)
{
Interlocked.Exchange(ref _totalOperations, 0);
Interlocked.Exchange(ref _l1Hits, 0);
Interlocked.Exchange(ref _l2Hits, 0);
Interlocked.Exchange(ref _totalMisses, 0);
}
}
}
/// <summary>
/// 缓存统计快照
/// </summary>
publicclassCacheStatisticsSnapshot
{
publiclong TotalOperations { get; init; }
publiclong L1Hits { get; init; }
publiclong L2Hits { get; init; }
publiclong TotalMisses { get; init; }
publiclong TotalHits => L1Hits + L2Hits;
publicdouble OverallHitRatio => TotalOperations == 0 ? 0 : (double)TotalHits / TotalOperations;
publicdouble L1HitRatio => TotalOperations == 0 ? 0 : (double)L1Hits / TotalOperations;
publicdouble L2HitRatio => TotalOperations == 0 ? 0 : (double)L2Hits / TotalOperations;
}
/// <summary>
/// 缓存数据验证器接口
/// </summary>
publicinterfaceICacheDataValidator
{
bool IsValid<T>(T value);
void ValidateKey(string key);
bool IsSafeForSerialization<T>(T value);
}
/// <summary>
/// 默认缓存数据验证器
/// </summary>
publicclassDefaultCacheDataValidator : ICacheDataValidator
{
privatereadonly ILogger<DefaultCacheDataValidator> _logger;
privatereadonly HashSet<Type> _forbiddenTypes;
privatereadonly Regex _keyValidationRegex;
public DefaultCacheDataValidator(ILogger<DefaultCacheDataValidator> logger)
{
_logger = logger;
_forbiddenTypes = new HashSet<Type>
{
typeof(System.IO.FileStream),
typeof(System.Net.Sockets.Socket),
typeof(System.Threading.Thread),
typeof(System.Threading.Tasks.Task)
};
// 限制key格式:只允许字母数字下划线冒号和点
_keyValidationRegex = new Regex(@"^[a-zA-Z0-9_:.-]+$", RegexOptions.Compiled);
}
publicbool IsValid<T>(T value)
{
if (value == null) returntrue;
var valueType = value.GetType();
// 检查禁止类型
if (_forbiddenTypes.Contains(valueType))
{
_logger.LogWarning("Forbidden type in cache: {Type}", valueType.Name);
returnfalse;
}
// 检查循环引用(简化版)
if (HasCircularReference(value))
{
_logger.LogWarning("Circular reference detected in cache value");
returnfalse;
}
returntrue;
}
public void ValidateKey(string key)
{
if (string.IsNullOrWhiteSpace(key))
thrownew CacheValidationException("Cache key cannot be null or empty");
if (key.Length > 250)
thrownew CacheValidationException($"Cache key too long: {key.Length} characters");
if (!_keyValidationRegex.IsMatch(key))
thrownew CacheValidationException($"Invalid characters in cache key: {key}");
}
publicbool IsSafeForSerialization<T>(T value)
{
if (value == null) returntrue;
var valueType = value.GetType();
// 检查是否有序列化属性
if (valueType.IsSerializable ||
valueType.GetCustomAttributes(typeof(DataContractAttribute), false).Length > 0)
{
returntrue;
}
// 原始类型和字符串通常安全
return valueType.IsPrimitive || valueType == typeof(string) || valueType == typeof(DateTime);
}
private bool HasCircularReference(object obj, HashSet<object> visited = null)
{
if (obj == null) returnfalse;
visited ??= new HashSet<object>();
if (visited.Contains(obj))
returntrue;
visited.Add(obj);
// 简化的循环检测,只检查一层
var type = obj.GetType();
if (type.IsPrimitive || type == typeof(string))
returnfalse;
visited.Remove(obj);
returnfalse;
}
}
/// <summary>
/// 安全缓存管理器装饰器
/// </summary>
publicclassSecureCacheManagerDecorator : IAdvancedMemoryCache
{
privatereadonly IAdvancedMemoryCache _innerCache;
privatereadonly ICacheDataValidator _validator;
privatereadonly ILogger<SecureCacheManagerDecorator> _logger;
public SecureCacheManagerDecorator(
IAdvancedMemoryCache innerCache,
ICacheDataValidator validator,
ILogger<SecureCacheManagerDecorator> logger)
{
_innerCache = innerCache ?? thrownew ArgumentNullException(nameof(innerCache));
_validator = validator ?? thrownew ArgumentNullException(nameof(validator));
_logger = logger ?? thrownew ArgumentNullException(nameof(logger));
}
publicasync Task<T> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null)
{
_validator.ValidateKey(key);
returnawait _innerCache.GetOrSetAsync(key, async () =>
{
varvalue = await factory();
if (!_validator.IsValid(value))
{
thrownew CacheValidationException($"Invalid cache value for key: {key}");
}
returnvalue;
}, expiry);
}
publicasync Task<T> GetAsync<T>(string key)
{
_validator.ValidateKey(key);
returnawait _innerCache.GetAsync<T>(key);
}
publicasync Task SetAsync<T>(string key, T value, TimeSpan? expiry = null)
{
_validator.ValidateKey(key);
if (!_validator.IsValid(value))
{
thrownew CacheValidationException($"Invalid cache value for key: {key}");
}
if (!_validator.IsSafeForSerialization(value))
{
_logger.LogWarning("Potentially unsafe serialization for key: {Key}, type: {Type}",
key, value?.GetType().Name);
}
await _innerCache.SetAsync(key, value, expiry);
}
public async Task RemoveAsync(string key)
{
_validator.ValidateKey(key);
await _innerCache.RemoveAsync(key);
}
public async Task RemoveByPatternAsync(string pattern)
{
if (string.IsNullOrWhiteSpace(pattern))
thrownew CacheValidationException("Pattern cannot be null or empty");
await _innerCache.RemoveByPatternAsync(pattern);
}
public CacheStatistics GetStatistics() => _innerCache.GetStatistics();
public void ClearStatistics() => _innerCache.ClearStatistics();
}
/// <summary>
/// 序列化器接口
/// </summary>
publicinterfaceICacheSerializer
{
byte[] Serialize<T>(T value);
T Deserialize<T>(byte[] data);
string SerializerName { get; }
bool SupportsType(Type type);
}
/// <summary>
/// JSON序列化器(默认)
/// </summary>
publicclassJsonCacheSerializer : ICacheSerializer
{
privatereadonly JsonSerializerOptions _options;
publicstring SerializerName => "JSON";
public JsonCacheSerializer()
{
_options = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = false,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
PropertyNameCaseInsensitive = true
};
}
publicbyte[] Serialize<T>(T value)
{
if (value == null) returnnull;
if (typeof(T) == typeof(string)) return System.Text.Encoding.UTF8.GetBytes(value.ToString());
var json = JsonSerializer.Serialize(value, _options);
return System.Text.Encoding.UTF8.GetBytes(json);
}
public T Deserialize<T>(byte[] data)
{
if (data == null || data.Length == 0) returndefault(T);
if (typeof(T) == typeof(string)) return (T)(object)System.Text.Encoding.UTF8.GetString(data);
var json = System.Text.Encoding.UTF8.GetString(data);
return JsonSerializer.Deserialize<T>(json, _options);
}
public bool SupportsType(Type type)
{
returntrue; // JSON支持所有类型
}
}
/// <summary>
/// 二进制序列化器(用于简单类型)
/// </summary>
publicclassBinaryCacheSerializer : ICacheSerializer
{
publicstring SerializerName => "Binary";
privatestaticreadonly HashSet<Type> SupportedTypes = new()
{
typeof(int), typeof(long), typeof(double), typeof(float),
typeof(bool), typeof(byte), typeof(short),
typeof(DateTime), typeof(DateTimeOffset), typeof(TimeSpan),
typeof(Guid), typeof(decimal)
};
publicbyte[] Serialize<T>(T value)
{
if (value == null) returnnull;
var type = typeof(T);
// 专门处理常见类型,提高性能
return type switch
{
_ when type == typeof(int) => BitConverter.GetBytes((int)(object)value),
_ when type == typeof(long) => BitConverter.GetBytes((long)(object)value),
_ when type == typeof(double) => BitConverter.GetBytes((double)(object)value),
_ when type == typeof(float) => BitConverter.GetBytes((float)(object)value),
_ when type == typeof(bool) => BitConverter.GetBytes((bool)(object)value),
_ when type == typeof(DateTime) => BitConverter.GetBytes(((DateTime)(object)value).ToBinary()),
_ when type == typeof(Guid) => ((Guid)(object)value).ToByteArray(),
_ when type == typeof(string) => System.Text.Encoding.UTF8.GetBytes(value.ToString()),
_ => thrownew NotSupportedException($"Type {type.Name} is not supported by BinaryCacheSerializer")
};
}
public T Deserialize<T>(byte[] data)
{
if (data == null || data.Length == 0) returndefault(T);
var type = typeof(T);
object result = type switch
{
_ when type == typeof(int) => BitConverter.ToInt32(data, 0),
_ when type == typeof(long) => BitConverter.ToInt64(data, 0),
_ when type == typeof(double) => BitConverter.ToDouble(data, 0),
_ when type == typeof(float) => BitConverter.ToSingle(data, 0),
_ when type == typeof(bool) => BitConverter.ToBoolean(data, 0),
_ when type == typeof(DateTime) => DateTime.FromBinary(BitConverter.ToInt64(data, 0)),
_ when type == typeof(Guid) => new Guid(data),
_ when type == typeof(string) => System.Text.Encoding.UTF8.GetString(data),
_ => thrownew NotSupportedException($"Type {type.Name} is not supported by BinaryCacheSerializer")
};
return (T)result;
}
public bool SupportsType(Type type)
{
return SupportedTypes.Contains(type) || type == typeof(string);
}
}
/// <summary>
/// 智能序列化器管理器
/// </summary>
publicclassSmartCacheSerializer : ICacheSerializer
{
privatereadonly ICacheSerializer[] _serializers;
privatereadonly ILogger<SmartCacheSerializer> _logger;
publicstring SerializerName => "Smart";
public SmartCacheSerializer(ILogger<SmartCacheSerializer> logger)
{
_logger = logger;
_serializers = new ICacheSerializer[]
{
new BinaryCacheSerializer(), // 优先使用二进制序列化
new JsonCacheSerializer() // 备选JSON序列化
};
}
publicbyte[] Serialize<T>(T value)
{
if (value == null) returnnull;
var type = typeof(T);
foreach (var serializer in _serializers)
{
if (serializer.SupportsType(type))
{
try
{
var data = serializer.Serialize(value);
// 在数据开头添加序列化器标识
var header = System.Text.Encoding.UTF8.GetBytes(serializer.SerializerName.PadRight(8));
var result = newbyte[header.Length + data.Length];
Array.Copy(header, 0, result, 0, header.Length);
Array.Copy(data, 0, result, header.Length, data.Length);
return result;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Serializer {SerializerName} failed for type {TypeName}",
serializer.SerializerName, type.Name);
continue;
}
}
}
thrownew CacheSerializationException($"No suitable serializer found for type: {type.Name}");
}
public T Deserialize<T>(byte[] data)
{
if (data == null || data.Length < 8) returndefault(T);
// 读取序列化器标识
var headerBytes = newbyte[8];
Array.Copy(data, 0, headerBytes, 0, 8);
var serializerName = System.Text.Encoding.UTF8.GetString(headerBytes).Trim();
// 获取实际数据
var actualData = newbyte[data.Length - 8];
Array.Copy(data, 8, actualData, 0, actualData.Length);
// 找到对应的序列化器
var serializer = _serializers.FirstOrDefault(s => s.SerializerName == serializerName);
if (serializer == null)
{
_logger.LogWarning("Unknown serializer: {SerializerName}, falling back to JSON", serializerName);
serializer = new JsonCacheSerializer();
}
try
{
return serializer.Deserialize<T>(actualData);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to deserialize with {SerializerName}", serializerName);
thrownew CacheSerializationException($"Deserialization failed with {serializerName}", ex);
}
}
public bool SupportsType(Type type)
{
return _serializers.Any(s => s.SupportsType(type));
}
}
/// <summary>
/// 断路器状态
/// </summary>
publicenum CircuitBreakerState
{
Closed, // 正常状态
Open, // 断路器打开,拒绝请求
HalfOpen // 半开状态,允许少量请求通过
}
/// <summary>
/// 缓存断路器配置
/// </summary>
publicclassCacheCircuitBreakerOptions
{
publicint FailureThreshold { get; set; } = 5; // 连续失败阈值
public TimeSpan OpenTimeout { get; set; } = TimeSpan.FromMinutes(1); // 断路器打开时间
publicint SuccessThreshold { get; set; } = 2; // 半开状态成功阈值
public TimeSpan SamplingDuration { get; set; } = TimeSpan.FromMinutes(2); // 采样时间窗口
}
/// <summary>
/// 缓存断路器
/// </summary>
publicclassCacheCircuitBreaker
{
privatereadonly CacheCircuitBreakerOptions _options;
privatereadonly ILogger<CacheCircuitBreaker> _logger;
privatereadonlyobject _lock = newobject();
private CircuitBreakerState _state = CircuitBreakerState.Closed;
privateint _failureCount = 0;
privateint _successCount = 0;
private DateTime _lastFailureTime = DateTime.MinValue;
private DateTime _lastStateChangeTime = DateTime.UtcNow;
public CacheCircuitBreaker(
CacheCircuitBreakerOptions options,
ILogger<CacheCircuitBreaker> logger)
{
_options = options ?? thrownew ArgumentNullException(nameof(options));
_logger = logger ?? thrownew ArgumentNullException(nameof(logger));
}
public CircuitBreakerState State => _state;
/// <summary>
/// 执行带断路器保护的操作
/// </summary>
publicasync Task<T> ExecuteAsync<T>(Func<Task<T>> operation, string operationName = null)
{
if (!CanExecute())
{
thrownew CacheException($"Circuit breaker is OPEN for operation: {operationName}");
}
try
{
var result = await operation();
OnSuccess();
return result;
}
catch (Exception ex)
{
OnFailure(ex, operationName);
throw;
}
}
/// <summary>
/// 检查是否可以执行操作
/// </summary>
private bool CanExecute()
{
lock (_lock)
{
switch (_state)
{
case CircuitBreakerState.Closed:
returntrue;
case CircuitBreakerState.Open:
// 检查是否可以转入半开状态
if (DateTime.UtcNow - _lastStateChangeTime >= _options.OpenTimeout)
{
_state = CircuitBreakerState.HalfOpen;
_successCount = 0;
_lastStateChangeTime = DateTime.UtcNow;
_logger.LogInformation("Circuit breaker entering HALF_OPEN state");
returntrue;
}
returnfalse;
case CircuitBreakerState.HalfOpen:
returntrue;
default:
returnfalse;
}
}
}
/// <summary>
/// 操作成功回调
/// </summary>
private void OnSuccess()
{
lock (_lock)
{
if (_state == CircuitBreakerState.HalfOpen)
{
_successCount++;
if (_successCount >= _options.SuccessThreshold)
{
_state = CircuitBreakerState.Closed;
_failureCount = 0;
_successCount = 0;
_lastStateChangeTime = DateTime.UtcNow;
_logger.LogInformation("Circuit breaker entering CLOSED state");
}
}
elseif (_state == CircuitBreakerState.Closed)
{
// 在采样时间窗口内重置失败计数
if (DateTime.UtcNow - _lastFailureTime > _options.SamplingDuration)
{
_failureCount = 0;
}
}
}
}
/// <summary>
/// 操作失败回调
/// </summary>
private void OnFailure(Exception ex, string operationName)
{
lock (_lock)
{
_failureCount++;
_lastFailureTime = DateTime.UtcNow;
_logger.LogWarning(ex, "Circuit breaker recorded failure #{FailureCount} for operation: {Operation}",
_failureCount, operationName);
if (_state == CircuitBreakerState.Closed && _failureCount >= _options.FailureThreshold)
{
_state = CircuitBreakerState.Open;
_lastStateChangeTime = DateTime.UtcNow;
_logger.LogError("Circuit breaker entering OPEN state after {FailureCount} failures", _failureCount);
}
elseif (_state == CircuitBreakerState.HalfOpen)
{
_state = CircuitBreakerState.Open;
_lastStateChangeTime = DateTime.UtcNow;
_logger.LogWarning("Circuit breaker returning to OPEN state from HALF_OPEN due to failure");
}
}
}
/// <summary>
/// 获取当前状态信息
/// </summary>
public object GetState()
{
lock (_lock)
{
returnnew
{
State = _state.ToString(),
FailureCount = _failureCount,
SuccessCount = _successCount,
LastFailureTime = _lastFailureTime,
LastStateChangeTime = _lastStateChangeTime,
CanExecute = CanExecute()
};
}
}
}
/// <summary>
/// 带断路器的Redis缓存装饰器
/// </summary>
publicclassCircuitBreakerRedisCache : IRedisDistributedCache
{
privatereadonly IRedisDistributedCache _innerCache;
privatereadonly CacheCircuitBreaker _circuitBreaker;
privatereadonly ILogger<CircuitBreakerRedisCache> _logger;
public CircuitBreakerRedisCache(
IRedisDistributedCache innerCache,
CacheCircuitBreaker circuitBreaker,
ILogger<CircuitBreakerRedisCache> logger)
{
_innerCache = innerCache ?? thrownew ArgumentNullException(nameof(innerCache));
_circuitBreaker = circuitBreaker ?? thrownew ArgumentNullException(nameof(circuitBreaker));
_logger = logger ?? thrownew ArgumentNullException(nameof(logger));
}
publicasync Task<T> GetAsync<T>(string key)
{
try
{
returnawait _circuitBreaker.ExecuteAsync(() => _innerCache.GetAsync<T>(key), $"GET:{key}");
}
catch (CacheException) when (_circuitBreaker.State == CircuitBreakerState.Open)
{
_logger.LogWarning("Circuit breaker open, returning default for key: {Key}", key);
returndefault(T);
}
}
publicasync Task SetAsync<T>(string key, T value, TimeSpan? expiry = null)
{
try
{
await _circuitBreaker.ExecuteAsync(() => _innerCache.SetAsync(key, value, expiry), $"SET:{key}");
}
catch (CacheException) when (_circuitBreaker.State == CircuitBreakerState.Open)
{
_logger.LogWarning("Circuit breaker open, skipping cache set for key: {Key}", key);
// 不继续抛出异常,允许应用继续运行
}
}
// 继续实现其他接口方法...
public Task<bool> ExistsAsync(string key) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.ExistsAsync(key), $"EXISTS:{key}");
public Task<bool> RemoveAsync(string key) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.RemoveAsync(key), $"REMOVE:{key}");
public Task<long> RemoveByPatternAsync(string pattern) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.RemoveByPatternAsync(pattern), $"REMOVE_PATTERN:{pattern}");
public Task<T> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.GetOrSetAsync(key, factory, expiry), $"GET_OR_SET:{key}");
public Task<Dictionary<string, T>> GetMultipleAsync<T>(IEnumerable<string> keys) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.GetMultipleAsync<T>(keys), "GET_MULTIPLE");
public Task SetMultipleAsync<T>(Dictionary<string, T> keyValuePairs, TimeSpan? expiry = null) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.SetMultipleAsync(keyValuePairs, expiry), "SET_MULTIPLE");
public Task<long> IncrementAsync(string key, long value = 1, TimeSpan? expiry = null) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.IncrementAsync(key, value, expiry), $"INCREMENT:{key}");
public Task<double> IncrementAsync(string key, double value, TimeSpan? expiry = null) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.IncrementAsync(key, value, expiry), $"INCREMENT_DOUBLE:{key}");
public Task<bool> SetIfNotExistsAsync<T>(string key, T value, TimeSpan? expiry = null) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.SetIfNotExistsAsync(key, value, expiry), $"SET_IF_NOT_EXISTS:{key}");
public Task<TimeSpan?> GetExpiryAsync(string key) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.GetExpiryAsync(key), $"GET_EXPIRY:{key}");
public Task<bool> ExpireAsync(string key, TimeSpan expiry) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.ExpireAsync(key, expiry), $"EXPIRE:{key}");
}
/// <summary>
/// LRU缓存容器,用于防止内存泄漏
/// </summary>
publicclassLRUCache<TKey, TValue>
{
privatereadonlyint _maxSize;
privatereadonly Dictionary<TKey, LinkedListNode<CacheItem<TKey, TValue>>> _cache;
privatereadonly LinkedList<CacheItem<TKey, TValue>> _lruList;
privatereadonlyobject _lock = newobject();
public LRUCache(int maxSize)
{
if (maxSize <= 0)
thrownew ArgumentException("Max size must be greater than 0", nameof(maxSize));
_maxSize = maxSize;
_cache = new Dictionary<TKey, LinkedListNode<CacheItem<TKey, TValue>>>(maxSize);
_lruList = new LinkedList<CacheItem<TKey, TValue>>();
}
publicint Count
{
get
{
lock (_lock)
{
return _cache.Count;
}
}
}
public bool TryGet(TKey key, out TValue value)
{
lock (_lock)
{
if (_cache.TryGetValue(key, outvar node))
{
// 移到链表头部(最近使用)
_lruList.Remove(node);
_lruList.AddFirst(node);
value = node.Value.Value;
returntrue;
}
value = default(TValue);
returnfalse;
}
}
public void Add(TKey key, TValue value)
{
lock (_lock)
{
if (_cache.TryGetValue(key, outvar existingNode))
{
// 更新已存在的项
existingNode.Value.Value = value;
existingNode.Value.LastAccessed = DateTime.UtcNow;
// 移到链表头部
_lruList.Remove(existingNode);
_lruList.AddFirst(existingNode);
}
else
{
// 检查容量限制
if (_cache.Count >= _maxSize)
{
// 移除最久未使用的项
var lastNode = _lruList.Last;
if (lastNode != null)
{
_cache.Remove(lastNode.Value.Key);
_lruList.RemoveLast();
}
}
// 添加新项
var newItem = new CacheItem<TKey, TValue>
{
Key = key,
Value = value,
LastAccessed = DateTime.UtcNow
};
var newNode = _lruList.AddFirst(newItem);
_cache[key] = newNode;
}
}
}
public bool Remove(TKey key)
{
lock (_lock)
{
if (_cache.TryGetValue(key, outvar node))
{
_cache.Remove(key);
_lruList.Remove(node);
returntrue;
}
returnfalse;
}
}
public void Clear()
{
lock (_lock)
{
_cache.Clear();
_lruList.Clear();
}
}
public IEnumerable<TKey> Keys
{
get
{
lock (_lock)
{
return _cache.Keys.ToList();
}
}
}
/// <summary>
/// 清理过期项
/// </summary>
public int CleanupExpired(TimeSpan maxAge)
{
var cutoffTime = DateTime.UtcNow - maxAge;
var expiredKeys = new List<TKey>();
lock (_lock)
{
foreach (var item in _lruList)
{
if (item.LastAccessed < cutoffTime)
{
expiredKeys.Add(item.Key);
}
}
foreach (var key in expiredKeys)
{
Remove(key);
}
}
return expiredKeys.Count;
}
}
/// <summary>
/// LRU缓存项
/// </summary>
classCacheItem<TKey, TValue>
{
public TKey Key { get; set; }
public TValue Value { get; set; }
public DateTime LastAccessed { get; set; }
}
/// <summary>
/// 高级内存缓存管理器
/// 提供泛型支持、统计信息、性能监控等功能
/// </summary>
publicinterfaceIAdvancedMemoryCache
{
Task<T> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null);
Task<T> GetAsync<T>(string key);
Task SetAsync<T>(string key, T value, TimeSpan? expiry = null);
Task RemoveAsync(string key);
Task RemoveByPatternAsync(string pattern);
CacheStatistics GetStatistics();
void ClearStatistics();
}
/// <summary>
/// 缓存统计信息
/// </summary>
publicclassCacheStatistics
{
publiclong HitCount { get; set; }
publiclong MissCount { get; set; }
publiclong SetCount { get; set; }
publiclong RemoveCount { get; set; }
publicdouble HitRatio => HitCount + MissCount == 0 ? 0 : (double)HitCount / (HitCount + MissCount);
public DateTime StartTime { get; set; }
public TimeSpan Duration => DateTime.UtcNow - StartTime;
}
/// <summary>
/// 缓存配置选项
/// </summary>
publicclassAdvancedMemoryCacheOptions
{
publicint SizeLimit { get; set; } = 1000;
public TimeSpan DefaultExpiry { get; set; } = TimeSpan.FromMinutes(30);
publicbool EnableStatistics { get; set; } = true;
publicbool EnablePatternRemoval { get; set; } = true;
publicdouble CompactionPercentage { get; set; } = 0.1;
}
/// <summary>
/// 高级内存缓存实现
/// 基于IMemoryCache构建的功能增强版本
/// </summary>
publicclassAdvancedMemoryCache : IAdvancedMemoryCache, IDisposable
{
privatereadonly IMemoryCache _cache;
privatereadonly ILogger<AdvancedMemoryCache> _logger;
privatereadonly AdvancedMemoryCacheOptions _options;
privatereadonly CacheStatistics _statistics;
privatereadonly ConcurrentDictionary<string, byte> _keyTracker;
privatereadonly SemaphoreSlim _semaphore;
privatereadonly Timer _cleanupTimer;
public AdvancedMemoryCache(
IMemoryCache cache,
ILogger<AdvancedMemoryCache> logger,
IOptions<AdvancedMemoryCacheOptions> options)
{
_cache = cache ?? thrownew ArgumentNullException(nameof(cache));
_logger = logger ?? thrownew ArgumentNullException(nameof(logger));
_options = options?.Value ?? new AdvancedMemoryCacheOptions();
_statistics = new CacheStatistics { StartTime = DateTime.UtcNow };
_keyTracker = new ConcurrentDictionary<string, byte>();
_semaphore = new SemaphoreSlim(1, 1);
// 定期清理过期的key追踪记录
_cleanupTimer = new Timer(CleanupKeyTracker, null,
TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(5));
}
/// <summary>
/// 获取或设置缓存项
/// 这是最常用的方法,实现了Cache-Aside模式
/// </summary>
/// <typeparam name="T">缓存项类型</typeparam>
/// <param name="key">缓存键</param>
/// <param name="factory">数据工厂方法</param>
/// <param name="expiry">过期时间</param>
/// <returns>缓存的值</returns>
publicasync Task<T> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null)
{
if (string.IsNullOrEmpty(key))
thrownew ArgumentException("Key cannot be null or empty", nameof(key));
if (factory == null)
thrownew ArgumentNullException(nameof(factory));
// 尝试从缓存获取
var cachedValue = await GetAsync<T>(key);
if (cachedValue != null)
{
_logger.LogDebug("Cache hit for key: {Key}", key);
return cachedValue;
}
// 使用信号量防止并发执行相同的factory方法
await _semaphore.WaitAsync();
try
{
// 双重检查锁定模式
cachedValue = await GetAsync<T>(key);
if (cachedValue != null)
{
_logger.LogDebug("Cache hit on second check for key: {Key}", key);
return cachedValue;
}
// 执行工厂方法获取数据
_logger.LogDebug("Cache miss for key: {Key}, executing factory method", key);
varvalue = await factory();
// 将结果存入缓存
await SetAsync(key, value, expiry);
returnvalue;
}
catch (CacheConnectionException ex)
{
_logger.LogWarning(ex, "Cache connection failed for key: {Key}, using fallback", key);
// 缓存连接失败时,仍执行工厂方法但不缓存结果
returnawait factory();
}
catch (CacheSerializationException ex)
{
_logger.LogError(ex, "Serialization failed for key: {Key}", key);
throw;
}
catch (CacheTimeoutException ex)
{
_logger.LogWarning(ex, "Cache operation timeout for key: {Key}", key);
returnawait factory();
}
catch (Exception ex)
{
_logger.LogError(ex, "Unexpected error occurred while executing factory method for key: {Key}", key);
thrownew CacheException($"Cache operation failed for key: {key}", ex);
}
finally
{
_semaphore.Release();
}
}
/// <summary>
/// 异步获取缓存项
/// </summary>
/// <typeparam name="T">缓存项类型</typeparam>
/// <param name="key">缓存键</param>
/// <returns>缓存的值,如果不存在则返回默认值</returns>
public Task<T> GetAsync<T>(string key)
{
if (string.IsNullOrEmpty(key))
thrownew ArgumentException("Key cannot be null or empty", nameof(key));
var found = _cache.TryGetValue(key, outvarvalue);
if (_options.EnableStatistics)
{
if (found)
Interlocked.Increment(ref _statistics.HitCount);
else
Interlocked.Increment(ref _statistics.MissCount);
}
if (found && valueis T typedValue)
{
return Task.FromResult(typedValue);
}
return Task.FromResult(default(T));
}
/// <summary>
/// 异步设置缓存项
/// </summary>
/// <typeparam name="T">缓存项类型</typeparam>
/// <param name="key">缓存键</param>
/// <param name="value">缓存值</param>
/// <param name="expiry">过期时间</param>
public Task SetAsync<T>(string key, T value, TimeSpan? expiry = null)
{
if (string.IsNullOrEmpty(key))
thrownew ArgumentException("Key cannot be null or empty", nameof(key));
var cacheExpiry = expiry ?? _options.DefaultExpiry;
usingvar entry = _cache.CreateEntry(key);
entry.Value = value;
entry.AbsoluteExpirationRelativeToNow = cacheExpiry;
entry.Size = 1; // 简化的大小计算,实际应用中可根据对象大小设置
// 设置过期回调
entry.PostEvictionCallbacks.Add(new PostEvictionCallbackRegistration
{
EvictionCallback = OnCacheEntryEvicted,
State = key
});
// 追踪缓存键
if (_options.EnablePatternRemoval)
{
_keyTracker.TryAdd(key, 0);
}
if (_options.EnableStatistics)
{
Interlocked.Increment(ref _statistics.SetCount);
}
_logger.LogDebug("Set cache entry for key: {Key}, expiry: {Expiry}", key, cacheExpiry);
return Task.CompletedTask;
}
/// <summary>
/// 异步移除缓存项
/// </summary>
/// <param name="key">缓存键</param>
public Task RemoveAsync(string key)
{
if (string.IsNullOrEmpty(key))
thrownew ArgumentException("Key cannot be null or empty", nameof(key));
_cache.Remove(key);
_keyTracker.TryRemove(key, out _);
if (_options.EnableStatistics)
{
Interlocked.Increment(ref _statistics.RemoveCount);
}
_logger.LogDebug("Removed cache entry for key: {Key}", key);
return Task.CompletedTask;
}
/// <summary>
/// 根据模式异步移除缓存项
/// 支持通配符匹配,如 "user:*", "*:settings"
/// </summary>
/// <param name="pattern">匹配模式</param>
public async Task RemoveByPatternAsync(string pattern)
{
if (string.IsNullOrEmpty(pattern))
thrownew ArgumentException("Pattern cannot be null or empty", nameof(pattern));
if (!_options.EnablePatternRemoval)
{
_logger.LogWarning("Pattern removal is disabled");
return;
}
var keysToRemove = new List<string>();
var regexPattern = ConvertWildcardToRegex(pattern);
var regex = new System.Text.RegularExpressions.Regex(regexPattern,
System.Text.RegularExpressions.RegexOptions.IgnoreCase);
foreach (var key in _keyTracker.Keys)
{
if (regex.IsMatch(key))
{
keysToRemove.Add(key);
}
}
foreach (var key in keysToRemove)
{
await RemoveAsync(key);
}
_logger.LogInformation("Removed {Count} cache entries matching pattern: {Pattern}",
keysToRemove.Count, pattern);
}
/// <summary>
/// 获取缓存统计信息
/// </summary>
/// <returns>统计信息对象</returns>
public CacheStatistics GetStatistics()
{
if (!_options.EnableStatistics)
{
returnnew CacheStatistics();
}
returnnew CacheStatistics
{
HitCount = _statistics.HitCount,
MissCount = _statistics.MissCount,
SetCount = _statistics.SetCount,
RemoveCount = _statistics.RemoveCount,
StartTime = _statistics.StartTime
};
}
/// <summary>
/// 清除统计信息
/// </summary>
public void ClearStatistics()
{
if (_options.EnableStatistics)
{
Interlocked.Exchange(ref _statistics.HitCount, 0);
Interlocked.Exchange(ref _statistics.MissCount, 0);
Interlocked.Exchange(ref _statistics.SetCount, 0);
Interlocked.Exchange(ref _statistics.RemoveCount, 0);
_statistics.StartTime = DateTime.UtcNow;
}
}
/// <summary>
/// 缓存项被驱逐时的回调方法
/// </summary>
/// <param name="key">缓存键</param>
/// <param name="value">缓存值</param>
/// <param name="reason">驱逐原因</param>
/// <param name="state">状态对象</param>
private void OnCacheEntryEvicted(object key, object value, EvictionReason reason, object state)
{
var cacheKey = state?.ToString();
if (!string.IsNullOrEmpty(cacheKey))
{
_keyTracker.TryRemove(cacheKey, out _);
}
_logger.LogDebug("Cache entry evicted - Key: {Key}, Reason: {Reason}", key, reason);
}
/// <summary>
/// 将通配符模式转换为正则表达式
/// </summary>
/// <param name="wildcardPattern">通配符模式</param>
/// <returns>正则表达式字符串</returns>
private static string ConvertWildcardToRegex(string wildcardPattern)
{
return"^" + System.Text.RegularExpressions.Regex.Escape(wildcardPattern)
.Replace("\\*", ".*")
.Replace("\\?", ".") + "$";
}
/// <summary>
/// 定期清理key追踪器中的过期项
/// </summary>
/// <param name="state">定时器状态</param>
private void CleanupKeyTracker(object state)
{
var keysToRemove = new List<string>();
foreach (var key in _keyTracker.Keys)
{
if (!_cache.TryGetValue(key, out _))
{
keysToRemove.Add(key);
}
}
foreach (var key in keysToRemove)
{
_keyTracker.TryRemove(key, out _);
}
if (keysToRemove.Count > 0)
{
_logger.LogDebug("Cleaned up {Count} expired keys from tracker", keysToRemove.Count);
}
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
_cleanupTimer?.Dispose();
_semaphore?.Dispose();
_cache?.Dispose();
}
}
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
/// <summary>
/// 内存缓存服务扩展
/// </summary>
publicstaticclassMemoryCacheServiceExtensions
{
/// <summary>
/// 添加高级内存缓存服务
/// </summary>
/// <param name="services">服务集合</param>
/// <param name="setupAction">配置委托</param>
/// <returns>服务集合</returns>
public static IServiceCollection AddAdvancedMemoryCache(
this IServiceCollection services,
Action<AdvancedMemoryCacheOptions> setupAction = null)
{
// 添加基础内存缓存
services.AddMemoryCache(options =>
{
options.SizeLimit = 1000; // 设置缓存大小限制
options.CompactionPercentage = 0.1; // 内存压力时的压缩百分比
options.ExpirationScanFrequency = TimeSpan.FromMinutes(1); // 过期扫描频率
});
// 配置选项
if (setupAction != null)
{
services.Configure(setupAction);
}
else
{
services.Configure<AdvancedMemoryCacheOptions>(options =>
{
// 默认配置
options.SizeLimit = 1000;
options.DefaultExpiry = TimeSpan.FromMinutes(30);
options.EnableStatistics = true;
options.EnablePatternRemoval = true;
options.CompactionPercentage = 0.1;
});
}
// 注册高级内存缓存服务(带安全装饰器)
services.AddSingleton<AdvancedMemoryCache>();
services.AddSingleton<IAdvancedMemoryCache>(provider =>
{
var innerCache = provider.GetRequiredService<AdvancedMemoryCache>();
var validator = provider.GetRequiredService<ICacheDataValidator>();
var logger = provider.GetRequiredService<ILogger<SecureCacheManagerDecorator>>();
returnnew SecureCacheManagerDecorator(innerCache, validator, logger);
});
return services;
}
}
/// <summary>
/// 示例:在Program.cs中的配置
/// </summary>
publicclassProgram
{
public static void Main(string[] args)
{
var builder = WebApplication.CreateBuilder(args);
// 添加增强组件
builder.Services.AddSingleton<ICacheDataValidator, DefaultCacheDataValidator>();
builder.Services.AddSingleton<ICacheSerializer, SmartCacheSerializer>();
// 添加断路器配置
builder.Services.Configure<CacheCircuitBreakerOptions>(options =>
{
options.FailureThreshold = 5;
options.OpenTimeout = TimeSpan.FromMinutes(1);
options.SuccessThreshold = 2;
});
builder.Services.AddSingleton<CacheCircuitBreaker>();
// 添加高级内存缓存(带安全验证)
builder.Services.AddAdvancedMemoryCache(options =>
{
options.SizeLimit = 2000;
options.DefaultExpiry = TimeSpan.FromHours(1);
options.EnableStatistics = true;
options.EnablePatternRemoval = true;
options.CompactionPercentage = 0.15;
});
var app = builder.Build();
app.Run();
}
}
using StackExchange.Redis;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Logging;
using System.Text.Json;
/// <summary>
/// Redis缓存配置选项
/// </summary>
publicclassRedisCacheOptions
{
publicstring ConnectionString { get; set; } = "localhost:6379";
publicint Database { get; set; } = 0;
publicstring KeyPrefix { get; set; } = "app:";
public TimeSpan DefaultExpiry { get; set; } = TimeSpan.FromHours(1);
publicint ConnectTimeout { get; set; } = 5000;
publicint SyncTimeout { get; set; } = 1000;
publicbool AllowAdmin { get; set; } = false;
publicstring Password { get; set; }
publicbool Ssl { get; set; } = false;
publicint ConnectRetry { get; set; } = 3;
publicbool AbortOnConnectFail { get; set; } = false;
publicstring ClientName { get; set; } = "MultiLevelCache";
}
/// <summary>
/// Redis连接管理器
/// 提供连接池管理和故障恢复功能
/// </summary>
publicinterfaceIRedisConnectionManager : IDisposable
{
IDatabase GetDatabase();
ISubscriber GetSubscriber();
IServer GetServer();
bool IsConnected { get; }
Task<bool> TestConnectionAsync();
}
/// <summary>
/// Redis连接管理器实现
/// </summary>
publicclassRedisConnectionManager : IRedisConnectionManager
{
privatereadonly RedisCacheOptions _options;
privatereadonly ILogger<RedisConnectionManager> _logger;
privatereadonly Lazy<ConnectionMultiplexer> _connectionMultiplexer;
privatebool _disposed = false;
public RedisConnectionManager(
IOptions<RedisCacheOptions> options,
ILogger<RedisConnectionManager> logger)
{
_options = options?.Value ?? thrownew ArgumentNullException(nameof(options));
_logger = logger ?? thrownew ArgumentNullException(nameof(logger));
_connectionMultiplexer = new Lazy<ConnectionMultiplexer>(CreateConnection);
}
/// <summary>
/// 创建Redis连接
/// </summary>
/// <returns>ConnectionMultiplexer实例</returns>
private ConnectionMultiplexer CreateConnection()
{
var configurationOptions = new ConfigurationOptions
{
EndPoints = { _options.ConnectionString },
ConnectTimeout = _options.ConnectTimeout,
SyncTimeout = _options.SyncTimeout,
AllowAdmin = _options.AllowAdmin,
ConnectRetry = _options.ConnectRetry,
AbortOnConnectFail = _options.AbortOnConnectFail,
ClientName = _options.ClientName,
Ssl = _options.Ssl
};
if (!string.IsNullOrEmpty(_options.Password))
{
configurationOptions.Password = _options.Password;
}
try
{
var connection = ConnectionMultiplexer.Connect(configurationOptions);
// 注册连接事件
connection.ConnectionFailed += OnConnectionFailed;
connection.ConnectionRestored += OnConnectionRestored;
connection.ErrorMessage += OnErrorMessage;
connection.InternalError += OnInternalError;
_logger.LogInformation("Redis connection established successfully");
return connection;
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to establish Redis connection");
throw;
}
}
/// <summary>
/// 获取数据库实例
/// </summary>
/// <returns>IDatabase实例</returns>
public IDatabase GetDatabase()
{
return _connectionMultiplexer.Value.GetDatabase(_options.Database);
}
/// <summary>
/// 获取订阅者实例
/// </summary>
/// <returns>ISubscriber实例</returns>
public ISubscriber GetSubscriber()
{
return _connectionMultiplexer.Value.GetSubscriber();
}
/// <summary>
/// 获取服务器实例
/// </summary>
/// <returns>IServer实例</returns>
public IServer GetServer()
{
var endpoints = _connectionMultiplexer.Value.GetEndPoints();
return _connectionMultiplexer.Value.GetServer(endpoints.First());
}
/// <summary>
/// 检查连接状态
/// </summary>
publicbool IsConnected => _connectionMultiplexer.IsValueCreated &&
_connectionMultiplexer.Value.IsConnected;
/// <summary>
/// 测试连接
/// </summary>
/// <returns>连接是否成功</returns>
public async Task<bool> TestConnectionAsync()
{
try
{
var database = GetDatabase();
await database.PingAsync();
returntrue;
}
catch (Exception ex)
{
_logger.LogError(ex, "Redis connection test failed");
returnfalse;
}
}
#region 事件处理
private void OnConnectionFailed(object sender, ConnectionFailedEventArgs e)
{
_logger.LogError(e.Exception, "Redis connection failed: {EndPoint}", e.EndPoint);
}
private void OnConnectionRestored(object sender, ConnectionFailedEventArgs e)
{
_logger.LogInformation("Redis connection restored: {EndPoint}", e.EndPoint);
}
private void OnErrorMessage(object sender, RedisErrorEventArgs e)
{
_logger.LogError("Redis error: {Message}", e.Message);
}
private void OnInternalError(object sender, InternalErrorEventArgs e)
{
_logger.LogError(e.Exception, "Redis internal error");
}
#endregion
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
if (!_disposed)
{
if (_connectionMultiplexer.IsValueCreated)
{
_connectionMultiplexer.Value.Close();
_connectionMultiplexer.Value.Dispose();
}
_disposed = true;
}
}
}
using StackExchange.Redis;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
/// <summary>
/// Redis分布式缓存接口
/// </summary>
publicinterfaceIRedisDistributedCache
{
Task<T> GetAsync<T>(string key);
Task SetAsync<T>(string key, T value, TimeSpan? expiry = null);
Task<bool> ExistsAsync(string key);
Task<bool> RemoveAsync(string key);
Task<long> RemoveByPatternAsync(string pattern);
Task<T> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null);
Task<Dictionary<string, T>> GetMultipleAsync<T>(IEnumerable<string> keys);
Task SetMultipleAsync<T>(Dictionary<string, T> keyValuePairs, TimeSpan? expiry = null);
Task<long> IncrementAsync(string key, long value = 1, TimeSpan? expiry = null);
Task<double> IncrementAsync(string key, double value, TimeSpan? expiry = null);
Task<bool> SetIfNotExistsAsync<T>(string key, T value, TimeSpan? expiry = null);
Task<TimeSpan?> GetExpiryAsync(string key);
Task<bool> ExpireAsync(string key, TimeSpan expiry);
}
/// <summary>
/// Redis分布式缓存实现
/// </summary>
publicclassRedisDistributedCache : IRedisDistributedCache
{
privatereadonly IRedisConnectionManager _connectionManager;
privatereadonly RedisCacheOptions _options;
privatereadonly ILogger<RedisDistributedCache> _logger;
privatereadonly ICacheSerializer _serializer;
public RedisDistributedCache(
IRedisConnectionManager connectionManager,
IOptions<RedisCacheOptions> options,
ILogger<RedisDistributedCache> logger)
{
_connectionManager = connectionManager ?? thrownew ArgumentNullException(nameof(connectionManager));
_options = options?.Value ?? thrownew ArgumentNullException(nameof(options));
_logger = logger ?? thrownew ArgumentNullException(nameof(logger));
// 配置JSON序列化选项
// 使用智能序列化器替代直接的JSON序列化器
_serializer = serviceProvider?.GetService<ICacheSerializer>() ?? new JsonCacheSerializer();
}
/// <summary>
/// 异步获取缓存项
/// </summary>
/// <typeparam name="T">缓存项类型</typeparam>
/// <param name="key">缓存键</param>
/// <returns>缓存的值</returns>
publicasync Task<T> GetAsync<T>(string key)
{
if (string.IsNullOrEmpty(key))
thrownew ArgumentException("Key cannot be null or empty", nameof(key));
try
{
var database = _connectionManager.GetDatabase();
var fullKey = GetFullKey(key);
varvalue = await database.StringGetAsync(fullKey);
if (!value.HasValue)
{
_logger.LogDebug("Cache miss for key: {Key}", key);
returndefault(T);
}
_logger.LogDebug("Cache hit for key: {Key}", key);
return DeserializeValue<T>(value);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error getting value from Redis for key: {Key}", key);
returndefault(T);
}
}
/// <summary>
/// 异步设置缓存项
/// </summary>
/// <typeparam name="T">缓存项类型</typeparam>
/// <param name="key">缓存键</param>
/// <param name="value">缓存值</param>
/// <param name="expiry">过期时间</param>
publicasync Task SetAsync<T>(string key, T value, TimeSpan? expiry = null)
{
if (string.IsNullOrEmpty(key))
thrownew ArgumentException("Key cannot be null or empty", nameof(key));
try
{
var database = _connectionManager.GetDatabase();
var fullKey = GetFullKey(key);
var serializedValue = SerializeValue(value);
var expiration = expiry ?? _options.DefaultExpiry;
await database.StringSetAsync(fullKey, serializedValue, expiration);
_logger.LogDebug("Set cache value for key: {Key}, expiry: {Expiry}", key, expiration);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error setting value in Redis for key: {Key}", key);
throw;
}
}
/// <summary>
/// 检查键是否存在
/// </summary>
/// <param name="key">缓存键</param>
/// <returns>键是否存在</returns>
public async Task<bool> ExistsAsync(string key)
{
if (string.IsNullOrEmpty(key))
thrownew ArgumentException("Key cannot be null or empty", nameof(key));
try
{
var database = _connectionManager.GetDatabase();
var fullKey = GetFullKey(key);
returnawait database.KeyExistsAsync(fullKey);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error checking existence in Redis for key: {Key}", key);
returnfalse;
}
}
/// <summary>
/// 异步移除缓存项
/// </summary>
/// <param name="key">缓存键</param>
/// <returns>是否成功移除</returns>
public async Task<bool> RemoveAsync(string key)
{
if (string.IsNullOrEmpty(key))
thrownew ArgumentException("Key cannot be null or empty", nameof(key));
try
{
var database = _connectionManager.GetDatabase();
var fullKey = GetFullKey(key);
var result = await database.KeyDeleteAsync(fullKey);
_logger.LogDebug("Remove cache key: {Key}, success: {Success}", key, result);
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error removing key from Redis: {Key}", key);
returnfalse;
}
}
/// <summary>
/// 根据模式批量删除缓存项
/// </summary>
/// <param name="pattern">匹配模式</param>
/// <returns>删除的项目数量</returns>
public async Task<long> RemoveByPatternAsync(string pattern)
{
if (string.IsNullOrEmpty(pattern))
thrownew ArgumentException("Pattern cannot be null or empty", nameof(pattern));
try
{
var server = _connectionManager.GetServer();
var database = _connectionManager.GetDatabase();
var fullPattern = GetFullKey(pattern);
var keys = server.Keys(database.Database, fullPattern).ToArray();
if (keys.Length == 0)
{
return0;
}
var deletedCount = await database.KeyDeleteAsync(keys);
_logger.LogInformation("Deleted {Count} keys matching pattern: {Pattern}", deletedCount, pattern);
return deletedCount;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error removing keys by pattern from Redis: {Pattern}", pattern);
return0;
}
}
/// <summary>
/// 获取或设置缓存项(分布式锁实现)
/// </summary>
/// <typeparam name="T">缓存项类型</typeparam>
/// <param name="key">缓存键</param>
/// <param name="factory">数据工厂方法</param>
/// <param name="expiry">过期时间</param>
/// <returns>缓存的值</returns>
publicasync Task<T> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null)
{
if (string.IsNullOrEmpty(key))
thrownew ArgumentException("Key cannot be null or empty", nameof(key));
if (factory == null)
thrownew ArgumentNullException(nameof(factory));
// 尝试从缓存获取
var cachedValue = await GetAsync<T>(key);
if (cachedValue != null)
{
return cachedValue;
}
// 使用分布式锁防止缓存击穿
var lockKey = $"{key}:lock";
var lockValue = Guid.NewGuid().ToString();
var database = _connectionManager.GetDatabase();
try
{
// 尝试获取分布式锁
var lockAcquired = await database.StringSetAsync(
GetFullKey(lockKey),
lockValue,
TimeSpan.FromMinutes(1),
When.NotExists);
if (lockAcquired)
{
try
{
// 再次检查缓存
cachedValue = await GetAsync<T>(key);
if (cachedValue != null)
{
return cachedValue;
}
// 执行工厂方法
_logger.LogDebug("Executing factory method for key: {Key}", key);
varvalue = await factory();
// 设置缓存
await SetAsync(key, value, expiry);
returnvalue;
}
finally
{
// 释放分布式锁(使用Lua脚本确保原子性)
conststring releaseLockScript = @"
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end";
await database.ScriptEvaluateAsync(
releaseLockScript,
new RedisKey[] { GetFullKey(lockKey) },
new RedisValue[] { lockValue });
}
}
else
{
// 等待锁释放并重试
_logger.LogDebug("Waiting for lock to be released for key: {Key}", key);
await Task.Delay(50); // 短暂等待
// 重试获取缓存
cachedValue = await GetAsync<T>(key);
if (cachedValue != null)
{
return cachedValue;
}
// 如果仍未获取到,执行降级策略
_logger.LogWarning("Failed to acquire lock and cache miss for key: {Key}, executing factory method", key);
returnawait factory();
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in GetOrSetAsync for key: {Key}", key);
// 降级到直接执行工厂方法
returnawait factory();
}
}
/// <summary>
/// 批量获取缓存项
/// </summary>
/// <typeparam name="T">缓存项类型</typeparam>
/// <param name="keys">缓存键集合</param>
/// <returns>键值对字典</returns>
publicasync Task<Dictionary<string, T>> GetMultipleAsync<T>(IEnumerable<string> keys)
{
if (keys == null)
thrownew ArgumentNullException(nameof(keys));
var keyList = keys.ToList();
if (!keyList.Any())
{
returnnew Dictionary<string, T>();
}
try
{
var database = _connectionManager.GetDatabase();
var fullKeys = keyList.Select(k => (RedisKey)GetFullKey(k)).ToArray();
var values = await database.StringGetAsync(fullKeys);
var result = new Dictionary<string, T>();
for (int i = 0; i < keyList.Count; i++)
{
if (values[i].HasValue)
{
result[keyList[i]] = DeserializeValue<T>(values[i]);
}
}
_logger.LogDebug("Retrieved {Count} out of {Total} keys from Redis",
result.Count, keyList.Count);
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error getting multiple values from Redis");
returnnew Dictionary<string, T>();
}
}
/// <summary>
/// 批量设置缓存项
/// </summary>
/// <typeparam name="T">缓存项类型</typeparam>
/// <param name="keyValuePairs">键值对字典</param>
/// <param name="expiry">过期时间</param>
publicasync Task SetMultipleAsync<T>(Dictionary<string, T> keyValuePairs, TimeSpan? expiry = null)
{
if (keyValuePairs == null || !keyValuePairs.Any())
return;
try
{
var database = _connectionManager.GetDatabase();
var expiration = expiry ?? _options.DefaultExpiry;
var tasks = keyValuePairs.Select(async kvp =>
{
var fullKey = GetFullKey(kvp.Key);
var serializedValue = SerializeValue(kvp.Value);
await database.StringSetAsync(fullKey, serializedValue, expiration);
});
await Task.WhenAll(tasks);
_logger.LogDebug("Set {Count} cache values with expiry: {Expiry}",
keyValuePairs.Count, expiration);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error setting multiple values in Redis");
throw;
}
}
/// <summary>
/// 原子递增操作
/// </summary>
/// <param name="key">缓存键</param>
/// <param name="value">递增值</param>
/// <param name="expiry">过期时间</param>
/// <returns>递增后的值</returns>
public async Task<long> IncrementAsync(string key, long value = 1, TimeSpan? expiry = null)
{
if (string.IsNullOrEmpty(key))
thrownew ArgumentException("Key cannot be null or empty", nameof(key));
try
{
var database = _connectionManager.GetDatabase();
var fullKey = GetFullKey(key);
var result = await database.StringIncrementAsync(fullKey, value);
if (expiry.HasValue)
{
await database.KeyExpireAsync(fullKey, expiry.Value);
}
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error incrementing value in Redis for key: {Key}", key);
throw;
}
}
/// <summary>
/// 原子递增操作(浮点数)
/// </summary>
/// <param name="key">缓存键</param>
/// <param name="value">递增值</param>
/// <param name="expiry">过期时间</param>
/// <returns>递增后的值</returns>
public async Task<double> IncrementAsync(string key, double value, TimeSpan? expiry = null)
{
if (string.IsNullOrEmpty(key))
thrownew ArgumentException("Key cannot be null or empty", nameof(key));
try
{
var database = _connectionManager.GetDatabase();
var fullKey = GetFullKey(key);
var result = await database.StringIncrementAsync(fullKey, value);
if (expiry.HasValue)
{
await database.KeyExpireAsync(fullKey, expiry.Value);
}
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error incrementing double value in Redis for key: {Key}", key);
throw;
}
}
/// <summary>
/// 仅在键不存在时设置值
/// </summary>
/// <typeparam name="T">缓存项类型</typeparam>
/// <param name="key">缓存键</param>
/// <param name="value">缓存值</param>
/// <param name="expiry">过期时间</param>
/// <returns>是否设置成功</returns>
publicasync Task<bool> SetIfNotExistsAsync<T>(string key, T value, TimeSpan? expiry = null)
{
if (string.IsNullOrEmpty(key))
thrownew ArgumentException("Key cannot be null or empty", nameof(key));
try
{
var database = _connectionManager.GetDatabase();
var fullKey = GetFullKey(key);
var serializedValue = SerializeValue(value);
var expiration = expiry ?? _options.DefaultExpiry;
var result = await database.StringSetAsync(fullKey, serializedValue, expiration, When.NotExists);
_logger.LogDebug("SetIfNotExists for key: {Key}, success: {Success}", key, result);
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in SetIfNotExists for key: {Key}", key);
returnfalse;
}
}
/// <summary>
/// 获取键的过期时间
/// </summary>
/// <param name="key">缓存键</param>
/// <returns>过期时间,如果键不存在或无过期时间则返回null</returns>
publicasync Task<TimeSpan?> GetExpiryAsync(string key)
{
if (string.IsNullOrEmpty(key))
thrownew ArgumentException("Key cannot be null or empty", nameof(key));
try
{
var database = _connectionManager.GetDatabase();
var fullKey = GetFullKey(key);
returnawait database.KeyTimeToLiveAsync(fullKey);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error getting expiry for key: {Key}", key);
returnnull;
}
}
/// <summary>
/// 设置键的过期时间
/// </summary>
/// <param name="key">缓存键</param>
/// <param name="expiry">过期时间</param>
/// <returns>是否设置成功</returns>
public async Task<bool> ExpireAsync(string key, TimeSpan expiry)
{
if (string.IsNullOrEmpty(key))
thrownew ArgumentException("Key cannot be null or empty", nameof(key));
try
{
var database = _connectionManager.GetDatabase();
var fullKey = GetFullKey(key);
returnawait database.KeyExpireAsync(fullKey, expiry);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error setting expiry for key: {Key}", key);
returnfalse;
}
}
#region 辅助方法
/// <summary>
/// 获取完整的缓存键(带前缀)
/// </summary>
/// <param name="key">原始键</param>
/// <returns>完整键</returns>
private string GetFullKey(string key)
{
return$"{_options.KeyPrefix}{key}";
}
/// <summary>
/// 序列化值
/// </summary>
/// <typeparam name="T">值类型</typeparam>
/// <param name="value">要序列化的值</param>
/// <returns>序列化后的字符串</returns>
privatestring SerializeValue<T>(T value)
{
if (value == null) returnnull;
try
{
var serializedBytes = _serializer.Serialize(value);
return Convert.ToBase64String(serializedBytes);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error serializing value of type: {Type}", typeof(T).Name);
thrownew CacheSerializationException($"Failed to serialize value of type: {typeof(T).Name}", ex);
}
}
/// <summary>
/// 反序列化值
/// </summary>
/// <typeparam name="T">目标类型</typeparam>
/// <param name="value">要反序列化的值</param>
/// <returns>反序列化后的对象</returns>
private T DeserializeValue<T>(stringvalue)
{
if (string.IsNullOrEmpty(value)) returndefault(T);
try
{
var serializedBytes = Convert.FromBase64String(value);
return _serializer.Deserialize<T>(serializedBytes);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error deserializing value to type: {Type}", typeof(T).Name);
thrownew CacheSerializationException($"Failed to deserialize value to type: {typeof(T).Name}", ex);
}
}
#endregion
}
using System.Text.Json;
/// <summary>
/// 缓存同步事件类型
/// </summary>
publicenum CacheSyncEventType
{
/// <summary>
/// 缓存项被设置
/// </summary>
Set,
/// <summary>
/// 缓存项被删除
/// </summary>
Remove,
/// <summary>
/// 缓存项过期
/// </summary>
Expire,
/// <summary>
/// 批量删除(按模式)
/// </summary>
RemovePattern,
/// <summary>
/// 清空所有缓存
/// </summary>
Clear
}
/// <summary>
/// 缓存同步事件
/// </summary>
publicclassCacheSyncEvent
{
/// <summary>
/// 事件ID(用于幂等性控制)
/// </summary>
publicstring EventId { get; set; } = Guid.NewGuid().ToString();
/// <summary>
/// 事件类型
/// </summary>
public CacheSyncEventType EventType { get; set; }
/// <summary>
/// 缓存键
/// </summary>
publicstring Key { get; set; }
/// <summary>
/// 模式(用于批量删除)
/// </summary>
publicstring Pattern { get; set; }
/// <summary>
/// 事件发生时间
/// </summary>
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
/// <summary>
/// 发起节点标识
/// </summary>
publicstring NodeId { get; set; }
/// <summary>
/// 附加数据
/// </summary>
public Dictionary<string, object> Metadata { get; set; } = new();
/// <summary>
/// 序列化为JSON
/// </summary>
/// <returns>JSON字符串</returns>
public string ToJson()
{
return JsonSerializer.Serialize(this, new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
});
}
/// <summary>
/// 从JSON反序列化
/// </summary>
/// <param name="json">JSON字符串</param>
/// <returns>缓存同步事件</returns>
public static CacheSyncEvent FromJson(string json)
{
return JsonSerializer.Deserialize<CacheSyncEvent>(json, new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
});
}
}
/// <summary>
/// 缓存同步配置选项
/// </summary>
publicclassCacheSyncOptions
{
/// <summary>
/// Redis发布订阅频道前缀
/// </summary>
publicstring ChannelPrefix { get; set; } = "cache_sync";
/// <summary>
/// 当前节点ID
/// </summary>
publicstring NodeId { get; set; } = Environment.MachineName;
/// <summary>
/// 是否启用缓存同步
/// </summary>
publicbool EnableSync { get; set; } = true;
/// <summary>
/// 事件去重窗口时间
/// </summary>
public TimeSpan DeduplicationWindow { get; set; } = TimeSpan.FromSeconds(30);
/// <summary>
/// 最大重试次数
/// </summary>
publicint MaxRetryAttempts { get; set; } = 3;
/// <summary>
/// 重试延迟
/// </summary>
public TimeSpan RetryDelay { get; set; } = TimeSpan.FromSeconds(1);
/// <summary>
/// 批量处理的最大延迟
/// </summary>
public TimeSpan BatchMaxDelay { get; set; } = TimeSpan.FromMilliseconds(100);
/// <summary>
/// 批量处理的最大大小
/// </summary>
publicint BatchMaxSize { get; set; } = 50;
}
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using System.Collections.Concurrent;
using System.Threading.Channels;
/// <summary>
/// 缓存同步服务接口
/// </summary>
publicinterfaceICacheSyncService
{
/// <summary>
/// 发布缓存同步事件
/// </summary>
/// <param name="syncEvent">同步事件</param>
Task PublishAsync(CacheSyncEvent syncEvent);
/// <summary>
/// 订阅缓存同步事件
/// </summary>
/// <param name="handler">事件处理器</param>
Task SubscribeAsync(Func<CacheSyncEvent, Task> handler);
/// <summary>
/// 取消订阅
/// </summary>
Task UnsubscribeAsync();
/// <summary>
/// 检查服务状态
/// </summary>
bool IsHealthy { get; }
}
/// <summary>
/// Redis发布-订阅缓存同步服务
/// </summary>
publicclassRedisCacheSyncService : ICacheSyncService, IHostedService, IDisposable
{
privatereadonly IRedisConnectionManager _connectionManager;
privatereadonly CacheSyncOptions _options;
privatereadonly ILogger<RedisCacheSyncService> _logger;
// 事件处理器集合
privatereadonly ConcurrentBag<Func<CacheSyncEvent, Task>> _handlers;
// 事件去重缓存
privatereadonly ConcurrentDictionary<string, DateTime> _processedEvents;
// 批量处理通道
privatereadonly Channel<CacheSyncEvent> _eventChannel;
privatereadonly ChannelWriter<CacheSyncEvent> _eventWriter;
privatereadonly ChannelReader<CacheSyncEvent> _eventReader;
// 订阅和处理任务
private Task _subscriptionTask;
private Task _processingTask;
private CancellationTokenSource _cancellationTokenSource;
// 服务状态
privatevolatilebool _isHealthy = false;
privatebool _disposed = false;
public RedisCacheSyncService(
IRedisConnectionManager connectionManager,
IOptions<CacheSyncOptions> options,
ILogger<RedisCacheSyncService> logger)
{
_connectionManager = connectionManager ?? thrownew ArgumentNullException(nameof(connectionManager));
_options = options?.Value ?? thrownew ArgumentNullException(nameof(options));
_logger = logger ?? thrownew ArgumentNullException(nameof(logger));
_handlers = new ConcurrentBag<Func<CacheSyncEvent, Task>>();
_processedEvents = new ConcurrentDictionary<string, DateTime>();
// 创建有界通道用于批量处理
var channelOptions = new BoundedChannelOptions(_options.BatchMaxSize * 2)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = false
};
_eventChannel = Channel.CreateBounded<CacheSyncEvent>(channelOptions);
_eventWriter = _eventChannel.Writer;
_eventReader = _eventChannel.Reader;
}
/// <summary>
/// 服务健康状态
/// </summary>
publicbool IsHealthy => _isHealthy;
/// <summary>
/// 发布缓存同步事件
/// </summary>
/// <param name="syncEvent">同步事件</param>
public async Task PublishAsync(CacheSyncEvent syncEvent)
{
if (!_options.EnableSync || syncEvent == null)
return;
try
{
// 设置节点ID
syncEvent.NodeId = _options.NodeId;
var subscriber = _connectionManager.GetSubscriber();
var channel = GetChannelName();
var message = syncEvent.ToJson();
await subscriber.PublishAsync(channel, message);
_logger.LogDebug("Published sync event: {EventType} for key: {Key}",
syncEvent.EventType, syncEvent.Key);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to publish sync event: {EventType} for key: {Key}",
syncEvent.EventType, syncEvent.Key);
}
}
/// <summary>
/// 订阅缓存同步事件
/// </summary>
/// <param name="handler">事件处理器</param>
public Task SubscribeAsync(Func<CacheSyncEvent, Task> handler)
{
if (handler == null)
thrownew ArgumentNullException(nameof(handler));
_handlers.Add(handler);
_logger.LogDebug("Added sync event handler");
return Task.CompletedTask;
}
/// <summary>
/// 取消订阅
/// </summary>
public async Task UnsubscribeAsync()
{
try
{
if (_subscriptionTask != null && !_subscriptionTask.IsCompleted)
{
_cancellationTokenSource?.Cancel();
await _subscriptionTask;
}
_logger.LogDebug("Unsubscribed from sync events");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during unsubscribe");
}
}
/// <summary>
/// 启动服务
/// </summary>
/// <param name="cancellationToken">取消令牌</param>
public Task StartAsync(CancellationToken cancellationToken)
{
if (!_options.EnableSync)
{
_logger.LogInformation("Cache sync is disabled");
return Task.CompletedTask;
}
_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
// 启动Redis订阅任务
_subscriptionTask = StartSubscriptionAsync(_cancellationTokenSource.Token);
// 启动事件处理任务
_processingTask = StartProcessingAsync(_cancellationTokenSource.Token);
// 启动清理任务
_ = Task.Run(() => StartCleanupAsync(_cancellationTokenSource.Token), cancellationToken);
_logger.LogInformation("Cache sync service started with NodeId: {NodeId}", _options.NodeId);
return Task.CompletedTask;
}
/// <summary>
/// 停止服务
/// </summary>
/// <param name="cancellationToken">取消令牌</param>
public async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Stopping cache sync service");
_cancellationTokenSource?.Cancel();
// 完成事件通道写入
_eventWriter.TryComplete();
try
{
// 等待订阅任务完成
if (_subscriptionTask != null)
{
await _subscriptionTask.WaitAsync(TimeSpan.FromSeconds(5), cancellationToken);
}
// 等待处理任务完成
if (_processingTask != null)
{
await _processingTask.WaitAsync(TimeSpan.FromSeconds(5), cancellationToken);
}
}
catch (TimeoutException)
{
_logger.LogWarning("Cache sync service stop timed out");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error stopping cache sync service");
}
_isHealthy = false;
_logger.LogInformation("Cache sync service stopped");
}
/// <summary>
/// 启动Redis订阅
/// </summary>
/// <param name="cancellationToken">取消令牌</param>
private async Task StartSubscriptionAsync(CancellationToken cancellationToken)
{
var retryCount = 0;
while (!cancellationToken.IsCancellationRequested && retryCount < _options.MaxRetryAttempts)
{
try
{
var subscriber = _connectionManager.GetSubscriber();
var channel = GetChannelName();
await subscriber.SubscribeAsync(channel, OnMessageReceived);
_isHealthy = true;
_logger.LogInformation("Successfully subscribed to Redis channel: {Channel}", channel);
// 保持订阅状态
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(1000, cancellationToken);
}
break;
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
retryCount++;
_isHealthy = false;
_logger.LogError(ex, "Redis subscription failed, retry {RetryCount}/{MaxRetries}",
retryCount, _options.MaxRetryAttempts);
if (retryCount < _options.MaxRetryAttempts)
{
await Task.Delay(_options.RetryDelay, cancellationToken);
}
}
}
}
/// <summary>
/// 启动事件批量处理
/// </summary>
/// <param name="cancellationToken">取消令牌</param>
private async Task StartProcessingAsync(CancellationToken cancellationToken)
{
var eventBatch = new List<CacheSyncEvent>();
var batchTimer = Stopwatch.StartNew();
try
{
while (!cancellationToken.IsCancellationRequested)
{
// 尝试读取事件(带超时)
var hasEvent = await _eventReader.WaitToReadAsync(cancellationToken);
if (!hasEvent)
break;
// 收集批量事件
while (_eventReader.TryRead(outvar syncEvent) &&
eventBatch.Count < _options.BatchMaxSize)
{
eventBatch.Add(syncEvent);
}
// 检查是否应该处理批次
var shouldProcess = eventBatch.Count >= _options.BatchMaxSize ||
batchTimer.Elapsed >= _options.BatchMaxDelay ||
!_eventReader.TryPeek(out _); // 没有更多事件
if (shouldProcess && eventBatch.Count > 0)
{
await ProcessEventBatchAsync(eventBatch, cancellationToken);
eventBatch.Clear();
batchTimer.Restart();
}
elseif (eventBatch.Count > 0)
{
// 短暂等待以收集更多事件
await Task.Delay(10, cancellationToken);
}
}
}
catch (OperationCanceledException)
{
// 正常取消
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in event processing loop");
}
finally
{
// 处理剩余的事件
if (eventBatch.Count > 0)
{
try
{
await ProcessEventBatchAsync(eventBatch, CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing final event batch");
}
}
}
}
/// <summary>
/// 处理事件批次
/// </summary>
/// <param name="events">事件列表</param>
/// <param name="cancellationToken">取消令牌</param>
private async Task ProcessEventBatchAsync(List<CacheSyncEvent> events, CancellationToken cancellationToken)
{
if (!events.Any())
return;
try
{
var tasks = events
.Where(e => !IsEventProcessed(e.EventId))
.Select(async e =>
{
try
{
// 标记事件为已处理
MarkEventAsProcessed(e.EventId);
// 并行调用所有处理器
var handlerTasks = _handlers.Select(handler => handler(e));
await Task.WhenAll(handlerTasks);
_logger.LogDebug("Processed sync event: {EventType} for key: {Key}",
e.EventType, e.Key);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing sync event: {EventId}", e.EventId);
}
});
await Task.WhenAll(tasks);
_logger.LogDebug("Processed batch of {Count} sync events", events.Count);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing event batch");
}
}
/// <summary>
/// 处理接收到的Redis消息
/// </summary>
/// <param name="channel">频道</param>
/// <param name="message">消息</param>
private async void OnMessageReceived(RedisChannel channel, RedisValue message)
{
try
{
if (!message.HasValue)
return;
var syncEvent = CacheSyncEvent.FromJson(message);
// 忽略自己发送的事件
if (syncEvent.NodeId == _options.NodeId)
{
_logger.LogTrace("Ignoring self-generated event: {EventId}", syncEvent.EventId);
return;
}
// 将事件加入处理队列
if (!await _eventWriter.WaitToWriteAsync())
{
_logger.LogWarning("Event channel is closed, dropping event: {EventId}", syncEvent.EventId);
return;
}
if (!_eventWriter.TryWrite(syncEvent))
{
_logger.LogWarning("Failed to queue sync event: {EventId}", syncEvent.EventId);
}
}
catch (JsonException ex)
{
_logger.LogError(ex, "Failed to deserialize sync event message: {Message}", message.ToString());
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing received message");
}
}
/// <summary>
/// 检查事件是否已处理(防止重复处理)
/// </summary>
/// <param name="eventId">事件ID</param>
/// <returns>是否已处理</returns>
private bool IsEventProcessed(string eventId)
{
return _processedEvents.ContainsKey(eventId);
}
/// <summary>
/// 标记事件为已处理
/// </summary>
/// <param name="eventId">事件ID</param>
private void MarkEventAsProcessed(string eventId)
{
_processedEvents.TryAdd(eventId, DateTime.UtcNow);
}
/// <summary>
/// 启动定期清理已处理事件记录
/// </summary>
/// <param name="cancellationToken">取消令牌</param>
private async Task StartCleanupAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await Task.Delay(TimeSpan.FromMinutes(5), cancellationToken);
var cutoffTime = DateTime.UtcNow - _options.DeduplicationWindow;
var keysToRemove = _processedEvents
.Where(kvp => kvp.Value < cutoffTime)
.Select(kvp => kvp.Key)
.ToList();
foreach (var key in keysToRemove)
{
_processedEvents.TryRemove(key, out _);
}
if (keysToRemove.Count > 0)
{
_logger.LogDebug("Cleaned up {Count} processed event records", keysToRemove.Count);
}
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in cleanup task");
}
}
}
/// <summary>
/// 获取Redis频道名称
/// </summary>
/// <returns>频道名称</returns>
private string GetChannelName()
{
return$"{_options.ChannelPrefix}:events";
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
if (!_disposed)
{
_cancellationTokenSource?.Cancel();
_eventWriter?.TryComplete();
try
{
Task.WhenAll(
_subscriptionTask ?? Task.CompletedTask,
_processingTask ?? Task.CompletedTask
).Wait(TimeSpan.FromSeconds(5));
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during disposal");
}
_cancellationTokenSource?.Dispose();
_disposed = true;
}
}
}
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
/// <summary>
/// 缓存同步服务扩展方法
/// </summary>
publicstaticclassCacheSyncServiceExtensions
{
/// <summary>
/// 添加Redis缓存同步服务
/// </summary>
/// <param name="services">服务集合</param>
/// <param name="setupAction">配置委托</param>
/// <returns>服务集合</returns>
public static IServiceCollection AddRedisCacheSync(
this IServiceCollection services,
Action<CacheSyncOptions> setupAction = null)
{
// 配置选项
if (setupAction != null)
{
services.Configure(setupAction);
}
else
{
services.Configure<CacheSyncOptions>(options =>
{
// 使用默认配置
});
}
// 注册Redis缓存服务(带断路器装饰器)
services.AddSingleton<RedisDistributedCache>();
services.AddSingleton<IRedisDistributedCache>(provider =>
{
var innerCache = provider.GetRequiredService<RedisDistributedCache>();
var circuitBreaker = provider.GetRequiredService<CacheCircuitBreaker>();
var logger = provider.GetRequiredService<ILogger<CircuitBreakerRedisCache>>();
returnnew CircuitBreakerRedisCache(innerCache, circuitBreaker, logger);
});
// 注册缓存同步服务
services.AddSingleton<ICacheSyncService, RedisCacheSyncService>();
services.AddHostedService<RedisCacheSyncService>(provider =>
(RedisCacheSyncService)provider.GetRequiredService<ICacheSyncService>());
return services;
}
}
参考资料
[1]
Microsoft.Extensions.Caching.Memory: https://docs.microsoft.com/en-us/dotnet/core/extensions/caching
[2]
StackExchange.Redis Documentation: https://stackexchange.github.io/StackExchange.Redis/
[3]
Redis Official Documentation: https://redis.io/documentation
[4]
EasyCaching: https://github.com/dotnetcore/EasyCaching
[5]
FusionCache: https://github.com/jodydonetti/ZiggyCreatures.FusionCache
[6]
CacheManager: https://github.com/MichaCo/CacheManager