前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >.NET 云原生架构师训练营(模块二 基础巩固 Masstransit 异常处理)--学习笔记

.NET 云原生架构师训练营(模块二 基础巩固 Masstransit 异常处理)--学习笔记

原创
作者头像
郑子铭
修改2021-01-18 14:29:37
4000
修改2021-01-18 14:29:37
举报

2.6.8 RabbitMQ -- Masstransit 异常处理

  • 异常处理
  • 其他
  • 高级功能

异常处理

  • 异常与重试
  • 重试配置
  • 重试条件
  • 重新投递信息
  • 信箱

异常与重试

Exception

代码语言:javascript
复制
public class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    public Task Consume(ConsumeContext<SubmitOrder> context)
    {
        throw new Exception("Very bad things happened");
    }
}

UseMessageRetry

代码语言:javascript
复制
var sessionFactory = CreateSessionFactory();

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host("rabbitmq://localhost/");

    cfg.ReceiveEndpoint("submit-order", e =>
    {
        e.UseMessageRetry(r => r.Immediate(5));

        e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
    });
});

重试配置

代码语言:javascript
复制
// 立即重试:一共连续重试10次
ep.UseMessageRetry(r => r.Immediate(10));

// 间隔重试:一共重试10次,每次间隔10秒
ep.UseMessageRetry(r => r.Interval(10, TimeSpan.FromSeconds(10)));

// 多个间隔重试:5秒后第一次,5+10秒后第二次,5+10+15秒后第三次
ep.UseMessageRetry(r => r.Intervals(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(15)));

// 指数级间隔重试:共10次,每次间隔:当前重试次数 * 60秒
ep.UseMessageRetry(r => r.Exponential(10, TimeSpan.FromSeconds(60), TimeSpan.FromHours(24), TimeSpan.FromSeconds(60)));

// 每次叠加50秒
ep.UseMessageRetry(r => r.Incremental(10, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(50)));

重试条件

代码语言:javascript
复制
e.UseMessageRetry(r => 
{
    r.Handle<ArgumentNullException>();
    r.Ignore(typeof(InvalidOperationException), typeof(InvalidCastException));
    r.Ignore<ArgumentException>(t => t.ParamName == "orderTotal");
});

过滤某些异常类型不进行重试

重新投递信息

代码语言:javascript
复制
cfg.ReceiveEndpoint("submit-order", e =>
{
    e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
    e.UseMessageRetry(r => r.Immediate(5));
    e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});

消息冲队列移除之后,在一定时间之后重新投入消息队列。需要配置调度模块(scheduling)

信箱

代码语言:javascript
复制
cfg.ReceiveEndpoint("submit-order", e =>
{
    e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
    e.UseMessageRetry(r => r.Immediate(5));
    e.UseInMemoryOutbox();

    e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});

有些消息是在 consume 方法中发送或发布的,如果在发送之后 consume 中产生了异常,那原来发出去的消息就需要撤回,如果使用信箱之后,在 consume 中要发布/发送的消息就会先暂存在内存中直到 consume 方法成功之后才真正发出去

其他

  • Fault
  • Consuming Faults
  • Error Pipe
  • Dead-Letter Pipe

Fault

代码语言:javascript
复制
public interface Fault<T>
    where T : class
{
    Guid FaultId { get; }
    Guid? FaultedMessageId { get; }
    DateTime Timestamp { get; }
    ExceptionInfo[] Exceptions { get; }
    HostInfo Host { get; }
    T Message { get; }
}

Fault 消息在异常的时候会发布出来

Consuming Faults

代码语言:javascript
复制
public class DashboardFaultConsumer :
    IConsumer<Fault<SubmitOrder>>
{
    public async Task Consume(ConsumeContext<Fault<SubmitOrder>> context)
    {
        // update the dashboard
    }
}

Fault 消息也是可以进行订阅的

Error Pipe

代码语言:javascript
复制
cfg.ReceiveEndpoint("input-queue", ec =>
{
    ec.DiscardFaultedMessages();
});

默认情况下错误的消息会被投递到了 _error 队列,可以配置直接抛弃错误信息

Dead-Letter Pipe

代码语言:javascript
复制
cfg.ReceiveEndpoint("input-queue", ec =>
{
    ec.DiscardSkippedMessages();
});

死信队列:没有消费者的消息会被移到 _skipped 队列,但可以配置为不移到 _skipped 队列

高级功能

  • 持久化
  • Saga 事件串
  • 调度
  • Courier 最终一致性
  • 监控

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 2.6.8 RabbitMQ -- Masstransit 异常处理
    • 异常处理
      • 异常与重试
      • 重试配置
      • 重试条件
      • 重新投递信息
    • 信箱
      • 其他
        • Fault
        • Consuming Faults
        • Error Pipe
        • Dead-Letter Pipe
      • 高级功能
      相关产品与服务
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档