首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >C#中的多级缓存架构设计与实现深度解析

C#中的多级缓存架构设计与实现深度解析

作者头像
郑子铭
发布2025-09-02 18:11:36
发布2025-09-02 18:11:36
21100
代码可运行
举报
运行总次数:0
代码可运行

引言

在现代分布式应用架构中,缓存已成为提升系统性能和用户体验的关键技术组件。随着业务规模的不断扩大和并发量的持续增长,单一级别的缓存往往无法满足复杂的性能需求。多级缓存架构通过在不同层次构建缓存体系,能够显著提升数据访问效率,降低数据库负载,并提供更好的系统可扩展性。

本文将深入探讨C#环境下多级缓存的架构设计与实现,重点分析内存缓存(Memory Cache)与Redis分布式缓存的协同工作机制,并详细阐述如何通过Redis的发布-订阅(Pub/Sub)模式实现不同节点间的缓存状态同步。

1. 多级缓存理论基础

1.1 缓存层次结构理论

缓存的本质是利用时间局部性(Temporal Locality)和空间局部性(Spatial Locality)原理,将频繁访问的数据存储在更快的存储介质中。在计算机系统中,从CPU缓存到内存,从内存到磁盘,都遵循着这种层次化的存储架构。

1.1.1 缓存访问模式
代码语言:javascript
代码运行次数:0
运行
复制
CPU Cache (L1/L2/L3) → Memory → Disk Storage
   ↑                    ↑         ↑
快速访问              中等速度    慢速访问
小容量                中等容量    大容量
昂贵                  适中        便宜

在应用层面,多级缓存同样遵循类似的原理:

  1. L1缓存(进程内存缓存): 访问速度最快,容量相对较小,仅在当前进程内有效
  2. L2缓存(分布式缓存): 访问速度中等,容量较大,在多个节点间共享
  3. L3缓存(数据库查询缓存): 访问速度最慢,但提供持久化存储

1.2 缓存一致性理论

1.2.1 CAP定理在缓存系统中的应用

根据CAP定理(Consistency, Availability, Partition tolerance),在分布式缓存系统中,我们无法同时保证:

  • 一致性(Consistency): 所有节点在同一时间具有相同的数据
  • 可用性(Availability): 系统持续可用,即使某些节点出现故障
  • 分区容错性(Partition Tolerance): 系统能够容忍网络分区故障

在实际应用中,我们通常采用最终一致性(Eventually Consistency)模型,通过合理的同步策略和过期机制来平衡性能与一致性。

1.2.2 缓存穿透、击穿、雪崩问题

缓存穿透(Cache Penetration)

  • 现象:查询不存在的数据,绕过缓存直接访问数据库
  • 解决方案:布隆过滤器、空值缓存

缓存击穿(Cache Breakdown)

  • 现象:热点数据过期时,大量并发请求同时访问数据库
  • 解决方案:分布式锁、热点数据永不过期

缓存雪崩(Cache Avalanche)

  • 现象:大量缓存同时失效,导致数据库压力骤增
  • 解决方案:过期时间随机化、多级缓存、熔断机制

2. 架构设计与技术选型

2.0 系统架构流程图

2.0.1 多级缓存整体架构
代码语言:javascript
代码运行次数:0
运行
复制
graph TB
    Client[客户端应用]

    subgraph "多级缓存系统"
        subgraph "应用层"
            Controller[Controller层]
            Service[Service层]
            Manager[多级缓存管理器]
        end

        subgraph "缓存层"
            subgraph "L1缓存 - 内存缓存"
                MemCache[IMemoryCache]
                AdvMemCache[高级内存缓存]
            end

            subgraph "L2缓存 - Redis分布式缓存"
                RedisConn[Redis连接管理器]
                RedisCache[Redis分布式缓存]
                RedisCluster[Redis集群]
            end
        end

        subgraph "同步机制"
            PubSub[Redis Pub/Sub]
            SyncService[缓存同步服务]
            EventQueue[事件队列]
        end

        subgraph "监控系统"
            Monitor[监控服务]
            HealthCheck[健康检查]
            Metrics[性能指标]
        end
    end

    subgraph "数据层"
        Database[(数据库)]
        External[外部API]
    end

    Client --> Controller
    Controller --> Service
    Service --> Manager

    Manager --> AdvMemCache
    Manager --> RedisCache

    AdvMemCache --> MemCache
    RedisCache --> RedisConn
    RedisConn --> RedisCluster

    RedisCache --> PubSub
    PubSub --> SyncService
    SyncService --> EventQueue
    EventQueue --> AdvMemCache

    Manager --> Monitor
    Monitor --> HealthCheck
    Monitor --> Metrics

    Manager -.-> Database
    Manager -.-> External

    style Manager fill:#e1f5fe
    style AdvMemCache fill:#f3e5f5
    style RedisCache fill:#e8f5e8
    style PubSub fill:#fff3e0
2.0.2 缓存操作流程图
代码语言:javascript
代码运行次数:0
运行
复制
sequenceDiagram
    participant Client as 客户端
    participant Manager as 多级缓存管理器
    participant L1 as L1内存缓存
    participant L2 as L2 Redis缓存
    participant Sync as 同步服务
    participant DB as 数据库

    Note over Client,DB: 读取操作流程 (GetOrSet)
    Client->>Manager: GetOrSet(key, factory)
    Manager->>L1: Get(key)

    alt L1 缓存命中
        L1->>Manager: 返回数据
        Manager->>Client: 返回结果 (L1 Hit)
    else L1 缓存未命中
        L1->>Manager: 返回 null
        Manager->>L2: Get(key)

        alt L2 缓存命中
            L2->>Manager: 返回数据
            Manager->>L1: Set(key, data) [异步提升]
            Manager->>Client: 返回结果 (L2 Hit)
        else L2 缓存未命中
            L2->>Manager: 返回 null
            Manager->>DB: factory() 执行
            DB->>Manager: 返回原始数据

            par 并行设置缓存
                Manager->>L1: Set(key, data)
                Manager->>L2: Set(key, data)
            and 发布同步事件
                Manager->>Sync: PublishSync(SET, key)
            end

            Manager->>Client: 返回结果 (DB Hit)
        end
    end

    Note over Client,DB: 更新操作流程
    Client->>Manager: Set(key, data)

    par 并行更新所有级别
        Manager->>L1: Set(key, data)
        Manager->>L2: Set(key, data)
    end

    Manager->>Sync: PublishSync(SET, key)
    Sync->>L1: 通知其他节点更新L1缓存

    Manager->>Client: 返回结果

    Note over Client,DB: 删除操作流程
    Client->>Manager: Remove(key)

    par 并行删除所有级别
        Manager->>L1: Remove(key)
        Manager->>L2: Remove(key)
    end

    Manager->>Sync: PublishSync(REMOVE, key)
    Sync->>L1: 通知其他节点删除L1缓存

    Manager->>Client: 返回结果

2.0.3 Redis发布-订阅同步机制
代码语言:javascript
代码运行次数:0
运行
复制
graph TB
    subgraph "节点A"
        AppA[应用A]
        L1A[L1缓存A]
        L2A[L2缓存A]
        SyncA[同步服务A]
    end

    subgraph "节点B"
        AppB[应用B]
        L1B[L1缓存B]
        L2B[L2缓存B]
        SyncB[同步服务B]
    end

    subgraph "节点C"
        AppC[应用C]
        L1C[L1缓存C]
        L2C[L2缓存C]
        SyncC[同步服务C]
    end

    subgraph "Redis集群"
        RedisPubSub[Redis Pub/Sub频道<br/>cache_sync:events]
    end

    %% 数据更新流程
    AppA -->|1. 更新数据| L1A
    AppA -->|2. 更新数据| L2A
    SyncA -->|3. 发布同步事件| RedisPubSub

    %% 同步通知
    RedisPubSub -->|4. 广播事件| SyncB
    RedisPubSub -->|4. 广播事件| SyncC

    %% 本地缓存更新
    SyncB -->|5. 更新本地缓存| L1B
    SyncC -->|5. 更新本地缓存| L1C

    style RedisPubSub fill:#ff9999
    style SyncA fill:#99ccff
    style SyncB fill:#99ccff
    style SyncC fill:#99ccff
2.0.4 缓存降级策略流程
代码语言:javascript
代码运行次数:0
运行
复制
flowchart TD
    Start([开始缓存操作]) --> CheckL1{L1缓存<br/>是否可用?}

    CheckL1 -->|是| TryL1[尝试L1缓存操作]
    TryL1 --> L1Success{L1操作<br/>成功?}
    L1Success -->|是| ReturnL1[返回L1结果]
    L1Success -->|否| CheckL2

    CheckL1 -->|否| CheckL2{L2缓存<br/>是否可用?}
    CheckL2 -->|是| TryL2[尝试L2缓存操作]
    TryL2 --> L2Success{L2操作<br/>成功?}
    L2Success -->|是| ReturnL2[返回L2结果]
    L2Success -->|否| CheckStrategy

    CheckL2 -->|否| CheckStrategy{降级策略}
    CheckStrategy -->|L1Only| L1OnlyMode[仅使用L1缓存]
    CheckStrategy -->|DirectAccess| DirectDB[直接访问数据库]
    CheckStrategy -->|ThrowException| ThrowError[抛出异常]

    L1OnlyMode --> TryL1Only[尝试L1操作]
    TryL1Only --> L1OnlySuccess{成功?}
    L1OnlySuccess -->|是| ReturnL1Only[返回L1结果]
    L1OnlySuccess -->|否| DirectDB

    DirectDB --> DBAccess[执行数据库查询]
    DBAccess --> ReturnDB[返回数据库结果]
    ThrowError --> ErrorReturn[返回错误]

    ReturnL1 --> End([结束])
    ReturnL2 --> End
    ReturnL1Only --> End
    ReturnDB --> End
    ErrorReturn --> End

    style CheckL1 fill:#e3f2fd
    style CheckL2 fill:#e3f2fd
    style CheckStrategy fill:#fff3e0
    style DirectDB fill:#ffebee
    style ThrowError fill:#ffcdd2

2. 架构设计与技术选型

2.1 整体架构设计

多级缓存架构采用分层设计模式,每一层都有明确的职责和边界:

代码语言:javascript
代码运行次数:0
运行
复制
┌─────────────────────────────────────────────────────┐
│                    应用层                           │
├─────────────────────────────────────────────────────┤
│              多级缓存管理器                          │
├─────────────────┬───────────────────────────────────┤
│   L1内存缓存    │         L2 Redis缓存              │
│   (MemoryCache) │      (StackExchange.Redis)        │
├─────────────────┴───────────────────────────────────┤
│              Redis Pub/Sub 同步机制                  │
├─────────────────────────────────────────────────────┤
│                   数据持久层                         │
└─────────────────────────────────────────────────────┘

2.2 技术选型分析

2.2.1 内存缓存选型

Microsoft.Extensions.Caching.Memory

  • 优势:.NET官方支持,与DI容器无缝集成,支持过期策略和内存压力驱逐
  • 适用场景:单体应用、微服务单实例缓存
  • 特性:线程安全、支持泛型、内置压缩和序列化

System.Runtime.Caching.MemoryCache

  • 优势:.NET Framework传统方案,功能成熟
  • 劣势:不支持.NET Core,API相对古老
2.2.2 分布式缓存选型

StackExchange.Redis

  • 优势:高性能、功能全面、支持集群、活跃的社区支持
  • 特性:异步操作、连接复用、故障转移、Lua脚本支持
  • 版本选择:推荐使用2.6+版本,支持.NET 6+的新特性

ServiceStack.Redis

  • 优势:易用性好,文档完善
  • 劣势:商业许可限制,性能相对较低

2.3 架构模式选择

2.3.1 Cache-Aside Pattern(缓存旁路模式)

这是最常用的缓存模式,应用程序负责管理缓存的读取和更新:

代码语言:javascript
代码运行次数:0
运行
复制
读取流程:
1. 应用程序尝试从缓存读取数据
2. 如果缓存命中,直接返回数据
3. 如果缓存未命中,从数据库读取数据
4. 将数据写入缓存,然后返回给应用程序

更新流程:
1. 更新数据库
2. 删除或更新缓存中的对应数据
2.3.2 Write-Through Pattern(写透模式)
代码语言:javascript
代码运行次数:0
运行
复制
写入流程:
1. 应用程序写入缓存
2. 缓存服务同步写入数据库
3. 确认写入完成后返回成功
2.3.3 Write-Behind Pattern(写回模式)
代码语言:javascript
代码运行次数:0
运行
复制
写入流程:
1. 应用程序写入缓存
2. 立即返回成功
3. 缓存服务异步批量写入数据库

3. 内存缓存层实现详解

3.1 IMemoryCache 核心接口分析

Microsoft.Extensions.Caching.Memory.IMemoryCache接口提供了缓存操作的核心方法:

代码语言:javascript
代码运行次数:0
运行
复制
public interface IMemoryCache : IDisposable
{
    bool TryGetValue(object key, out object value);
    ICacheEntry CreateEntry(object key);
    void Remove(object key);
}

3.2 高级内存缓存封装实现

代码语言:javascript
代码运行次数:0
运行
复制
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Text.RegularExpressions;
using System.Runtime.Serialization;

/// <summary>
/// 缓存异常基类
/// </summary>
publicabstractclassCacheException : Exception
{
    protected CacheException(string message) : base(message) { }
    protected CacheException(string message, Exception innerException) : base(message, innerException) { }
}

/// <summary>
/// 缓存连接异常
/// </summary>
publicclassCacheConnectionException : CacheException
{
    public CacheConnectionException(string message) : base(message) { }
    public CacheConnectionException(string message, Exception innerException) : base(message, innerException) { }
}

/// <summary>
/// 缓存序列化异常
/// </summary>
publicclassCacheSerializationException : CacheException
{
    public CacheSerializationException(string message) : base(message) { }
    public CacheSerializationException(string message, Exception innerException) : base(message, innerException) { }
}

/// <summary>
/// 缓存超时异常
/// </summary>
publicclassCacheTimeoutException : CacheException
{
    public CacheTimeoutException(string message) : base(message) { }
    public CacheTimeoutException(string message, Exception innerException) : base(message, innerException) { }
}

/// <summary>
/// 缓存验证异常
/// </summary>
publicclassCacheValidationException : CacheException
{
    public CacheValidationException(string message) : base(message) { }
    public CacheValidationException(string message, Exception innerException) : base(message, innerException) { }
}

/// <summary>
/// 线程安全的缓存统计追踪器
/// </summary>
publicclassCacheStatisticsTracker
{
    privatelong _totalOperations = 0;
    privatelong _l1Hits = 0;
    privatelong _l2Hits = 0;
    privatelong _totalMisses = 0;
    privatereadonlyobject _lock = newobject();

    public void RecordOperation()
    {
        Interlocked.Increment(ref _totalOperations);
    }

    public void RecordHit(CacheLevel level)
    {
        switch (level)
        {
            case CacheLevel.L1:
                Interlocked.Increment(ref _l1Hits);
                break;
            case CacheLevel.L2:
                Interlocked.Increment(ref _l2Hits);
                break;
        }
    }

    public void RecordMiss()
    {
        Interlocked.Increment(ref _totalMisses);
    }

    public CacheStatisticsSnapshot GetSnapshot()
    {
        returnnew CacheStatisticsSnapshot
        {
            TotalOperations = Interlocked.Read(ref _totalOperations),
            L1Hits = Interlocked.Read(ref _l1Hits),
            L2Hits = Interlocked.Read(ref _l2Hits),
            TotalMisses = Interlocked.Read(ref _totalMisses)
        };
    }

    public void Reset()
    {
        lock (_lock)
        {
            Interlocked.Exchange(ref _totalOperations, 0);
            Interlocked.Exchange(ref _l1Hits, 0);
            Interlocked.Exchange(ref _l2Hits, 0);
            Interlocked.Exchange(ref _totalMisses, 0);
        }
    }
}

/// <summary>
/// 缓存统计快照
/// </summary>
publicclassCacheStatisticsSnapshot
{
    publiclong TotalOperations { get; init; }
    publiclong L1Hits { get; init; }
    publiclong L2Hits { get; init; }
    publiclong TotalMisses { get; init; }
    publiclong TotalHits => L1Hits + L2Hits;
    publicdouble OverallHitRatio => TotalOperations == 0 ? 0 : (double)TotalHits / TotalOperations;
    publicdouble L1HitRatio => TotalOperations == 0 ? 0 : (double)L1Hits / TotalOperations;
    publicdouble L2HitRatio => TotalOperations == 0 ? 0 : (double)L2Hits / TotalOperations;
}

/// <summary>
/// 缓存数据验证器接口
/// </summary>
publicinterfaceICacheDataValidator
{
    bool IsValid<T>(T value);
    void ValidateKey(string key);
    bool IsSafeForSerialization<T>(T value);
}

/// <summary>
/// 默认缓存数据验证器
/// </summary>
publicclassDefaultCacheDataValidator : ICacheDataValidator
{
    privatereadonly ILogger<DefaultCacheDataValidator> _logger;
    privatereadonly HashSet<Type> _forbiddenTypes;
    privatereadonly Regex _keyValidationRegex;
    
    public DefaultCacheDataValidator(ILogger<DefaultCacheDataValidator> logger)
    {
        _logger = logger;
        _forbiddenTypes = new HashSet<Type>
        {
            typeof(System.IO.FileStream),
            typeof(System.Net.Sockets.Socket),
            typeof(System.Threading.Thread),
            typeof(System.Threading.Tasks.Task)
        };
        
        // 限制key格式:只允许字母数字下划线冒号和点
        _keyValidationRegex = new Regex(@"^[a-zA-Z0-9_:.-]+$", RegexOptions.Compiled);
    }
    
    publicbool IsValid<T>(T value)
    {
        if (value == null) returntrue;
        
        var valueType = value.GetType();
        
        // 检查禁止类型
        if (_forbiddenTypes.Contains(valueType))
        {
            _logger.LogWarning("Forbidden type in cache: {Type}", valueType.Name);
            returnfalse;
        }
        
        // 检查循环引用(简化版)
        if (HasCircularReference(value))
        {
            _logger.LogWarning("Circular reference detected in cache value");
            returnfalse;
        }
        
        returntrue;
    }
    
    public void ValidateKey(string key)
    {
        if (string.IsNullOrWhiteSpace(key))
            thrownew CacheValidationException("Cache key cannot be null or empty");
        
        if (key.Length > 250)
            thrownew CacheValidationException($"Cache key too long: {key.Length} characters");
        
        if (!_keyValidationRegex.IsMatch(key))
            thrownew CacheValidationException($"Invalid characters in cache key: {key}");
    }
    
    publicbool IsSafeForSerialization<T>(T value)
    {
        if (value == null) returntrue;
        
        var valueType = value.GetType();
        
        // 检查是否有序列化属性
        if (valueType.IsSerializable || 
            valueType.GetCustomAttributes(typeof(DataContractAttribute), false).Length > 0)
        {
            returntrue;
        }
        
        // 原始类型和字符串通常安全
        return valueType.IsPrimitive || valueType == typeof(string) || valueType == typeof(DateTime);
    }
    
    private bool HasCircularReference(object obj, HashSet<object> visited = null)
    {
        if (obj == null) returnfalse;
        
        visited ??= new HashSet<object>();
        
        if (visited.Contains(obj))
            returntrue;
        
        visited.Add(obj);
        
        // 简化的循环检测,只检查一层
        var type = obj.GetType();
        if (type.IsPrimitive || type == typeof(string))
            returnfalse;
        
        visited.Remove(obj);
        returnfalse;
    }
}

/// <summary>
/// 安全缓存管理器装饰器
/// </summary>
publicclassSecureCacheManagerDecorator : IAdvancedMemoryCache
{
    privatereadonly IAdvancedMemoryCache _innerCache;
    privatereadonly ICacheDataValidator _validator;
    privatereadonly ILogger<SecureCacheManagerDecorator> _logger;
    
    public SecureCacheManagerDecorator(
        IAdvancedMemoryCache innerCache,
        ICacheDataValidator validator,
        ILogger<SecureCacheManagerDecorator> logger)
    {
        _innerCache = innerCache ?? thrownew ArgumentNullException(nameof(innerCache));
        _validator = validator ?? thrownew ArgumentNullException(nameof(validator));
        _logger = logger ?? thrownew ArgumentNullException(nameof(logger));
    }
    
    publicasync Task<T> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null)
    {
        _validator.ValidateKey(key);
        returnawait _innerCache.GetOrSetAsync(key, async () =>
        {
            varvalue = await factory();
            if (!_validator.IsValid(value))
            {
                thrownew CacheValidationException($"Invalid cache value for key: {key}");
            }
            returnvalue;
        }, expiry);
    }
    
    publicasync Task<T> GetAsync<T>(string key)
    {
        _validator.ValidateKey(key);
        returnawait _innerCache.GetAsync<T>(key);
    }
    
    publicasync Task SetAsync<T>(string key, T value, TimeSpan? expiry = null)
    {
        _validator.ValidateKey(key);
        
        if (!_validator.IsValid(value))
        {
            thrownew CacheValidationException($"Invalid cache value for key: {key}");
        }
        
        if (!_validator.IsSafeForSerialization(value))
        {
            _logger.LogWarning("Potentially unsafe serialization for key: {Key}, type: {Type}", 
                key, value?.GetType().Name);
        }
        
        await _innerCache.SetAsync(key, value, expiry);
    }
    
    public async Task RemoveAsync(string key)
    {
        _validator.ValidateKey(key);
        await _innerCache.RemoveAsync(key);
    }
    
    public async Task RemoveByPatternAsync(string pattern)
    {
        if (string.IsNullOrWhiteSpace(pattern))
            thrownew CacheValidationException("Pattern cannot be null or empty");
        
        await _innerCache.RemoveByPatternAsync(pattern);
    }
    
    public CacheStatistics GetStatistics() => _innerCache.GetStatistics();
    
    public void ClearStatistics() => _innerCache.ClearStatistics();
}

/// <summary>
/// 序列化器接口
/// </summary>
publicinterfaceICacheSerializer
{
    byte[] Serialize<T>(T value);
    T Deserialize<T>(byte[] data);
    string SerializerName { get; }
    bool SupportsType(Type type);
}

/// <summary>
/// JSON序列化器(默认)
/// </summary>
publicclassJsonCacheSerializer : ICacheSerializer
{
    privatereadonly JsonSerializerOptions _options;
    
    publicstring SerializerName => "JSON";
    
    public JsonCacheSerializer()
    {
        _options = new JsonSerializerOptions
        {
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
            WriteIndented = false,
            DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
            PropertyNameCaseInsensitive = true
        };
    }
    
    publicbyte[] Serialize<T>(T value)
    {
        if (value == null) returnnull;
        if (typeof(T) == typeof(string)) return System.Text.Encoding.UTF8.GetBytes(value.ToString());
        
        var json = JsonSerializer.Serialize(value, _options);
        return System.Text.Encoding.UTF8.GetBytes(json);
    }
    
    public T Deserialize<T>(byte[] data)
    {
        if (data == null || data.Length == 0) returndefault(T);
        if (typeof(T) == typeof(string)) return (T)(object)System.Text.Encoding.UTF8.GetString(data);
        
        var json = System.Text.Encoding.UTF8.GetString(data);
        return JsonSerializer.Deserialize<T>(json, _options);
    }
    
    public bool SupportsType(Type type)
    {
        returntrue; // JSON支持所有类型
    }
}

/// <summary>
/// 二进制序列化器(用于简单类型)
/// </summary>
publicclassBinaryCacheSerializer : ICacheSerializer
{
    publicstring SerializerName => "Binary";
    
    privatestaticreadonly HashSet<Type> SupportedTypes = new()
    {
        typeof(int), typeof(long), typeof(double), typeof(float),
        typeof(bool), typeof(byte), typeof(short),
        typeof(DateTime), typeof(DateTimeOffset), typeof(TimeSpan),
        typeof(Guid), typeof(decimal)
    };
    
    publicbyte[] Serialize<T>(T value)
    {
        if (value == null) returnnull;
        
        var type = typeof(T);
        
        // 专门处理常见类型,提高性能
        return type switch
        {
            _ when type == typeof(int) => BitConverter.GetBytes((int)(object)value),
            _ when type == typeof(long) => BitConverter.GetBytes((long)(object)value),
            _ when type == typeof(double) => BitConverter.GetBytes((double)(object)value),
            _ when type == typeof(float) => BitConverter.GetBytes((float)(object)value),
            _ when type == typeof(bool) => BitConverter.GetBytes((bool)(object)value),
            _ when type == typeof(DateTime) => BitConverter.GetBytes(((DateTime)(object)value).ToBinary()),
            _ when type == typeof(Guid) => ((Guid)(object)value).ToByteArray(),
            _ when type == typeof(string) => System.Text.Encoding.UTF8.GetBytes(value.ToString()),
            _ => thrownew NotSupportedException($"Type {type.Name} is not supported by BinaryCacheSerializer")
        };
    }
    
    public T Deserialize<T>(byte[] data)
    {
        if (data == null || data.Length == 0) returndefault(T);
        
        var type = typeof(T);
        
        object result = type switch
        {
            _ when type == typeof(int) => BitConverter.ToInt32(data, 0),
            _ when type == typeof(long) => BitConverter.ToInt64(data, 0),
            _ when type == typeof(double) => BitConverter.ToDouble(data, 0),
            _ when type == typeof(float) => BitConverter.ToSingle(data, 0),
            _ when type == typeof(bool) => BitConverter.ToBoolean(data, 0),
            _ when type == typeof(DateTime) => DateTime.FromBinary(BitConverter.ToInt64(data, 0)),
            _ when type == typeof(Guid) => new Guid(data),
            _ when type == typeof(string) => System.Text.Encoding.UTF8.GetString(data),
            _ => thrownew NotSupportedException($"Type {type.Name} is not supported by BinaryCacheSerializer")
        };
        
        return (T)result;
    }
    
    public bool SupportsType(Type type)
    {
        return SupportedTypes.Contains(type) || type == typeof(string);
    }
}

/// <summary>
/// 智能序列化器管理器
/// </summary>
publicclassSmartCacheSerializer : ICacheSerializer
{
    privatereadonly ICacheSerializer[] _serializers;
    privatereadonly ILogger<SmartCacheSerializer> _logger;
    
    publicstring SerializerName => "Smart";
    
    public SmartCacheSerializer(ILogger<SmartCacheSerializer> logger)
    {
        _logger = logger;
        _serializers = new ICacheSerializer[]
        {
            new BinaryCacheSerializer(), // 优先使用二进制序列化
            new JsonCacheSerializer()    // 备选JSON序列化
        };
    }
    
    publicbyte[] Serialize<T>(T value)
    {
        if (value == null) returnnull;
        
        var type = typeof(T);
        
        foreach (var serializer in _serializers)
        {
            if (serializer.SupportsType(type))
            {
                try
                {
                    var data = serializer.Serialize(value);
                    // 在数据开头添加序列化器标识
                    var header = System.Text.Encoding.UTF8.GetBytes(serializer.SerializerName.PadRight(8));
                    var result = newbyte[header.Length + data.Length];
                    Array.Copy(header, 0, result, 0, header.Length);
                    Array.Copy(data, 0, result, header.Length, data.Length);
                    
                    return result;
                }
                catch (Exception ex)
                {
                    _logger.LogWarning(ex, "Serializer {SerializerName} failed for type {TypeName}", 
                        serializer.SerializerName, type.Name);
                    continue;
                }
            }
        }
        
        thrownew CacheSerializationException($"No suitable serializer found for type: {type.Name}");
    }
    
    public T Deserialize<T>(byte[] data)
    {
        if (data == null || data.Length < 8) returndefault(T);
        
        // 读取序列化器标识
        var headerBytes = newbyte[8];
        Array.Copy(data, 0, headerBytes, 0, 8);
        var serializerName = System.Text.Encoding.UTF8.GetString(headerBytes).Trim();
        
        // 获取实际数据
        var actualData = newbyte[data.Length - 8];
        Array.Copy(data, 8, actualData, 0, actualData.Length);
        
        // 找到对应的序列化器
        var serializer = _serializers.FirstOrDefault(s => s.SerializerName == serializerName);
        if (serializer == null)
        {
            _logger.LogWarning("Unknown serializer: {SerializerName}, falling back to JSON", serializerName);
            serializer = new JsonCacheSerializer();
        }
        
        try
        {
            return serializer.Deserialize<T>(actualData);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to deserialize with {SerializerName}", serializerName);
            thrownew CacheSerializationException($"Deserialization failed with {serializerName}", ex);
        }
    }
    
    public bool SupportsType(Type type)
    {
        return _serializers.Any(s => s.SupportsType(type));
    }
}

/// <summary>
/// 断路器状态
/// </summary>
publicenum CircuitBreakerState
{
    Closed,     // 正常状态
    Open,       // 断路器打开,拒绝请求
    HalfOpen    // 半开状态,允许少量请求通过
}

/// <summary>
/// 缓存断路器配置
/// </summary>
publicclassCacheCircuitBreakerOptions
{
    publicint FailureThreshold { get; set; } = 5;                    // 连续失败阈值
    public TimeSpan OpenTimeout { get; set; } = TimeSpan.FromMinutes(1); // 断路器打开时间
    publicint SuccessThreshold { get; set; } = 2;                    // 半开状态成功阈值
    public TimeSpan SamplingDuration { get; set; } = TimeSpan.FromMinutes(2); // 采样时间窗口
}

/// <summary>
/// 缓存断路器
/// </summary>
publicclassCacheCircuitBreaker
{
    privatereadonly CacheCircuitBreakerOptions _options;
    privatereadonly ILogger<CacheCircuitBreaker> _logger;
    privatereadonlyobject _lock = newobject();
    
    private CircuitBreakerState _state = CircuitBreakerState.Closed;
    privateint _failureCount = 0;
    privateint _successCount = 0;
    private DateTime _lastFailureTime = DateTime.MinValue;
    private DateTime _lastStateChangeTime = DateTime.UtcNow;
    
    public CacheCircuitBreaker(
        CacheCircuitBreakerOptions options,
        ILogger<CacheCircuitBreaker> logger)
    {
        _options = options ?? thrownew ArgumentNullException(nameof(options));
        _logger = logger ?? thrownew ArgumentNullException(nameof(logger));
    }
    
    public CircuitBreakerState State => _state;
    
    /// <summary>
    /// 执行带断路器保护的操作
    /// </summary>
    publicasync Task<T> ExecuteAsync<T>(Func<Task<T>> operation, string operationName = null)
    {
        if (!CanExecute())
        {
            thrownew CacheException($"Circuit breaker is OPEN for operation: {operationName}");
        }
        
        try
        {
            var result = await operation();
            OnSuccess();
            return result;
        }
        catch (Exception ex)
        {
            OnFailure(ex, operationName);
            throw;
        }
    }
    
    /// <summary>
    /// 检查是否可以执行操作
    /// </summary>
    private bool CanExecute()
    {
        lock (_lock)
        {
            switch (_state)
            {
                case CircuitBreakerState.Closed:
                    returntrue;
                    
                case CircuitBreakerState.Open:
                    // 检查是否可以转入半开状态
                    if (DateTime.UtcNow - _lastStateChangeTime >= _options.OpenTimeout)
                    {
                        _state = CircuitBreakerState.HalfOpen;
                        _successCount = 0;
                        _lastStateChangeTime = DateTime.UtcNow;
                        _logger.LogInformation("Circuit breaker entering HALF_OPEN state");
                        returntrue;
                    }
                    returnfalse;
                    
                case CircuitBreakerState.HalfOpen:
                    returntrue;
                    
                default:
                    returnfalse;
            }
        }
    }
    
    /// <summary>
    /// 操作成功回调
    /// </summary>
    private void OnSuccess()
    {
        lock (_lock)
        {
            if (_state == CircuitBreakerState.HalfOpen)
            {
                _successCount++;
                if (_successCount >= _options.SuccessThreshold)
                {
                    _state = CircuitBreakerState.Closed;
                    _failureCount = 0;
                    _successCount = 0;
                    _lastStateChangeTime = DateTime.UtcNow;
                    _logger.LogInformation("Circuit breaker entering CLOSED state");
                }
            }
            elseif (_state == CircuitBreakerState.Closed)
            {
                // 在采样时间窗口内重置失败计数
                if (DateTime.UtcNow - _lastFailureTime > _options.SamplingDuration)
                {
                    _failureCount = 0;
                }
            }
        }
    }
    
    /// <summary>
    /// 操作失败回调
    /// </summary>
    private void OnFailure(Exception ex, string operationName)
    {
        lock (_lock)
        {
            _failureCount++;
            _lastFailureTime = DateTime.UtcNow;
            
            _logger.LogWarning(ex, "Circuit breaker recorded failure #{FailureCount} for operation: {Operation}", 
                _failureCount, operationName);
            
            if (_state == CircuitBreakerState.Closed && _failureCount >= _options.FailureThreshold)
            {
                _state = CircuitBreakerState.Open;
                _lastStateChangeTime = DateTime.UtcNow;
                _logger.LogError("Circuit breaker entering OPEN state after {FailureCount} failures", _failureCount);
            }
            elseif (_state == CircuitBreakerState.HalfOpen)
            {
                _state = CircuitBreakerState.Open;
                _lastStateChangeTime = DateTime.UtcNow;
                _logger.LogWarning("Circuit breaker returning to OPEN state from HALF_OPEN due to failure");
            }
        }
    }
    
    /// <summary>
    /// 获取当前状态信息
    /// </summary>
    public object GetState()
    {
        lock (_lock)
        {
            returnnew
            {
                State = _state.ToString(),
                FailureCount = _failureCount,
                SuccessCount = _successCount,
                LastFailureTime = _lastFailureTime,
                LastStateChangeTime = _lastStateChangeTime,
                CanExecute = CanExecute()
            };
        }
    }
}

/// <summary>
/// 带断路器的Redis缓存装饰器
/// </summary>
publicclassCircuitBreakerRedisCache : IRedisDistributedCache
{
    privatereadonly IRedisDistributedCache _innerCache;
    privatereadonly CacheCircuitBreaker _circuitBreaker;
    privatereadonly ILogger<CircuitBreakerRedisCache> _logger;
    
    public CircuitBreakerRedisCache(
        IRedisDistributedCache innerCache,
        CacheCircuitBreaker circuitBreaker,
        ILogger<CircuitBreakerRedisCache> logger)
    {
        _innerCache = innerCache ?? thrownew ArgumentNullException(nameof(innerCache));
        _circuitBreaker = circuitBreaker ?? thrownew ArgumentNullException(nameof(circuitBreaker));
        _logger = logger ?? thrownew ArgumentNullException(nameof(logger));
    }
    
    publicasync Task<T> GetAsync<T>(string key)
    {
        try
        {
            returnawait _circuitBreaker.ExecuteAsync(() => _innerCache.GetAsync<T>(key), $"GET:{key}");
        }
        catch (CacheException) when (_circuitBreaker.State == CircuitBreakerState.Open)
        {
            _logger.LogWarning("Circuit breaker open, returning default for key: {Key}", key);
            returndefault(T);
        }
    }
    
    publicasync Task SetAsync<T>(string key, T value, TimeSpan? expiry = null)
    {
        try
        {
            await _circuitBreaker.ExecuteAsync(() => _innerCache.SetAsync(key, value, expiry), $"SET:{key}");
        }
        catch (CacheException) when (_circuitBreaker.State == CircuitBreakerState.Open)
        {
            _logger.LogWarning("Circuit breaker open, skipping cache set for key: {Key}", key);
            // 不继续抛出异常,允许应用继续运行
        }
    }
    
    // 继续实现其他接口方法...
    public Task<bool> ExistsAsync(string key) => 
        _circuitBreaker.ExecuteAsync(() => _innerCache.ExistsAsync(key), $"EXISTS:{key}");
    
    public Task<bool> RemoveAsync(string key) => 
        _circuitBreaker.ExecuteAsync(() => _innerCache.RemoveAsync(key), $"REMOVE:{key}");
        
    public Task<long> RemoveByPatternAsync(string pattern) => 
        _circuitBreaker.ExecuteAsync(() => _innerCache.RemoveByPatternAsync(pattern), $"REMOVE_PATTERN:{pattern}");
    
    public Task<T> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null) => 
        _circuitBreaker.ExecuteAsync(() => _innerCache.GetOrSetAsync(key, factory, expiry), $"GET_OR_SET:{key}");
    
    public Task<Dictionary<string, T>> GetMultipleAsync<T>(IEnumerable<string> keys) => 
        _circuitBreaker.ExecuteAsync(() => _innerCache.GetMultipleAsync<T>(keys), "GET_MULTIPLE");
    
    public Task SetMultipleAsync<T>(Dictionary<string, T> keyValuePairs, TimeSpan? expiry = null) => 
        _circuitBreaker.ExecuteAsync(() => _innerCache.SetMultipleAsync(keyValuePairs, expiry), "SET_MULTIPLE");
    
    public Task<long> IncrementAsync(string key, long value = 1, TimeSpan? expiry = null) => 
        _circuitBreaker.ExecuteAsync(() => _innerCache.IncrementAsync(key, value, expiry), $"INCREMENT:{key}");
    
    public Task<double> IncrementAsync(string key, double value, TimeSpan? expiry = null) => 
        _circuitBreaker.ExecuteAsync(() => _innerCache.IncrementAsync(key, value, expiry), $"INCREMENT_DOUBLE:{key}");
    
    public Task<bool> SetIfNotExistsAsync<T>(string key, T value, TimeSpan? expiry = null) => 
        _circuitBreaker.ExecuteAsync(() => _innerCache.SetIfNotExistsAsync(key, value, expiry), $"SET_IF_NOT_EXISTS:{key}");
    
    public Task<TimeSpan?> GetExpiryAsync(string key) => 
        _circuitBreaker.ExecuteAsync(() => _innerCache.GetExpiryAsync(key), $"GET_EXPIRY:{key}");
    
    public Task<bool> ExpireAsync(string key, TimeSpan expiry) => 
        _circuitBreaker.ExecuteAsync(() => _innerCache.ExpireAsync(key, expiry), $"EXPIRE:{key}");
}

/// <summary>
/// LRU缓存容器,用于防止内存泄漏
/// </summary>
publicclassLRUCache<TKey, TValue>
{
    privatereadonlyint _maxSize;
    privatereadonly Dictionary<TKey, LinkedListNode<CacheItem<TKey, TValue>>> _cache;
    privatereadonly LinkedList<CacheItem<TKey, TValue>> _lruList;
    privatereadonlyobject _lock = newobject();
    
    public LRUCache(int maxSize)
    {
        if (maxSize <= 0)
            thrownew ArgumentException("Max size must be greater than 0", nameof(maxSize));
        
        _maxSize = maxSize;
        _cache = new Dictionary<TKey, LinkedListNode<CacheItem<TKey, TValue>>>(maxSize);
        _lruList = new LinkedList<CacheItem<TKey, TValue>>();
    }
    
    publicint Count
    {
        get
        {
            lock (_lock)
            {
                return _cache.Count;
            }
        }
    }
    
    public bool TryGet(TKey key, out TValue value)
    {
        lock (_lock)
        {
            if (_cache.TryGetValue(key, outvar node))
            {
                // 移到链表头部(最近使用)
                _lruList.Remove(node);
                _lruList.AddFirst(node);
                
                value = node.Value.Value;
                returntrue;
            }
            
            value = default(TValue);
            returnfalse;
        }
    }
    
    public void Add(TKey key, TValue value)
    {
        lock (_lock)
        {
            if (_cache.TryGetValue(key, outvar existingNode))
            {
                // 更新已存在的项
                existingNode.Value.Value = value;
                existingNode.Value.LastAccessed = DateTime.UtcNow;
                
                // 移到链表头部
                _lruList.Remove(existingNode);
                _lruList.AddFirst(existingNode);
            }
            else
            {
                // 检查容量限制
                if (_cache.Count >= _maxSize)
                {
                    // 移除最久未使用的项
                    var lastNode = _lruList.Last;
                    if (lastNode != null)
                    {
                        _cache.Remove(lastNode.Value.Key);
                        _lruList.RemoveLast();
                    }
                }
                
                // 添加新项
                var newItem = new CacheItem<TKey, TValue>
                {
                    Key = key,
                    Value = value,
                    LastAccessed = DateTime.UtcNow
                };
                
                var newNode = _lruList.AddFirst(newItem);
                _cache[key] = newNode;
            }
        }
    }
    
    public bool Remove(TKey key)
    {
        lock (_lock)
        {
            if (_cache.TryGetValue(key, outvar node))
            {
                _cache.Remove(key);
                _lruList.Remove(node);
                returntrue;
            }
            
            returnfalse;
        }
    }
    
    public void Clear()
    {
        lock (_lock)
        {
            _cache.Clear();
            _lruList.Clear();
        }
    }
    
    public IEnumerable<TKey> Keys
    {
        get
        {
            lock (_lock)
            {
                return _cache.Keys.ToList();
            }
        }
    }
    
    /// <summary>
    /// 清理过期项
    /// </summary>
    public int CleanupExpired(TimeSpan maxAge)
    {
        var cutoffTime = DateTime.UtcNow - maxAge;
        var expiredKeys = new List<TKey>();
        
        lock (_lock)
        {
            foreach (var item in _lruList)
            {
                if (item.LastAccessed < cutoffTime)
                {
                    expiredKeys.Add(item.Key);
                }
            }
            
            foreach (var key in expiredKeys)
            {
                Remove(key);
            }
        }
        
        return expiredKeys.Count;
    }
}

/// <summary>
/// LRU缓存项
/// </summary>
classCacheItem<TKey, TValue>
{
    public TKey Key { get; set; }
    public TValue Value { get; set; }
    public DateTime LastAccessed { get; set; }
}

/// <summary>
/// 高级内存缓存管理器
/// 提供泛型支持、统计信息、性能监控等功能
/// </summary>
publicinterfaceIAdvancedMemoryCache
{
    Task<T> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null);
    Task<T> GetAsync<T>(string key);
    Task SetAsync<T>(string key, T value, TimeSpan? expiry = null);
    Task RemoveAsync(string key);
    Task RemoveByPatternAsync(string pattern);
    CacheStatistics GetStatistics();
    void ClearStatistics();
}

/// <summary>
/// 缓存统计信息
/// </summary>
publicclassCacheStatistics
{
    publiclong HitCount { get; set; }
    publiclong MissCount { get; set; }
    publiclong SetCount { get; set; }
    publiclong RemoveCount { get; set; }
    publicdouble HitRatio => HitCount + MissCount == 0 ? 0 : (double)HitCount / (HitCount + MissCount);
    public DateTime StartTime { get; set; }
    public TimeSpan Duration => DateTime.UtcNow - StartTime;
}

/// <summary>
/// 缓存配置选项
/// </summary>
publicclassAdvancedMemoryCacheOptions
{
    publicint SizeLimit { get; set; } = 1000;
    public TimeSpan DefaultExpiry { get; set; } = TimeSpan.FromMinutes(30);
    publicbool EnableStatistics { get; set; } = true;
    publicbool EnablePatternRemoval { get; set; } = true;
    publicdouble CompactionPercentage { get; set; } = 0.1;
}

/// <summary>
/// 高级内存缓存实现
/// 基于IMemoryCache构建的功能增强版本
/// </summary>
publicclassAdvancedMemoryCache : IAdvancedMemoryCache, IDisposable
{
    privatereadonly IMemoryCache _cache;
    privatereadonly ILogger<AdvancedMemoryCache> _logger;
    privatereadonly AdvancedMemoryCacheOptions _options;
    privatereadonly CacheStatistics _statistics;
    privatereadonly ConcurrentDictionary<string, byte> _keyTracker;
    privatereadonly SemaphoreSlim _semaphore;
    privatereadonly Timer _cleanupTimer;
    
    public AdvancedMemoryCache(
        IMemoryCache cache,
        ILogger<AdvancedMemoryCache> logger,
        IOptions<AdvancedMemoryCacheOptions> options)
    {
        _cache = cache ?? thrownew ArgumentNullException(nameof(cache));
        _logger = logger ?? thrownew ArgumentNullException(nameof(logger));
        _options = options?.Value ?? new AdvancedMemoryCacheOptions();
        
        _statistics = new CacheStatistics { StartTime = DateTime.UtcNow };
        _keyTracker = new ConcurrentDictionary<string, byte>();
        _semaphore = new SemaphoreSlim(1, 1);
        
        // 定期清理过期的key追踪记录
        _cleanupTimer = new Timer(CleanupKeyTracker, null, 
            TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(5));
    }

    /// <summary>
    /// 获取或设置缓存项
    /// 这是最常用的方法,实现了Cache-Aside模式
    /// </summary>
    /// <typeparam name="T">缓存项类型</typeparam>
    /// <param name="key">缓存键</param>
    /// <param name="factory">数据工厂方法</param>
    /// <param name="expiry">过期时间</param>
    /// <returns>缓存的值</returns>
    publicasync Task<T> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null)
    {
        if (string.IsNullOrEmpty(key))
            thrownew ArgumentException("Key cannot be null or empty", nameof(key));
        
        if (factory == null)
            thrownew ArgumentNullException(nameof(factory));

        // 尝试从缓存获取
        var cachedValue = await GetAsync<T>(key);
        if (cachedValue != null)
        {
            _logger.LogDebug("Cache hit for key: {Key}", key);
            return cachedValue;
        }

        // 使用信号量防止并发执行相同的factory方法
        await _semaphore.WaitAsync();
        try
        {
            // 双重检查锁定模式
            cachedValue = await GetAsync<T>(key);
            if (cachedValue != null)
            {
                _logger.LogDebug("Cache hit on second check for key: {Key}", key);
                return cachedValue;
            }

            // 执行工厂方法获取数据
            _logger.LogDebug("Cache miss for key: {Key}, executing factory method", key);
            varvalue = await factory();
            
            // 将结果存入缓存
            await SetAsync(key, value, expiry);
            
            returnvalue;
        }
        catch (CacheConnectionException ex)
        {
            _logger.LogWarning(ex, "Cache connection failed for key: {Key}, using fallback", key);
            // 缓存连接失败时,仍执行工厂方法但不缓存结果
            returnawait factory();
        }
        catch (CacheSerializationException ex)
        {
            _logger.LogError(ex, "Serialization failed for key: {Key}", key);
            throw;
        }
        catch (CacheTimeoutException ex)
        {
            _logger.LogWarning(ex, "Cache operation timeout for key: {Key}", key);
            returnawait factory();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Unexpected error occurred while executing factory method for key: {Key}", key);
            thrownew CacheException($"Cache operation failed for key: {key}", ex);
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// 异步获取缓存项
    /// </summary>
    /// <typeparam name="T">缓存项类型</typeparam>
    /// <param name="key">缓存键</param>
    /// <returns>缓存的值,如果不存在则返回默认值</returns>
    public Task<T> GetAsync<T>(string key)
    {
        if (string.IsNullOrEmpty(key))
            thrownew ArgumentException("Key cannot be null or empty", nameof(key));

        var found = _cache.TryGetValue(key, outvarvalue);
        
        if (_options.EnableStatistics)
        {
            if (found)
                Interlocked.Increment(ref _statistics.HitCount);
            else
                Interlocked.Increment(ref _statistics.MissCount);
        }

        if (found && valueis T typedValue)
        {
            return Task.FromResult(typedValue);
        }

        return Task.FromResult(default(T));
    }

    /// <summary>
    /// 异步设置缓存项
    /// </summary>
    /// <typeparam name="T">缓存项类型</typeparam>
    /// <param name="key">缓存键</param>
    /// <param name="value">缓存值</param>
    /// <param name="expiry">过期时间</param>
    public Task SetAsync<T>(string key, T value, TimeSpan? expiry = null)
    {
        if (string.IsNullOrEmpty(key))
            thrownew ArgumentException("Key cannot be null or empty", nameof(key));

        var cacheExpiry = expiry ?? _options.DefaultExpiry;
        
        usingvar entry = _cache.CreateEntry(key);
        entry.Value = value;
        entry.AbsoluteExpirationRelativeToNow = cacheExpiry;
        entry.Size = 1; // 简化的大小计算,实际应用中可根据对象大小设置
        
        // 设置过期回调
        entry.PostEvictionCallbacks.Add(new PostEvictionCallbackRegistration
        {
            EvictionCallback = OnCacheEntryEvicted,
            State = key
        });

        // 追踪缓存键
        if (_options.EnablePatternRemoval)
        {
            _keyTracker.TryAdd(key, 0);
        }

        if (_options.EnableStatistics)
        {
            Interlocked.Increment(ref _statistics.SetCount);
        }

        _logger.LogDebug("Set cache entry for key: {Key}, expiry: {Expiry}", key, cacheExpiry);
        
        return Task.CompletedTask;
    }

    /// <summary>
    /// 异步移除缓存项
    /// </summary>
    /// <param name="key">缓存键</param>
    public Task RemoveAsync(string key)
    {
        if (string.IsNullOrEmpty(key))
            thrownew ArgumentException("Key cannot be null or empty", nameof(key));

        _cache.Remove(key);
        _keyTracker.TryRemove(key, out _);

        if (_options.EnableStatistics)
        {
            Interlocked.Increment(ref _statistics.RemoveCount);
        }

        _logger.LogDebug("Removed cache entry for key: {Key}", key);
        
        return Task.CompletedTask;
    }

    /// <summary>
    /// 根据模式异步移除缓存项
    /// 支持通配符匹配,如 "user:*", "*:settings"
    /// </summary>
    /// <param name="pattern">匹配模式</param>
    public async Task RemoveByPatternAsync(string pattern)
    {
        if (string.IsNullOrEmpty(pattern))
            thrownew ArgumentException("Pattern cannot be null or empty", nameof(pattern));

        if (!_options.EnablePatternRemoval)
        {
            _logger.LogWarning("Pattern removal is disabled");
            return;
        }

        var keysToRemove = new List<string>();
        var regexPattern = ConvertWildcardToRegex(pattern);
        var regex = new System.Text.RegularExpressions.Regex(regexPattern, 
            System.Text.RegularExpressions.RegexOptions.IgnoreCase);

        foreach (var key in _keyTracker.Keys)
        {
            if (regex.IsMatch(key))
            {
                keysToRemove.Add(key);
            }
        }

        foreach (var key in keysToRemove)
        {
            await RemoveAsync(key);
        }

        _logger.LogInformation("Removed {Count} cache entries matching pattern: {Pattern}", 
            keysToRemove.Count, pattern);
    }

    /// <summary>
    /// 获取缓存统计信息
    /// </summary>
    /// <returns>统计信息对象</returns>
    public CacheStatistics GetStatistics()
    {
        if (!_options.EnableStatistics)
        {
            returnnew CacheStatistics();
        }

        returnnew CacheStatistics
        {
            HitCount = _statistics.HitCount,
            MissCount = _statistics.MissCount,
            SetCount = _statistics.SetCount,
            RemoveCount = _statistics.RemoveCount,
            StartTime = _statistics.StartTime
        };
    }

    /// <summary>
    /// 清除统计信息
    /// </summary>
    public void ClearStatistics()
    {
        if (_options.EnableStatistics)
        {
            Interlocked.Exchange(ref _statistics.HitCount, 0);
            Interlocked.Exchange(ref _statistics.MissCount, 0);
            Interlocked.Exchange(ref _statistics.SetCount, 0);
            Interlocked.Exchange(ref _statistics.RemoveCount, 0);
            _statistics.StartTime = DateTime.UtcNow;
        }
    }

    /// <summary>
    /// 缓存项被驱逐时的回调方法
    /// </summary>
    /// <param name="key">缓存键</param>
    /// <param name="value">缓存值</param>
    /// <param name="reason">驱逐原因</param>
    /// <param name="state">状态对象</param>
    private void OnCacheEntryEvicted(object key, object value, EvictionReason reason, object state)
    {
        var cacheKey = state?.ToString();
        if (!string.IsNullOrEmpty(cacheKey))
        {
            _keyTracker.TryRemove(cacheKey, out _);
        }

        _logger.LogDebug("Cache entry evicted - Key: {Key}, Reason: {Reason}", key, reason);
    }

    /// <summary>
    /// 将通配符模式转换为正则表达式
    /// </summary>
    /// <param name="wildcardPattern">通配符模式</param>
    /// <returns>正则表达式字符串</returns>
    private static string ConvertWildcardToRegex(string wildcardPattern)
    {
        return"^" + System.Text.RegularExpressions.Regex.Escape(wildcardPattern)
            .Replace("\\*", ".*")
            .Replace("\\?", ".") + "$";
    }

    /// <summary>
    /// 定期清理key追踪器中的过期项
    /// </summary>
    /// <param name="state">定时器状态</param>
    private void CleanupKeyTracker(object state)
    {
        var keysToRemove = new List<string>();
        
        foreach (var key in _keyTracker.Keys)
        {
            if (!_cache.TryGetValue(key, out _))
            {
                keysToRemove.Add(key);
            }
        }

        foreach (var key in keysToRemove)
        {
            _keyTracker.TryRemove(key, out _);
        }

        if (keysToRemove.Count > 0)
        {
            _logger.LogDebug("Cleaned up {Count} expired keys from tracker", keysToRemove.Count);
        }
    }

    /// <summary>
    /// 释放资源
    /// </summary>
    public void Dispose()
    {
        _cleanupTimer?.Dispose();
        _semaphore?.Dispose();
        _cache?.Dispose();
    }
}

3.3 内存缓存配置和依赖注入

代码语言:javascript
代码运行次数:0
运行
复制
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

/// <summary>
/// 内存缓存服务扩展
/// </summary>
publicstaticclassMemoryCacheServiceExtensions
{
    /// <summary>
    /// 添加高级内存缓存服务
    /// </summary>
    /// <param name="services">服务集合</param>
    /// <param name="setupAction">配置委托</param>
    /// <returns>服务集合</returns>
    public static IServiceCollection AddAdvancedMemoryCache(
        this IServiceCollection services,
        Action<AdvancedMemoryCacheOptions> setupAction = null)
    {
        // 添加基础内存缓存
        services.AddMemoryCache(options =>
        {
            options.SizeLimit = 1000; // 设置缓存大小限制
            options.CompactionPercentage = 0.1; // 内存压力时的压缩百分比
            options.ExpirationScanFrequency = TimeSpan.FromMinutes(1); // 过期扫描频率
        });

        // 配置选项
        if (setupAction != null)
        {
            services.Configure(setupAction);
        }
        else
        {
            services.Configure<AdvancedMemoryCacheOptions>(options =>
            {
                // 默认配置
                options.SizeLimit = 1000;
                options.DefaultExpiry = TimeSpan.FromMinutes(30);
                options.EnableStatistics = true;
                options.EnablePatternRemoval = true;
                options.CompactionPercentage = 0.1;
            });
        }

        // 注册高级内存缓存服务(带安全装饰器)
        services.AddSingleton<AdvancedMemoryCache>();
        services.AddSingleton<IAdvancedMemoryCache>(provider =>
        {
            var innerCache = provider.GetRequiredService<AdvancedMemoryCache>();
            var validator = provider.GetRequiredService<ICacheDataValidator>();
            var logger = provider.GetRequiredService<ILogger<SecureCacheManagerDecorator>>();
            returnnew SecureCacheManagerDecorator(innerCache, validator, logger);
        });

        return services;
    }
}

/// <summary>
/// 示例:在Program.cs中的配置
/// </summary>
publicclassProgram
{
    public static void Main(string[] args)
    {
        var builder = WebApplication.CreateBuilder(args);

        // 添加增强组件
        builder.Services.AddSingleton<ICacheDataValidator, DefaultCacheDataValidator>();
        builder.Services.AddSingleton<ICacheSerializer, SmartCacheSerializer>();
        
        // 添加断路器配置
        builder.Services.Configure<CacheCircuitBreakerOptions>(options =>
        {
            options.FailureThreshold = 5;
            options.OpenTimeout = TimeSpan.FromMinutes(1);
            options.SuccessThreshold = 2;
        });
        builder.Services.AddSingleton<CacheCircuitBreaker>();
        
        // 添加高级内存缓存(带安全验证)
        builder.Services.AddAdvancedMemoryCache(options =>
        {
            options.SizeLimit = 2000;
            options.DefaultExpiry = TimeSpan.FromHours(1);
            options.EnableStatistics = true;
            options.EnablePatternRemoval = true;
            options.CompactionPercentage = 0.15;
        });

        var app = builder.Build();
        
        app.Run();
    }
}

4. Redis分布式缓存层实现

4.1 Redis连接管理和配置

代码语言:javascript
代码运行次数:0
运行
复制
using StackExchange.Redis;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Logging;
using System.Text.Json;

/// <summary>
/// Redis缓存配置选项
/// </summary>
publicclassRedisCacheOptions
{
    publicstring ConnectionString { get; set; } = "localhost:6379";
    publicint Database { get; set; } = 0;
    publicstring KeyPrefix { get; set; } = "app:";
    public TimeSpan DefaultExpiry { get; set; } = TimeSpan.FromHours(1);
    publicint ConnectTimeout { get; set; } = 5000;
    publicint SyncTimeout { get; set; } = 1000;
    publicbool AllowAdmin { get; set; } = false;
    publicstring Password { get; set; }
    publicbool Ssl { get; set; } = false;
    publicint ConnectRetry { get; set; } = 3;
    publicbool AbortOnConnectFail { get; set; } = false;
    publicstring ClientName { get; set; } = "MultiLevelCache";
}

/// <summary>
/// Redis连接管理器
/// 提供连接池管理和故障恢复功能
/// </summary>
publicinterfaceIRedisConnectionManager : IDisposable
{
    IDatabase GetDatabase();
    ISubscriber GetSubscriber();
    IServer GetServer();
    bool IsConnected { get; }
    Task<bool> TestConnectionAsync();
}

/// <summary>
/// Redis连接管理器实现
/// </summary>
publicclassRedisConnectionManager : IRedisConnectionManager
{
    privatereadonly RedisCacheOptions _options;
    privatereadonly ILogger<RedisConnectionManager> _logger;
    privatereadonly Lazy<ConnectionMultiplexer> _connectionMultiplexer;
    privatebool _disposed = false;

    public RedisConnectionManager(
        IOptions<RedisCacheOptions> options,
        ILogger<RedisConnectionManager> logger)
    {
        _options = options?.Value ?? thrownew ArgumentNullException(nameof(options));
        _logger = logger ?? thrownew ArgumentNullException(nameof(logger));
        
        _connectionMultiplexer = new Lazy<ConnectionMultiplexer>(CreateConnection);
    }

    /// <summary>
    /// 创建Redis连接
    /// </summary>
    /// <returns>ConnectionMultiplexer实例</returns>
    private ConnectionMultiplexer CreateConnection()
    {
        var configurationOptions = new ConfigurationOptions
        {
            EndPoints = { _options.ConnectionString },
            ConnectTimeout = _options.ConnectTimeout,
            SyncTimeout = _options.SyncTimeout,
            AllowAdmin = _options.AllowAdmin,
            ConnectRetry = _options.ConnectRetry,
            AbortOnConnectFail = _options.AbortOnConnectFail,
            ClientName = _options.ClientName,
            Ssl = _options.Ssl
        };

        if (!string.IsNullOrEmpty(_options.Password))
        {
            configurationOptions.Password = _options.Password;
        }

        try
        {
            var connection = ConnectionMultiplexer.Connect(configurationOptions);
            
            // 注册连接事件
            connection.ConnectionFailed += OnConnectionFailed;
            connection.ConnectionRestored += OnConnectionRestored;
            connection.ErrorMessage += OnErrorMessage;
            connection.InternalError += OnInternalError;

            _logger.LogInformation("Redis connection established successfully");
            return connection;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to establish Redis connection");
            throw;
        }
    }

    /// <summary>
    /// 获取数据库实例
    /// </summary>
    /// <returns>IDatabase实例</returns>
    public IDatabase GetDatabase()
    {
        return _connectionMultiplexer.Value.GetDatabase(_options.Database);
    }

    /// <summary>
    /// 获取订阅者实例
    /// </summary>
    /// <returns>ISubscriber实例</returns>
    public ISubscriber GetSubscriber()
    {
        return _connectionMultiplexer.Value.GetSubscriber();
    }

    /// <summary>
    /// 获取服务器实例
    /// </summary>
    /// <returns>IServer实例</returns>
    public IServer GetServer()
    {
        var endpoints = _connectionMultiplexer.Value.GetEndPoints();
        return _connectionMultiplexer.Value.GetServer(endpoints.First());
    }

    /// <summary>
    /// 检查连接状态
    /// </summary>
    publicbool IsConnected => _connectionMultiplexer.IsValueCreated && 
                              _connectionMultiplexer.Value.IsConnected;

    /// <summary>
    /// 测试连接
    /// </summary>
    /// <returns>连接是否成功</returns>
    public async Task<bool> TestConnectionAsync()
    {
        try
        {
            var database = GetDatabase();
            await database.PingAsync();
            returntrue;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Redis connection test failed");
            returnfalse;
        }
    }

    #region 事件处理

    private void OnConnectionFailed(object sender, ConnectionFailedEventArgs e)
    {
        _logger.LogError(e.Exception, "Redis connection failed: {EndPoint}", e.EndPoint);
    }

    private void OnConnectionRestored(object sender, ConnectionFailedEventArgs e)
    {
        _logger.LogInformation("Redis connection restored: {EndPoint}", e.EndPoint);
    }

    private void OnErrorMessage(object sender, RedisErrorEventArgs e)
    {
        _logger.LogError("Redis error: {Message}", e.Message);
    }

    private void OnInternalError(object sender, InternalErrorEventArgs e)
    {
        _logger.LogError(e.Exception, "Redis internal error");
    }

    #endregion

    /// <summary>
    /// 释放资源
    /// </summary>
    public void Dispose()
    {
        if (!_disposed)
        {
            if (_connectionMultiplexer.IsValueCreated)
            {
                _connectionMultiplexer.Value.Close();
                _connectionMultiplexer.Value.Dispose();
            }
            _disposed = true;
        }
    }
}

4.2 Redis分布式缓存服务实现

代码语言:javascript
代码运行次数:0
运行
复制
using StackExchange.Redis;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

/// <summary>
/// Redis分布式缓存接口
/// </summary>
publicinterfaceIRedisDistributedCache
{
    Task<T> GetAsync<T>(string key);
    Task SetAsync<T>(string key, T value, TimeSpan? expiry = null);
    Task<bool> ExistsAsync(string key);
    Task<bool> RemoveAsync(string key);
    Task<long> RemoveByPatternAsync(string pattern);
    Task<T> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null);
    Task<Dictionary<string, T>> GetMultipleAsync<T>(IEnumerable<string> keys);
    Task SetMultipleAsync<T>(Dictionary<string, T> keyValuePairs, TimeSpan? expiry = null);
    Task<long> IncrementAsync(string key, long value = 1, TimeSpan? expiry = null);
    Task<double> IncrementAsync(string key, double value, TimeSpan? expiry = null);
    Task<bool> SetIfNotExistsAsync<T>(string key, T value, TimeSpan? expiry = null);
    Task<TimeSpan?> GetExpiryAsync(string key);
    Task<bool> ExpireAsync(string key, TimeSpan expiry);
}

/// <summary>
/// Redis分布式缓存实现
/// </summary>
publicclassRedisDistributedCache : IRedisDistributedCache
{
    privatereadonly IRedisConnectionManager _connectionManager;
    privatereadonly RedisCacheOptions _options;
    privatereadonly ILogger<RedisDistributedCache> _logger;
    privatereadonly ICacheSerializer _serializer;

    public RedisDistributedCache(
        IRedisConnectionManager connectionManager,
        IOptions<RedisCacheOptions> options,
        ILogger<RedisDistributedCache> logger)
    {
        _connectionManager = connectionManager ?? thrownew ArgumentNullException(nameof(connectionManager));
        _options = options?.Value ?? thrownew ArgumentNullException(nameof(options));
        _logger = logger ?? thrownew ArgumentNullException(nameof(logger));

        // 配置JSON序列化选项
        // 使用智能序列化器替代直接的JSON序列化器
        _serializer = serviceProvider?.GetService<ICacheSerializer>() ?? new JsonCacheSerializer();
    }

    /// <summary>
    /// 异步获取缓存项
    /// </summary>
    /// <typeparam name="T">缓存项类型</typeparam>
    /// <param name="key">缓存键</param>
    /// <returns>缓存的值</returns>
    publicasync Task<T> GetAsync<T>(string key)
    {
        if (string.IsNullOrEmpty(key))
            thrownew ArgumentException("Key cannot be null or empty", nameof(key));

        try
        {
            var database = _connectionManager.GetDatabase();
            var fullKey = GetFullKey(key);
            varvalue = await database.StringGetAsync(fullKey);

            if (!value.HasValue)
            {
                _logger.LogDebug("Cache miss for key: {Key}", key);
                returndefault(T);
            }

            _logger.LogDebug("Cache hit for key: {Key}", key);
            return DeserializeValue<T>(value);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error getting value from Redis for key: {Key}", key);
            returndefault(T);
        }
    }

    /// <summary>
    /// 异步设置缓存项
    /// </summary>
    /// <typeparam name="T">缓存项类型</typeparam>
    /// <param name="key">缓存键</param>
    /// <param name="value">缓存值</param>
    /// <param name="expiry">过期时间</param>
    publicasync Task SetAsync<T>(string key, T value, TimeSpan? expiry = null)
    {
        if (string.IsNullOrEmpty(key))
            thrownew ArgumentException("Key cannot be null or empty", nameof(key));

        try
        {
            var database = _connectionManager.GetDatabase();
            var fullKey = GetFullKey(key);
            var serializedValue = SerializeValue(value);
            var expiration = expiry ?? _options.DefaultExpiry;

            await database.StringSetAsync(fullKey, serializedValue, expiration);
            _logger.LogDebug("Set cache value for key: {Key}, expiry: {Expiry}", key, expiration);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error setting value in Redis for key: {Key}", key);
            throw;
        }
    }

    /// <summary>
    /// 检查键是否存在
    /// </summary>
    /// <param name="key">缓存键</param>
    /// <returns>键是否存在</returns>
    public async Task<bool> ExistsAsync(string key)
    {
        if (string.IsNullOrEmpty(key))
            thrownew ArgumentException("Key cannot be null or empty", nameof(key));

        try
        {
            var database = _connectionManager.GetDatabase();
            var fullKey = GetFullKey(key);
            returnawait database.KeyExistsAsync(fullKey);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error checking existence in Redis for key: {Key}", key);
            returnfalse;
        }
    }

    /// <summary>
    /// 异步移除缓存项
    /// </summary>
    /// <param name="key">缓存键</param>
    /// <returns>是否成功移除</returns>
    public async Task<bool> RemoveAsync(string key)
    {
        if (string.IsNullOrEmpty(key))
            thrownew ArgumentException("Key cannot be null or empty", nameof(key));

        try
        {
            var database = _connectionManager.GetDatabase();
            var fullKey = GetFullKey(key);
            var result = await database.KeyDeleteAsync(fullKey);
            
            _logger.LogDebug("Remove cache key: {Key}, success: {Success}", key, result);
            return result;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error removing key from Redis: {Key}", key);
            returnfalse;
        }
    }

    /// <summary>
    /// 根据模式批量删除缓存项
    /// </summary>
    /// <param name="pattern">匹配模式</param>
    /// <returns>删除的项目数量</returns>
    public async Task<long> RemoveByPatternAsync(string pattern)
    {
        if (string.IsNullOrEmpty(pattern))
            thrownew ArgumentException("Pattern cannot be null or empty", nameof(pattern));

        try
        {
            var server = _connectionManager.GetServer();
            var database = _connectionManager.GetDatabase();
            var fullPattern = GetFullKey(pattern);

            var keys = server.Keys(database.Database, fullPattern).ToArray();
            if (keys.Length == 0)
            {
                return0;
            }

            var deletedCount = await database.KeyDeleteAsync(keys);
            _logger.LogInformation("Deleted {Count} keys matching pattern: {Pattern}", deletedCount, pattern);
            
            return deletedCount;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error removing keys by pattern from Redis: {Pattern}", pattern);
            return0;
        }
    }

    /// <summary>
    /// 获取或设置缓存项(分布式锁实现)
    /// </summary>
    /// <typeparam name="T">缓存项类型</typeparam>
    /// <param name="key">缓存键</param>
    /// <param name="factory">数据工厂方法</param>
    /// <param name="expiry">过期时间</param>
    /// <returns>缓存的值</returns>
    publicasync Task<T> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null)
    {
        if (string.IsNullOrEmpty(key))
            thrownew ArgumentException("Key cannot be null or empty", nameof(key));
        
        if (factory == null)
            thrownew ArgumentNullException(nameof(factory));

        // 尝试从缓存获取
        var cachedValue = await GetAsync<T>(key);
        if (cachedValue != null)
        {
            return cachedValue;
        }

        // 使用分布式锁防止缓存击穿
        var lockKey = $"{key}:lock";
        var lockValue = Guid.NewGuid().ToString();
        var database = _connectionManager.GetDatabase();

        try
        {
            // 尝试获取分布式锁
            var lockAcquired = await database.StringSetAsync(
                GetFullKey(lockKey), 
                lockValue, 
                TimeSpan.FromMinutes(1), 
                When.NotExists);

            if (lockAcquired)
            {
                try
                {
                    // 再次检查缓存
                    cachedValue = await GetAsync<T>(key);
                    if (cachedValue != null)
                    {
                        return cachedValue;
                    }

                    // 执行工厂方法
                    _logger.LogDebug("Executing factory method for key: {Key}", key);
                    varvalue = await factory();
                    
                    // 设置缓存
                    await SetAsync(key, value, expiry);
                    
                    returnvalue;
                }
                finally
                {
                    // 释放分布式锁(使用Lua脚本确保原子性)
                    conststring releaseLockScript = @"
                        if redis.call('GET', KEYS[1]) == ARGV[1] then
                            return redis.call('DEL', KEYS[1])
                        else
                            return 0
                        end";
                    
                    await database.ScriptEvaluateAsync(
                        releaseLockScript, 
                        new RedisKey[] { GetFullKey(lockKey) }, 
                        new RedisValue[] { lockValue });
                }
            }
            else
            {
                // 等待锁释放并重试
                _logger.LogDebug("Waiting for lock to be released for key: {Key}", key);
                await Task.Delay(50); // 短暂等待
                
                // 重试获取缓存
                cachedValue = await GetAsync<T>(key);
                if (cachedValue != null)
                {
                    return cachedValue;
                }

                // 如果仍未获取到,执行降级策略
                _logger.LogWarning("Failed to acquire lock and cache miss for key: {Key}, executing factory method", key);
                returnawait factory();
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error in GetOrSetAsync for key: {Key}", key);
            // 降级到直接执行工厂方法
            returnawait factory();
        }
    }

    /// <summary>
    /// 批量获取缓存项
    /// </summary>
    /// <typeparam name="T">缓存项类型</typeparam>
    /// <param name="keys">缓存键集合</param>
    /// <returns>键值对字典</returns>
    publicasync Task<Dictionary<string, T>> GetMultipleAsync<T>(IEnumerable<string> keys)
    {
        if (keys == null)
            thrownew ArgumentNullException(nameof(keys));

        var keyList = keys.ToList();
        if (!keyList.Any())
        {
            returnnew Dictionary<string, T>();
        }

        try
        {
            var database = _connectionManager.GetDatabase();
            var fullKeys = keyList.Select(k => (RedisKey)GetFullKey(k)).ToArray();
            var values = await database.StringGetAsync(fullKeys);

            var result = new Dictionary<string, T>();
            for (int i = 0; i < keyList.Count; i++)
            {
                if (values[i].HasValue)
                {
                    result[keyList[i]] = DeserializeValue<T>(values[i]);
                }
            }

            _logger.LogDebug("Retrieved {Count} out of {Total} keys from Redis", 
                result.Count, keyList.Count);
            
            return result;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error getting multiple values from Redis");
            returnnew Dictionary<string, T>();
        }
    }

    /// <summary>
    /// 批量设置缓存项
    /// </summary>
    /// <typeparam name="T">缓存项类型</typeparam>
    /// <param name="keyValuePairs">键值对字典</param>
    /// <param name="expiry">过期时间</param>
    publicasync Task SetMultipleAsync<T>(Dictionary<string, T> keyValuePairs, TimeSpan? expiry = null)
    {
        if (keyValuePairs == null || !keyValuePairs.Any())
            return;

        try
        {
            var database = _connectionManager.GetDatabase();
            var expiration = expiry ?? _options.DefaultExpiry;
            
            var tasks = keyValuePairs.Select(async kvp =>
            {
                var fullKey = GetFullKey(kvp.Key);
                var serializedValue = SerializeValue(kvp.Value);
                await database.StringSetAsync(fullKey, serializedValue, expiration);
            });

            await Task.WhenAll(tasks);
            
            _logger.LogDebug("Set {Count} cache values with expiry: {Expiry}", 
                keyValuePairs.Count, expiration);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error setting multiple values in Redis");
            throw;
        }
    }

    /// <summary>
    /// 原子递增操作
    /// </summary>
    /// <param name="key">缓存键</param>
    /// <param name="value">递增值</param>
    /// <param name="expiry">过期时间</param>
    /// <returns>递增后的值</returns>
    public async Task<long> IncrementAsync(string key, long value = 1, TimeSpan? expiry = null)
    {
        if (string.IsNullOrEmpty(key))
            thrownew ArgumentException("Key cannot be null or empty", nameof(key));

        try
        {
            var database = _connectionManager.GetDatabase();
            var fullKey = GetFullKey(key);
            
            var result = await database.StringIncrementAsync(fullKey, value);
            
            if (expiry.HasValue)
            {
                await database.KeyExpireAsync(fullKey, expiry.Value);
            }

            return result;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error incrementing value in Redis for key: {Key}", key);
            throw;
        }
    }

    /// <summary>
    /// 原子递增操作(浮点数)
    /// </summary>
    /// <param name="key">缓存键</param>
    /// <param name="value">递增值</param>
    /// <param name="expiry">过期时间</param>
    /// <returns>递增后的值</returns>
    public async Task<double> IncrementAsync(string key, double value, TimeSpan? expiry = null)
    {
        if (string.IsNullOrEmpty(key))
            thrownew ArgumentException("Key cannot be null or empty", nameof(key));

        try
        {
            var database = _connectionManager.GetDatabase();
            var fullKey = GetFullKey(key);
            
            var result = await database.StringIncrementAsync(fullKey, value);
            
            if (expiry.HasValue)
            {
                await database.KeyExpireAsync(fullKey, expiry.Value);
            }

            return result;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error incrementing double value in Redis for key: {Key}", key);
            throw;
        }
    }

    /// <summary>
    /// 仅在键不存在时设置值
    /// </summary>
    /// <typeparam name="T">缓存项类型</typeparam>
    /// <param name="key">缓存键</param>
    /// <param name="value">缓存值</param>
    /// <param name="expiry">过期时间</param>
    /// <returns>是否设置成功</returns>
    publicasync Task<bool> SetIfNotExistsAsync<T>(string key, T value, TimeSpan? expiry = null)
    {
        if (string.IsNullOrEmpty(key))
            thrownew ArgumentException("Key cannot be null or empty", nameof(key));

        try
        {
            var database = _connectionManager.GetDatabase();
            var fullKey = GetFullKey(key);
            var serializedValue = SerializeValue(value);
            var expiration = expiry ?? _options.DefaultExpiry;

            var result = await database.StringSetAsync(fullKey, serializedValue, expiration, When.NotExists);
            
            _logger.LogDebug("SetIfNotExists for key: {Key}, success: {Success}", key, result);
            return result;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error in SetIfNotExists for key: {Key}", key);
            returnfalse;
        }
    }

    /// <summary>
    /// 获取键的过期时间
    /// </summary>
    /// <param name="key">缓存键</param>
    /// <returns>过期时间,如果键不存在或无过期时间则返回null</returns>
    publicasync Task<TimeSpan?> GetExpiryAsync(string key)
    {
        if (string.IsNullOrEmpty(key))
            thrownew ArgumentException("Key cannot be null or empty", nameof(key));

        try
        {
            var database = _connectionManager.GetDatabase();
            var fullKey = GetFullKey(key);
            returnawait database.KeyTimeToLiveAsync(fullKey);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error getting expiry for key: {Key}", key);
            returnnull;
        }
    }

    /// <summary>
    /// 设置键的过期时间
    /// </summary>
    /// <param name="key">缓存键</param>
    /// <param name="expiry">过期时间</param>
    /// <returns>是否设置成功</returns>
    public async Task<bool> ExpireAsync(string key, TimeSpan expiry)
    {
        if (string.IsNullOrEmpty(key))
            thrownew ArgumentException("Key cannot be null or empty", nameof(key));

        try
        {
            var database = _connectionManager.GetDatabase();
            var fullKey = GetFullKey(key);
            returnawait database.KeyExpireAsync(fullKey, expiry);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error setting expiry for key: {Key}", key);
            returnfalse;
        }
    }

    #region 辅助方法

    /// <summary>
    /// 获取完整的缓存键(带前缀)
    /// </summary>
    /// <param name="key">原始键</param>
    /// <returns>完整键</returns>
    private string GetFullKey(string key)
    {
        return$"{_options.KeyPrefix}{key}";
    }

    /// <summary>
    /// 序列化值
    /// </summary>
    /// <typeparam name="T">值类型</typeparam>
    /// <param name="value">要序列化的值</param>
    /// <returns>序列化后的字符串</returns>
    privatestring SerializeValue<T>(T value)
    {
        if (value == null) returnnull;
        
        try
        {
            var serializedBytes = _serializer.Serialize(value);
            return Convert.ToBase64String(serializedBytes);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error serializing value of type: {Type}", typeof(T).Name);
            thrownew CacheSerializationException($"Failed to serialize value of type: {typeof(T).Name}", ex);
        }
    }

    /// <summary>
    /// 反序列化值
    /// </summary>
    /// <typeparam name="T">目标类型</typeparam>
    /// <param name="value">要反序列化的值</param>
    /// <returns>反序列化后的对象</returns>
    private T DeserializeValue<T>(stringvalue)
    {
        if (string.IsNullOrEmpty(value)) returndefault(T);
        
        try
        {
            var serializedBytes = Convert.FromBase64String(value);
            return _serializer.Deserialize<T>(serializedBytes);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error deserializing value to type: {Type}", typeof(T).Name);
            thrownew CacheSerializationException($"Failed to deserialize value to type: {typeof(T).Name}", ex);
        }
    }

    #endregion
}

5. Redis发布-订阅同步机制实现

5.1 缓存同步事件模型

代码语言:javascript
代码运行次数:0
运行
复制
using System.Text.Json;

/// <summary>
/// 缓存同步事件类型
/// </summary>
publicenum CacheSyncEventType
{
    /// <summary>
    /// 缓存项被设置
    /// </summary>
    Set,
    
    /// <summary>
    /// 缓存项被删除
    /// </summary>
    Remove,
    
    /// <summary>
    /// 缓存项过期
    /// </summary>
    Expire,
    
    /// <summary>
    /// 批量删除(按模式)
    /// </summary>
    RemovePattern,
    
    /// <summary>
    /// 清空所有缓存
    /// </summary>
    Clear
}

/// <summary>
/// 缓存同步事件
/// </summary>
publicclassCacheSyncEvent
{
    /// <summary>
    /// 事件ID(用于幂等性控制)
    /// </summary>
    publicstring EventId { get; set; } = Guid.NewGuid().ToString();
    
    /// <summary>
    /// 事件类型
    /// </summary>
    public CacheSyncEventType EventType { get; set; }
    
    /// <summary>
    /// 缓存键
    /// </summary>
    publicstring Key { get; set; }
    
    /// <summary>
    /// 模式(用于批量删除)
    /// </summary>
    publicstring Pattern { get; set; }
    
    /// <summary>
    /// 事件发生时间
    /// </summary>
    public DateTime Timestamp { get; set; } = DateTime.UtcNow;
    
    /// <summary>
    /// 发起节点标识
    /// </summary>
    publicstring NodeId { get; set; }
    
    /// <summary>
    /// 附加数据
    /// </summary>
    public Dictionary<string, object> Metadata { get; set; } = new();

    /// <summary>
    /// 序列化为JSON
    /// </summary>
    /// <returns>JSON字符串</returns>
    public string ToJson()
    {
        return JsonSerializer.Serialize(this, new JsonSerializerOptions
        {
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase
        });
    }

    /// <summary>
    /// 从JSON反序列化
    /// </summary>
    /// <param name="json">JSON字符串</param>
    /// <returns>缓存同步事件</returns>
    public static CacheSyncEvent FromJson(string json)
    {
        return JsonSerializer.Deserialize<CacheSyncEvent>(json, new JsonSerializerOptions
        {
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase
        });
    }
}

/// <summary>
/// 缓存同步配置选项
/// </summary>
publicclassCacheSyncOptions
{
    /// <summary>
    /// Redis发布订阅频道前缀
    /// </summary>
    publicstring ChannelPrefix { get; set; } = "cache_sync";
    
    /// <summary>
    /// 当前节点ID
    /// </summary>
    publicstring NodeId { get; set; } = Environment.MachineName;
    
    /// <summary>
    /// 是否启用缓存同步
    /// </summary>
    publicbool EnableSync { get; set; } = true;
    
    /// <summary>
    /// 事件去重窗口时间
    /// </summary>
    public TimeSpan DeduplicationWindow { get; set; } = TimeSpan.FromSeconds(30);
    
    /// <summary>
    /// 最大重试次数
    /// </summary>
    publicint MaxRetryAttempts { get; set; } = 3;
    
    /// <summary>
    /// 重试延迟
    /// </summary>
    public TimeSpan RetryDelay { get; set; } = TimeSpan.FromSeconds(1);
    
    /// <summary>
    /// 批量处理的最大延迟
    /// </summary>
    public TimeSpan BatchMaxDelay { get; set; } = TimeSpan.FromMilliseconds(100);
    
    /// <summary>
    /// 批量处理的最大大小
    /// </summary>
    publicint BatchMaxSize { get; set; } = 50;
}

5.2 Redis发布-订阅同步服务

代码语言:javascript
代码运行次数:0
运行
复制
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using System.Collections.Concurrent;
using System.Threading.Channels;

/// <summary>
/// 缓存同步服务接口
/// </summary>
publicinterfaceICacheSyncService
{
    /// <summary>
    /// 发布缓存同步事件
    /// </summary>
    /// <param name="syncEvent">同步事件</param>
    Task PublishAsync(CacheSyncEvent syncEvent);
    
    /// <summary>
    /// 订阅缓存同步事件
    /// </summary>
    /// <param name="handler">事件处理器</param>
    Task SubscribeAsync(Func<CacheSyncEvent, Task> handler);
    
    /// <summary>
    /// 取消订阅
    /// </summary>
    Task UnsubscribeAsync();
    
    /// <summary>
    /// 检查服务状态
    /// </summary>
    bool IsHealthy { get; }
}

/// <summary>
/// Redis发布-订阅缓存同步服务
/// </summary>
publicclassRedisCacheSyncService : ICacheSyncService, IHostedService, IDisposable
{
    privatereadonly IRedisConnectionManager _connectionManager;
    privatereadonly CacheSyncOptions _options;
    privatereadonly ILogger<RedisCacheSyncService> _logger;
    
    // 事件处理器集合
    privatereadonly ConcurrentBag<Func<CacheSyncEvent, Task>> _handlers;
    
    // 事件去重缓存
    privatereadonly ConcurrentDictionary<string, DateTime> _processedEvents;
    
    // 批量处理通道
    privatereadonly Channel<CacheSyncEvent> _eventChannel;
    privatereadonly ChannelWriter<CacheSyncEvent> _eventWriter;
    privatereadonly ChannelReader<CacheSyncEvent> _eventReader;
    
    // 订阅和处理任务
    private Task _subscriptionTask;
    private Task _processingTask;
    private CancellationTokenSource _cancellationTokenSource;
    
    // 服务状态
    privatevolatilebool _isHealthy = false;
    privatebool _disposed = false;

    public RedisCacheSyncService(
        IRedisConnectionManager connectionManager,
        IOptions<CacheSyncOptions> options,
        ILogger<RedisCacheSyncService> logger)
    {
        _connectionManager = connectionManager ?? thrownew ArgumentNullException(nameof(connectionManager));
        _options = options?.Value ?? thrownew ArgumentNullException(nameof(options));
        _logger = logger ?? thrownew ArgumentNullException(nameof(logger));

        _handlers = new ConcurrentBag<Func<CacheSyncEvent, Task>>();
        _processedEvents = new ConcurrentDictionary<string, DateTime>();

        // 创建有界通道用于批量处理
        var channelOptions = new BoundedChannelOptions(_options.BatchMaxSize * 2)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
            SingleWriter = false
        };
        
        _eventChannel = Channel.CreateBounded<CacheSyncEvent>(channelOptions);
        _eventWriter = _eventChannel.Writer;
        _eventReader = _eventChannel.Reader;
    }

    /// <summary>
    /// 服务健康状态
    /// </summary>
    publicbool IsHealthy => _isHealthy;

    /// <summary>
    /// 发布缓存同步事件
    /// </summary>
    /// <param name="syncEvent">同步事件</param>
    public async Task PublishAsync(CacheSyncEvent syncEvent)
    {
        if (!_options.EnableSync || syncEvent == null)
            return;

        try
        {
            // 设置节点ID
            syncEvent.NodeId = _options.NodeId;
            
            var subscriber = _connectionManager.GetSubscriber();
            var channel = GetChannelName();
            var message = syncEvent.ToJson();

            await subscriber.PublishAsync(channel, message);
            
            _logger.LogDebug("Published sync event: {EventType} for key: {Key}", 
                syncEvent.EventType, syncEvent.Key);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to publish sync event: {EventType} for key: {Key}", 
                syncEvent.EventType, syncEvent.Key);
        }
    }

    /// <summary>
    /// 订阅缓存同步事件
    /// </summary>
    /// <param name="handler">事件处理器</param>
    public Task SubscribeAsync(Func<CacheSyncEvent, Task> handler)
    {
        if (handler == null)
            thrownew ArgumentNullException(nameof(handler));

        _handlers.Add(handler);
        _logger.LogDebug("Added sync event handler");
        
        return Task.CompletedTask;
    }

    /// <summary>
    /// 取消订阅
    /// </summary>
    public async Task UnsubscribeAsync()
    {
        try
        {
            if (_subscriptionTask != null && !_subscriptionTask.IsCompleted)
            {
                _cancellationTokenSource?.Cancel();
                await _subscriptionTask;
            }
            
            _logger.LogDebug("Unsubscribed from sync events");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error during unsubscribe");
        }
    }

    /// <summary>
    /// 启动服务
    /// </summary>
    /// <param name="cancellationToken">取消令牌</param>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        if (!_options.EnableSync)
        {
            _logger.LogInformation("Cache sync is disabled");
            return Task.CompletedTask;
        }

        _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        
        // 启动Redis订阅任务
        _subscriptionTask = StartSubscriptionAsync(_cancellationTokenSource.Token);
        
        // 启动事件处理任务
        _processingTask = StartProcessingAsync(_cancellationTokenSource.Token);
        
        // 启动清理任务
        _ = Task.Run(() => StartCleanupAsync(_cancellationTokenSource.Token), cancellationToken);

        _logger.LogInformation("Cache sync service started with NodeId: {NodeId}", _options.NodeId);
        return Task.CompletedTask;
    }

    /// <summary>
    /// 停止服务
    /// </summary>
    /// <param name="cancellationToken">取消令牌</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Stopping cache sync service");

        _cancellationTokenSource?.Cancel();
        
        // 完成事件通道写入
        _eventWriter.TryComplete();

        try
        {
            // 等待订阅任务完成
            if (_subscriptionTask != null)
            {
                await _subscriptionTask.WaitAsync(TimeSpan.FromSeconds(5), cancellationToken);
            }
            
            // 等待处理任务完成
            if (_processingTask != null)
            {
                await _processingTask.WaitAsync(TimeSpan.FromSeconds(5), cancellationToken);
            }
        }
        catch (TimeoutException)
        {
            _logger.LogWarning("Cache sync service stop timed out");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error stopping cache sync service");
        }

        _isHealthy = false;
        _logger.LogInformation("Cache sync service stopped");
    }

    /// <summary>
    /// 启动Redis订阅
    /// </summary>
    /// <param name="cancellationToken">取消令牌</param>
    private async Task StartSubscriptionAsync(CancellationToken cancellationToken)
    {
        var retryCount = 0;
        
        while (!cancellationToken.IsCancellationRequested && retryCount < _options.MaxRetryAttempts)
        {
            try
            {
                var subscriber = _connectionManager.GetSubscriber();
                var channel = GetChannelName();

                await subscriber.SubscribeAsync(channel, OnMessageReceived);
                
                _isHealthy = true;
                _logger.LogInformation("Successfully subscribed to Redis channel: {Channel}", channel);
                
                // 保持订阅状态
                while (!cancellationToken.IsCancellationRequested)
                {
                    await Task.Delay(1000, cancellationToken);
                }
                
                break;
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception ex)
            {
                retryCount++;
                _isHealthy = false;
                
                _logger.LogError(ex, "Redis subscription failed, retry {RetryCount}/{MaxRetries}", 
                    retryCount, _options.MaxRetryAttempts);

                if (retryCount < _options.MaxRetryAttempts)
                {
                    await Task.Delay(_options.RetryDelay, cancellationToken);
                }
            }
        }
    }

    /// <summary>
    /// 启动事件批量处理
    /// </summary>
    /// <param name="cancellationToken">取消令牌</param>
    private async Task StartProcessingAsync(CancellationToken cancellationToken)
    {
        var eventBatch = new List<CacheSyncEvent>();
        var batchTimer = Stopwatch.StartNew();

        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // 尝试读取事件(带超时)
                var hasEvent = await _eventReader.WaitToReadAsync(cancellationToken);
                if (!hasEvent)
                    break;

                // 收集批量事件
                while (_eventReader.TryRead(outvar syncEvent) && 
                       eventBatch.Count < _options.BatchMaxSize)
                {
                    eventBatch.Add(syncEvent);
                }

                // 检查是否应该处理批次
                var shouldProcess = eventBatch.Count >= _options.BatchMaxSize ||
                                   batchTimer.Elapsed >= _options.BatchMaxDelay ||
                                   !_eventReader.TryPeek(out _); // 没有更多事件

                if (shouldProcess && eventBatch.Count > 0)
                {
                    await ProcessEventBatchAsync(eventBatch, cancellationToken);
                    
                    eventBatch.Clear();
                    batchTimer.Restart();
                }
                elseif (eventBatch.Count > 0)
                {
                    // 短暂等待以收集更多事件
                    await Task.Delay(10, cancellationToken);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // 正常取消
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error in event processing loop");
        }
        finally
        {
            // 处理剩余的事件
            if (eventBatch.Count > 0)
            {
                try
                {
                    await ProcessEventBatchAsync(eventBatch, CancellationToken.None);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error processing final event batch");
                }
            }
        }
    }

    /// <summary>
    /// 处理事件批次
    /// </summary>
    /// <param name="events">事件列表</param>
    /// <param name="cancellationToken">取消令牌</param>
    private async Task ProcessEventBatchAsync(List<CacheSyncEvent> events, CancellationToken cancellationToken)
    {
        if (!events.Any())
            return;

        try
        {
            var tasks = events
                .Where(e => !IsEventProcessed(e.EventId))
                .Select(async e =>
                {
                    try
                    {
                        // 标记事件为已处理
                        MarkEventAsProcessed(e.EventId);
                        
                        // 并行调用所有处理器
                        var handlerTasks = _handlers.Select(handler => handler(e));
                        await Task.WhenAll(handlerTasks);
                        
                        _logger.LogDebug("Processed sync event: {EventType} for key: {Key}", 
                            e.EventType, e.Key);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Error processing sync event: {EventId}", e.EventId);
                    }
                });

            await Task.WhenAll(tasks);
            
            _logger.LogDebug("Processed batch of {Count} sync events", events.Count);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing event batch");
        }
    }

    /// <summary>
    /// 处理接收到的Redis消息
    /// </summary>
    /// <param name="channel">频道</param>
    /// <param name="message">消息</param>
    private async void OnMessageReceived(RedisChannel channel, RedisValue message)
    {
        try
        {
            if (!message.HasValue)
                return;

            var syncEvent = CacheSyncEvent.FromJson(message);
            
            // 忽略自己发送的事件
            if (syncEvent.NodeId == _options.NodeId)
            {
                _logger.LogTrace("Ignoring self-generated event: {EventId}", syncEvent.EventId);
                return;
            }

            // 将事件加入处理队列
            if (!await _eventWriter.WaitToWriteAsync())
            {
                _logger.LogWarning("Event channel is closed, dropping event: {EventId}", syncEvent.EventId);
                return;
            }

            if (!_eventWriter.TryWrite(syncEvent))
            {
                _logger.LogWarning("Failed to queue sync event: {EventId}", syncEvent.EventId);
            }
        }
        catch (JsonException ex)
        {
            _logger.LogError(ex, "Failed to deserialize sync event message: {Message}", message.ToString());
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing received message");
        }
    }

    /// <summary>
    /// 检查事件是否已处理(防止重复处理)
    /// </summary>
    /// <param name="eventId">事件ID</param>
    /// <returns>是否已处理</returns>
    private bool IsEventProcessed(string eventId)
    {
        return _processedEvents.ContainsKey(eventId);
    }

    /// <summary>
    /// 标记事件为已处理
    /// </summary>
    /// <param name="eventId">事件ID</param>
    private void MarkEventAsProcessed(string eventId)
    {
        _processedEvents.TryAdd(eventId, DateTime.UtcNow);
    }

    /// <summary>
    /// 启动定期清理已处理事件记录
    /// </summary>
    /// <param name="cancellationToken">取消令牌</param>
    private async Task StartCleanupAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                await Task.Delay(TimeSpan.FromMinutes(5), cancellationToken);
                
                var cutoffTime = DateTime.UtcNow - _options.DeduplicationWindow;
                var keysToRemove = _processedEvents
                    .Where(kvp => kvp.Value < cutoffTime)
                    .Select(kvp => kvp.Key)
                    .ToList();

                foreach (var key in keysToRemove)
                {
                    _processedEvents.TryRemove(key, out _);
                }

                if (keysToRemove.Count > 0)
                {
                    _logger.LogDebug("Cleaned up {Count} processed event records", keysToRemove.Count);
                }
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error in cleanup task");
            }
        }
    }

    /// <summary>
    /// 获取Redis频道名称
    /// </summary>
    /// <returns>频道名称</returns>
    private string GetChannelName()
    {
        return$"{_options.ChannelPrefix}:events";
    }

    /// <summary>
    /// 释放资源
    /// </summary>
    public void Dispose()
    {
        if (!_disposed)
        {
            _cancellationTokenSource?.Cancel();
            _eventWriter?.TryComplete();
            
            try
            {
                Task.WhenAll(
                    _subscriptionTask ?? Task.CompletedTask,
                    _processingTask ?? Task.CompletedTask
                ).Wait(TimeSpan.FromSeconds(5));
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error during disposal");
            }
            
            _cancellationTokenSource?.Dispose();
            _disposed = true;
        }
    }
}

5.3 缓存同步扩展方法

代码语言:javascript
代码运行次数:0
运行
复制
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

/// <summary>
/// 缓存同步服务扩展方法
/// </summary>
publicstaticclassCacheSyncServiceExtensions
{
    /// <summary>
    /// 添加Redis缓存同步服务
    /// </summary>
    /// <param name="services">服务集合</param>
    /// <param name="setupAction">配置委托</param>
    /// <returns>服务集合</returns>
    public static IServiceCollection AddRedisCacheSync(
        this IServiceCollection services,
        Action<CacheSyncOptions> setupAction = null)
    {
        // 配置选项
        if (setupAction != null)
        {
            services.Configure(setupAction);
        }
        else
        {
            services.Configure<CacheSyncOptions>(options =>
            {
                // 使用默认配置
            });
        }

        // 注册Redis缓存服务(带断路器装饰器)
        services.AddSingleton<RedisDistributedCache>();
        services.AddSingleton<IRedisDistributedCache>(provider =>
        {
            var innerCache = provider.GetRequiredService<RedisDistributedCache>();
            var circuitBreaker = provider.GetRequiredService<CacheCircuitBreaker>();
            var logger = provider.GetRequiredService<ILogger<CircuitBreakerRedisCache>>();
            returnnew CircuitBreakerRedisCache(innerCache, circuitBreaker, logger);
        });
        
        // 注册缓存同步服务
        services.AddSingleton<ICacheSyncService, RedisCacheSyncService>();
        services.AddHostedService<RedisCacheSyncService>(provider => 
            (RedisCacheSyncService)provider.GetRequiredService<ICacheSyncService>());

        return services;
    }
}

7.1 学习资源和参考文献

7.1.1 官方文档
  • Microsoft.Extensions.Caching.Memory[1]
  • StackExchange.Redis Documentation[2]
  • Redis Official Documentation[3]
7.1.2 推荐书籍
  • 《高性能MySQL》- 缓存设计理论基础
  • 《Redis设计与实现》- Redis深度解析
  • 《.NET性能优化》- .NET平台性能调优
7.1.3 开源项目参考
  • EasyCaching[4] - .NET缓存框架
  • FusionCache[5] - 高级缓存库
  • CacheManager[6] - 多级缓存管理器

结尾

参考资料

[1]

Microsoft.Extensions.Caching.Memory: https://docs.microsoft.com/en-us/dotnet/core/extensions/caching

[2]

StackExchange.Redis Documentation: https://stackexchange.github.io/StackExchange.Redis/

[3]

Redis Official Documentation: https://redis.io/documentation

[4]

EasyCaching: https://github.com/dotnetcore/EasyCaching

[5]

FusionCache: https://github.com/jodydonetti/ZiggyCreatures.FusionCache

[6]

CacheManager: https://github.com/MichaCo/CacheManager

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-08-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 DotNet NB 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 引言
    • 1. 多级缓存理论基础
      • 1.1 缓存层次结构理论
      • 1.2 缓存一致性理论
    • 2. 架构设计与技术选型
      • 2.0 系统架构流程图
    • 2. 架构设计与技术选型
      • 2.1 整体架构设计
      • 2.2 技术选型分析
      • 2.3 架构模式选择
    • 3. 内存缓存层实现详解
      • 3.1 IMemoryCache 核心接口分析
      • 3.2 高级内存缓存封装实现
      • 3.3 内存缓存配置和依赖注入
    • 4. Redis分布式缓存层实现
      • 4.1 Redis连接管理和配置
      • 4.2 Redis分布式缓存服务实现
    • 5. Redis发布-订阅同步机制实现
      • 5.1 缓存同步事件模型
      • 5.2 Redis发布-订阅同步服务
      • 5.3 缓存同步扩展方法
    • 7.1 学习资源和参考文献
    • 结尾
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档