分布式事务数据库事务CAP定理BASE理论分布式事务案例

数据库事务

断电了,该怎么处理?通过日志的方式!在执行事务的时候数据库首先会记录下这个事务的redo操作日志,然后才开始真正操作数据库,在操作之前首先会把日志文件写入磁盘,那么当突然断电的时候,即使操作没有完成,在重新启动数据库时候,数据库会根据当前数据的情况进行undo回滚或者是redo前滚,这样就保证了数据的强一致性。

CAP定理

  • 一致性(Consistency) : 客户端知道一系列的操作都会同时发生(生效);
  • 可用性(Availability) : 每个操作都必须以可预期的响应结束;
  • 分区容错性(Partition tolerance) : 即使出现单个组件无法可用,操作依然可以完成;

在分布式系统中,在任何数据库设计中,一个Web应用至多只能同时支持上面的两个属性。显然,任何横向扩展策略都要依赖于数据分区。因此,设计人员必须在一致性与可用性之间做出选择。

一般情况下往往为了可用性和分区容错性,忍痛放弃强一致支持,转而追求最终一致性。大部分业务场景下,是可以接受短暂的不一致的。

BASE理论

在分布式系统中,我们往往追求的是可用性,它的重要程序比一致性要高,那么如何实现高可用性呢? 前人已经给我们提出来了另外一个理论,就是BASE理论,它是用来对CAP定理进行进一步扩充的。BASE理论指的是:

  • Basically Available(基本可用)
  • Soft state(软状态)
  • Eventually consistent(最终一致性)

BASE理论是对CAP中的一致性和可用性进行一个权衡的结果,理论的核心思想就是:我们无法做到强一致,但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性(Eventual consistency)。

分布式事务

两段式提交(2PC)

两阶段提交就是使用XA协议的原理:

  • 第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是否可以提交.
  • 第二阶段:事务协调器要求每个数据库提交数据。

其中,如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚它们在此事务中的那部分信息。在CAP中,影响可用性。

优点: 尽量保证了数据的强一致,适合对数据强一致要求很高的关键领域(其实也不能100%保证强一致)。 缺点: 实现复杂,牺牲了可用性,对性能影响较大,不适合高并发高性能场景,如果分布式系统跨接口调用,目前 .NET 界还没有实现方案。

补偿事务(TCC)

CC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段:

  • Try阶段:主要是对业务系统做检测及资源预留;
  • Confirm阶段:主要是对业务系统做确认提交,Try阶段执行成功并开始执行Confirm阶段时,默认Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。
  • Cancel阶段:主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

举个例子,假入 Bob 要向 Smith 转账,思路大概是: 我们有一个本地方法,里面依次调用 1、首先在 Try 阶段,要先调用远程接口把 Smith 和 Bob 的钱给冻结起来。 2、在 Confirm 阶段,执行远程调用的转账的操作,转账成功进行解冻。 3、如果第2步执行成功,那么转账成功,如果第二步执行失败,则调用远程冻结接口对应的解冻方法 (Cancel)。

优点: 跟2PC比起来,实现以及流程相对简单了一些,但数据的一致性比2PC也要差一些。 缺点: 缺点还是比较明显的,在2,3步中都有可能失败。TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用TCC不太好定义及处理。

本地消息表(异步确保)

本地消息表这种实现方式应该是业界使用最多的,其核心思想是将分布式事务拆分成本地事务进行处理,这种思路是来源于ebay。 消息生产方,需要额外建一个消息表,并记录消息发送状态。消息表和业务数据要在一个事务里提交,也就是说他们要在一个数据库里面。然后消息会经过MQ发送到消息的消费方。如果消息发送失败,会进行重试发送。

消息消费方,需要处理这个消息,并完成自己的业务逻辑。此时如果本地事务处理成功,表明已经处理成功了,如果处理失败,那么就会重试执行。如果是业务上面的失败,可以给生产方发送一个业务补偿消息,通知生产方进行回滚等操作。

生产方和消费方定时扫描本地消息表,把还没处理完成的消息或者失败的消息再发送一遍。如果有靠谱的自动对账补账逻辑,这种方案还是非常实用的。

优点: 一种非常经典的实现,避免了分布式事务,实现了最终一致性。在 .NET中 有现成的解决方案。 缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。

MQ事务消息

有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如 RabbitMQ 和 Kafka 都不支持。

以阿里的 RocketMQ 中间件为例,其思路大致为: 第一阶段Prepared消息,会拿到消息的地址。 第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。

Sagas事务模型

该模型其核心思想就是拆分分布式系统中的长事务为多个短事务,或者叫多个本地事务,然后由 Sagas 工作流引擎负责协调,如果整个流程正常结束,那么就算是业务成功完成,如果在这过程中实现失败,那么Sagas工作流引擎就会以相反的顺序调用补偿操作,重新进行业务回滚。

比如我们一次关于购买旅游套餐业务操作涉及到三个操作,他们分别是预定车辆,预定宾馆,预定机票,他们分别属于三个不同的远程接口。可能从我们程序的角度来说他们不属于一个事务,但是从业务角度来说是属于同一个事务的。

案例

以上只是介绍了分布式事务的几种不同解决方案,但上面只是笼统的介绍,每一种方案都有很多的细节,这里以本地消息表这种方式进行详细讲解,并提供案例以便更深入的了解。

假设现在有三个系统:系统A、消息中间件M、系统B,在A 和 B 之间存在分布式事务的需求。 根据分布式事务这篇文章上方案二的理解,大概是这么个流程:

  1. A向M 发送一条消息,告诉M它准备干活了。
  2. M向A回应一条消息,告诉A说:我收到你的消息了,你干吧!(并不一定就真的要回应一条消息给A,可以通过判断等方式达到目的)。
  3. A开始干活,即处理该分布式事务中的A部分业务。这里要分两种情况考虑: (1) A处理业务的过程中出项异常,干活失败。 (2) A处理业务的过程中表现良好,干活成功。 我们要知道的是,不管是哪种情况,A都需要向M发送一条指令:如果A干活失败,A就Rollback,然后向M发送一条Rollback指令,这时候M就会将这条消息从消息中间件中删除,这种情况就不需要和B打交道了,整个流程就相当于结束了;如果A干活成功,就向M发送一条Confirm指令,可以认为这个指令的作用就是改变消息的状态,比如改成Confirm等,只有消息是这个状态,M才能向B投递消息,这是后话,不多说了。
  4. 如果A发送的是Confirm指令,M就向B投递该消息,B收到消息后,就开始干活了。
  5. 如果B干活成功,就向M回应,M这时候可以将这条消息删除或者作废,至此整个分布式事务完成;如果B干活失败,可能就需要调用A的回滚接口,上面没有讨论这种情况,应该挺麻烦吧?

上这种情况,我们都是假设A B 和 M之间不会丢失消息,如果在上面的 3 、5 步骤中发生丢失消息的情况就会出现问题,针对以上情况,有如下解决方案:

  • 针对步骤3 当M收到一条事务型消息后便开始计时,如果到了超时时间也没收到系统A发来的Confirm或Rollback指令的话,就会主动调用系统A提供的事务询问接口询问该系统目前的状态。该接口会返回三种结果: 提交:若获得的状态是“提交”,则将该消息投递给系统B。 回滚:若获得的状态是“回滚”,则直接将条消息丢弃。 处理中:若获得的状态是“处理中”,则继续等待。
  • 针对步骤5 M在等待确认应答超时之后就会重新向B投递消息,直到B返回消费成功响应为止。当然,一般M可以设置消息重试的次数和时间间隔,比如:当第一次投递失败后,每隔五分钟重试一次,一共重试3次。如果重试3次之后仍然投递失败,那么这条消息就需要人工干预。
准备

系统A、系统B、消息中间件使用RabbitMQ、测试工具Jmeter。

不过这个案例并不是完全按照上面说到的那样,主要区别在于:

  1. A向M发一条消息,并没有写一条消息到RabbitMQ,而仅仅是向event里面写了一条记录
  2. 为了防止A向M提交Confirm和Cancle指令时失败,M需要定时去event表里查看哪些消息创建时间大于3s并且还是unfinished状态。然后再根据这些消息的uuid去A的相关业务表查找记录,如果找到了,就置为Confirm,没找到就置为Cancel,然后再根据这两种状态分别进行不同的操作,Cancel就将消息删除;Confirm就发送一条消息到MQ,从而触发系统B的逻辑。
  3. 当B中的业务代码出现问题时,A并没有提供相应的回滚接口。
消息表
CREATE TABLE `event` (
  `uuid` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `status` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '状态',
  `create_time` bigint(20) NOT NULL COMMENT 'event的创建时间',
  `publisher_service_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '发布事件的服务id',
  `type` varchar(30) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'event的类型',
  `json_messages` text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '要发送给消息队列的消息集合',
  PRIMARY KEY (`uuid`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
系统A
public boolean doA(TradeDto tradeDto) throws Exception {
    //1. 生成全局uuid
    String uuid = EventTools.generateUuid();
    //2. 创建消息
    Event event = new Event();
    event.setUuid(uuid);
    //本服务的服务名
    event.setPublisherServiceId("hap-event-demo-trade-service");
    //message包含消息队列的队列名和数据
    //payload必须含有uuid字段!!!
    TradeAmqp payload = new TradeAmqp(uuid, tradeDto.getAmount(), tradeDto.getBuyerId(), tradeDto.getSellerId());
    Event.Message message = new Event.Message("trade2", mapper.writeValueAsString(payload));
    event.setMessages(Collections.singletonList(message));
    event.setType(EVENT_TYPE_TRADE);
    
    // 消息发送之后回调执行业务逻辑
    boolean result =  eventTemplate.execute(() -> {
        TradeRecord record = new TradeRecord();
        record.setBuyerId(tradeDto.getBuyerId());
        record.setSellerId(tradeDto.getSellerId());
        record.setUuid(uuid);
        record.setCreateTime(new Timestamp(System.currentTimeMillis()));
        if (tradeRecordMapper.insert(record) != 1){
            throw new RuntimeException("init event failed");
        }
    },event);
    if (!result){
        throw new RuntimeException("error.trade.create");
    }
    return true;
}


public boolean execute(EventCallback eventCallback, Event event){
    // 开启事物
    DefaultTransactionDefinition def = new DefaultTransactionDefinition();
    def.setPropagationBehavior(DefaultTransactionDefinition.PROPAGATION_REQUIRED);
    TransactionStatus status = transactionManager.getTransaction(def);
    String eventId;
    // 发送事件创建消息
    try {
        // 向event表里写一条记录
        eventId = eventClient.createEvent(event);
    }catch (Exception e){
        logger.info("create event failed {}",e);
        return false;
    }
    if(eventId == null){
        return false;
    }
    try {
        // 执行具体业务,通过Feign调用M的接口
        doSomething(eventCallback);
        // 提交事务
        transactionManager.commit(status);
    } catch (Exception e) {
        logger.info("execute failed {}",e);
        // 异常回滚
        transactionManager.rollback(status);
        try{
            // 从event表里面删除消息,通过Feign调用M的接口
            eventClient.cancelEvent(eventId);
        }catch (Exception e1){
            logger.info("cancel event failed {}",e1);
        }
        return false;
    }
    try{
        // 该表event表里的消息状态为confirm,同时向RabbitMQ发送一条消息,通过Feign调用M的接口
        eventClient.confirmEvent(eventId);
    }catch (Exception e1){
        logger.info("confirm event failed {}",e1);
    }
    return true;

}


// Feign服务调用
@FeignClient(value = "hap-event-store-service")
public interface EventClient {
    @PostMapping("/v1/events")
    String createEvent(@RequestBody Event event);

    @PutMapping("/v1/events/{eventId}/confirm")
    void confirmEvent(@PathVariable("eventId") String eventId);

    @PutMapping("/v1/events/{eventId}/cancel")
    void cancelEvent(@PathVariable("eventId") String eventId);
}
消息中间件M
@Service
@Transactional(noRollbackFor = EventNotExistException.class)
public class EventServiceImpl extends BaseServiceImpl<Event> implements EventService {

  // 向event表里写一条消息,该消息的初始状态为 unfinished
  public String createEvent(Event event) {
    beanTools.setJsonMessages(event);
    event.setStatus(Event.STATUS_INIT);
    event.setCreateTime(System.currentTimeMillis());
    if (insert(event) != 1) {
      throw new HapException("error.event.create");
    }
    return event.getUuid();
  }

  // 系统A提交事务之后,改变event表里的消息状态为confirm,并向RabbitMQ发送一条消息
  public void confirmEvent(String uuid) {
    Event event = selectByPrimaryKey(uuid);
    if (event == null) {
      throw new EventNotExistException(uuid);
    }
    Event temp = new Event();
    temp.setUuid(uuid);
    temp.setStatus(Event.STATUS_CONFIRM);
    if (updateByPrimaryKeySelective(temp) != 1) {
      throw new HapException("error.event.confirm");
    }

    beanTools.setMessageList(event);
    // 这里仅仅是向队列里面插入一个元素,真正向RabbitMQ发送消息的逻辑是通过定时任务完成得
    MsgPublishExecuter.getQueue().offer(event);
  }

   // 从event表删除消息
  public void cancelEvent(String uuid) {
     if (deleteByPrimaryKey(uuid) != 1) {
      if (selectByPrimaryKey(uuid) == null) {
        throw new EventNotExistException(uuid);
      } else {
        throw new HapException("error.event.cancel");
      }
    }
  }

}

有关于MsgPublishExecuter代码

@Component
public class MsgPublishExecuter {

  private static final long QUERY_INTERVAL =  100;

  // 该队列线程安全,FIFO
  private static ConcurrentLinkedQueue<Event> queue = new ConcurrentLinkedQueue<>();

  private MsgPublishExecTaskImpl msgPublishExecTask;

  @Autowired
  public MsgPublishExecuter(MsgPublishExecTaskImpl msgPublishExecTask) {
    this.msgPublishExecTask = msgPublishExecTask;
  }

  static ConcurrentLinkedQueue<Event> getQueue() {
    return queue;
  }
  
  // 每100毫秒执行一次
  @Scheduled(fixedDelay = QUERY_INTERVAL)
  public void scheduledQueryJob() {
    while (!queue.isEmpty()) {
      // 发送消息到RabbitMQ,最后通过RabbitTemplate发送
      msgPublishExecTask.publishMsg(queue.poll());
    }
  }
}

有关于向RabbitMQ发送消息和防系统A到M消息丢失措施

@Configuration
@EnableKafka
public class EventConfig {

  private static final int MAX_PUBLISH_THREAD_NUM =  2;
  private static final int MAX_QUERY_THREAD_NUM =  2;
  public static final String PUBLISHER_STATUS_FINISHED = "finished";
  public static final String PUBLISHER_STATUS_CANCEL = "canceled";

  // 该线程池负责维护 发送消息到MQ的线程
  @Bean(name = "publish-executor")
  public AsyncTaskExecutor publishTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setThreadNamePrefix("publish-executor");
    executor.setMaxPoolSize(MAX_PUBLISH_THREAD_NUM);
    return executor;
  }

  // 该线程池负责维护 根据消息向上游系统查询消息状态的线程,主要是从msg_record表查询
  @Bean(name = "queryStatus-executor")
  public AsyncTaskExecutor queryStatusTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setThreadNamePrefix("queryStatus-executor");
    executor.setMaxPoolSize(MAX_QUERY_THREAD_NUM);
    return executor;
  }

}



@EnableBinding
@Component
public class MsgPublishExecTaskImpl implements PublishMsgListener{

  private PublishEventService publishEventService;

  @Autowired
  public MsgPublishExecTaskImpl(PublishEventService publishEventService) {
    this.publishEventService = publishEventService;
  }

   // 异步执行,publish-executor是一个AsyncTaskExecutor对象
  @Async("publish-executor")
  public void publishMsg(Event event) {
    publishEventService.publishMsg (event,this);
  }

  @Override
  public void onFailure(Event event) {
    MsgPublishExecuter.getQueue().offer(event);
  }
}


// 这个其实就相当于是上面提到的补救措施,即系统A向M发送消息Confirm和Cancel指令失败的情况
@Async("queryStatus-executor")
void execQuery(Event event) {
StatusQueryExecuter.eventNum.decrementAndGet();
logger.info("execQuery event: {}", event);
EventRecord result = null;
try {
    StatusQuery query= HystrixFeign
            .builder()
            .client(client)
            .decoder(new JsonDecoder())
            .target(StatusQuery.class, "http://" + event.getPublisherServiceId());
    result = query.getEventRecord(event.getUuid(), event.getType());
} catch (Exception e) {
    logger.warn("execQuery error, event.publisherServiceId: {} event.uuid: {} event.type: {}",
            event.getPublisherServiceId(), event.getUuid(), event.getType());
}
if (result == null) {
    return;
}

logger.info("execQuery result: {}", result);
if (EventConfig.PUBLISHER_STATUS_FINISHED.equals(result.getStatus())) {
    eventService.confirmEvent(event.getUuid());
}
if (EventConfig.PUBLISHER_STATUS_CANCEL.equals(result.getStatus())) {
    eventService.cancelEvent(event.getUuid());
}
}
系统B

系统B主要是监听RabbitMQ中的消息,要做的也很简单, 每条消息中包含了买方和卖方的用户ID、金额,每次交易向买方账户见钱、卖方账户加钱

@Topic(value = "trade2", retryTimes = 5, retryInterval = 10000)
@Override
public void receive(String msg) throws Exception{
    logger.info("=== 消息队列接收信息 : {}", msg);
    TradeAmqp tradeAmqp = mapper.readValue(msg, TradeAmqp.class);
    User seller = userMapper.selectByPrimaryKey(tradeAmqp.getSellerId());
    seller.setAccount(seller.getAccount() + tradeAmqp.getTradeAmount());
    logger.info("=== 用户 : {} 账户增加 : {}", seller.getName(), tradeAmqp.getTradeAmount());
    userMapper.updateByPrimaryKeySelective(seller);

    User buyer = userMapper.selectByPrimaryKey(tradeAmqp.getBuyerId());
    buyer.setAccount(buyer.getAccount() - tradeAmqp.getTradeAmount());
    logger.info("=== 用户 : {} 账户减少 : {}", buyer.getName(), tradeAmqp.getTradeAmount());
    userMapper.updateByPrimaryKeySelective(buyer);
    logger.info("AtomicInteger   {}", a.incrementAndGet());
}
测试

在测试前,先看看表里面的初始化信息

消息表

系统A业务表

系统B业务表

从上图中可以看出,初始化的时候系统B中买方和卖方的账户余额都是0。

接下来通过Jmeter进行测试:

单线程测试

只开一个线程,循环执行1000次请求,查看结果

单线程测试

RabbitMQ

但系统A后台报错了,太长了,所以删掉了一些。从日志中可以看出,好像在服务间调用的时候,即系统A调用系统M接口,向消息表写数据的时候,好像出现了问题!

2018-08-26 16:39:54.377  INFO [hap-event-demo-trade-service,6dda6aa15fe370c8,6dda6aa15fe370c8,false] 10044 --- [nio-9021-exec-6] com.hand.hap.cloud.event.EventTemplate   : confirm event failed {}

com.netflix.hystrix.exception.HystrixRuntimeException: EventClient#confirmEvent(String) timed-out and no fallback available.
    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:822) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:807) ~[hystrix-core-1.5.10.jar:1.5.10]
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140) ~[rxjava-1.1.10.jar:1.1.10]
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) ~[rxjava-1.1.10.jar:1.1.10]
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) ~[rxjava-1.1.10.jar:1.1.10]
    at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1472) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1397) ~[hystrix-core-1.5.10.jar:1.5.10]
Caused by: java.util.concurrent.TimeoutException: null
    at com.netflix.hystrix.AbstractCommand.handleTimeoutViaFallback(AbstractCommand.java:997) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand.access$500(AbstractCommand.java:60) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand$12.call(AbstractCommand.java:609) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand$12.call(AbstractCommand.java:601) ~[hystrix-core-1.5.10.jar:1.5.10]
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140) ~[rxjava-1.1.10.jar:1.1.10]
    ... 16 common frames omitted

2018-08-26 16:39:55.393  INFO [hap-event-demo-trade-service,3e17e98019352cb8,3e17e98019352cb8,false] 10044 --- [nio-9021-exec-9] com.hand.hap.cloud.event.EventTemplate   : create event failed {}

注意上面两条非常重要的日志:confirm event failed {}、create event failed {} ,都是调用系统M接口时出了问题。这时候再观察数据库里面的记录

SELECT count(0) FROM event;        -- 1000条
SELECT count(0) FROM trade_record;    -- 913条

SELECT * FROM user; 
1   xiaohong    29970
2   xiaoming    -29970

从上面我们可以发现两个问题

  1. 消息表有1000条记录,系统A却只有913条记录,正确的结果应该是系统A和消息表都是1000条记录。
  2. 从user表中的账户余额来看,1000次请求,每次30,正确的结果应该是30000 和 -30000,结果却少了30,也就是少了一次计算。

下面来分析来分析一下这两个问题:

  • 针对对1个问题
try {
    eventId = eventClient.createEvent(event);
}catch (Exception e){
    logger.info("create event failed {}",e);
    return false;
}

消息表里面的记录是1000,说明系统M的针对系统A发过来的每个创建消息的请求都执行成功了;系统A少了87条记录,说明系统M在成功插入消息后将结果返回给系统A的时候出问题了,原因可能就是日志上显示的那样,timed-out and no fallback available,导致系统A接下来的逻辑不能正常执行,结果A丢了一些记录。

  • 针对对2个问题: 消息表里有1000条记录,每次30,正确结果应该是30000和-30000,实际上却是是29970和-29970,买方和卖方的钱对不上,少计算了一次。买卖双方账户余额完全是由系统B的业务逻辑控制,而触发系统B执行该段业务逻辑的条件是系统M中的消息状态是confirm,现在系统M有1000条记录,系统B却只计算了999次,并且观察系统B的后台日志并没有异常发生,说明有可能系统 M中存在一条消息的状态值不是confirm
SELECT * FROM event WHERE status != 'confirm';
256012c6df01473692ae92f7f058e8a9    unfinished  1535272794404   hap-event-demo-trade-service    trade   [{"topicName":"trade2","payload":"{\"uuid\":\"256012c6df01473692ae92f7f058e8a9\",\"tradeAmount\":30.0,\"buyerId\":2,\"sellerId\":1}"}]

从上图中可以看出,确实有这么一条消息,它的状态为unfinished,说明系统M只向RabbitMQ发送了999条消息,因为只有状态为confirm的消息记录才会publish到RabbitMQ,从而导致系统B只消费了999条消息。而之所以系统M只向RabbitMQ发送999条消息,主要原因在于系统A在向系统M发送Confirm指令的时候出了问题,导致M中的消息状态不能正常变为confirm,从而导致M无法向RabbitMQ推送消息。

经过这么一个简单测试,我们就发现了这个系统存在非常大的漏洞,这还仅仅是在单线程情况下。如果你看的仔细一些,就会发现一些问题,系统A在向系统M发送Confirm指令的时候出了问题,这不是我们上面考虑到的一种情况吗?消息丢失本来就可能发生,这时M根据A提供的接口,在系统A找到记录然后将M中的那条消息记录更新为confirm不就可以吗?这样想是没有问题的,但问题的关键在于,系统M根据那条状态为unfinished消息的uuid去系统A里面找,根本找不到啊!不信我们可以看看系统M中的后台日志,每5s一次的查询:

M调用A接口查消息

可以看到,日志中提示消息丢失了,所以问题的关键还是在于系统A,我觉得 在往系统M插一条消息和往系统A里面插入一条记录这两个操作应该要在一个事务里面完成,但现在A和M是两个系统,这又涉及到分布式事务的问题,所以是否可以将消息表放到系统A中?然后这两个操作在一个事务中提交

实际情况可能会比这复杂很多吧,回到上面的报错日志:

com.netflix.hystrix.exception.HystrixRuntimeException: EventClient#confirmEvent(String) timed-out and no fallback available.

这个错误可能是因为服务调用超时导致的,基本是出现在Hystrix熔断器,在系统M中添加了以下的配置,不过这一块我也并不是很清楚

ribbon:
  ReadTimeout: 60000
  ConnectTimeout: 60000
hystrix:
  command:
    default:
      execution:
        timeout:
          enabled: false
        isolation:
          thread:
            timeoutInMilliseconds: 10000

修改之后,重修做了几次测:1000、3000、5000,还是照样会有这个问题,todo。

多线程测试

这里开50个线程,每个线程循环请求20次

多线程测试

结果好像惨不忍睹,大量的 Hystrix circuit short-circuited and is OPEN

Caused by: java.lang.RuntimeException: Hystrix circuit short-circuited and is OPEN
    at com.netflix.hystrix.AbstractCommand.handleShortCircuitViaFallback(AbstractCommand.java:979) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand.applyHystrixSemantics(AbstractCommand.java:557) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand.access$200(AbstractCommand.java:60) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand$4.call(AbstractCommand.java:419) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand$4.call(AbstractCommand.java:413) ~[hystrix-core-1.5.10.jar:1.5.10]
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:46) ~[rxjava-1.1.10.jar:1.1.10]
    ... 90 common frames omitted

在网上找了一下资料记一次feign的问题排查引起这个问题的原因是在一个滚动窗口内,失败了二十个(默认),就会发生短路,短路时间默认为5秒,5秒之内拒绝所有的请求,之后开始运行 解决办法:设置熔断器失败的个数,默认为20个,这里我给了1000个,只有超过1000个才会发生短路

hystrix.command.default.circuitBreaker.requestVolumeThreshold=1000

配置之后,重新测试,发现又报这种错

com.netflix.hystrix.exception.HystrixRuntimeException: EventClient#createEvent(Event) could not be queued for execution and no fallback available
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3931cca2 rejected from java.util.concurrent.ThreadPoolExecutor@13ed04ed[Running, pool size = 10, active threads = 10, queued tasks = 0, completed tasks = 116]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) ~[na:1.8.0_101]
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) [na:1.8.0_101]
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) [na:1.8.0_101]
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) ~[na:1.8.0_101]

不过这个RejectedExecutionException好像是线程池里面的抛出的一个异常,后面那些就是线程池的参数。 Fegin默认的核心线程数是10个,默认的阻塞队列个数maxQueueSize是-1,这里我们开了50个线程,它的最大线程数应该是小于50,所以会抛RejectedExecutionException,添加以下配置

hystrix.threadpool.default.coreSize=50

但是我这里有个疑问,根据线程池的特点,在coreSize不够的情况下,不是可以放到阻塞队列吗?但是我在这里配置阻塞队列的个数好像作用不大,例如,我以下的配置是没用的,还是会报错

hystrix.threadpool.default.coreSize=30
hystrix.threadpool.default.maxQueueSize=20

发现又会报以下错:

com.netflix.hystrix.exception.HystrixRuntimeException: EventClient#confirmEvent(String) could not be queued for execution and no fallback available.
    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:822) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:807) ~[hystrix-core-1.5.10.jar:1.5.10]
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140) ~[rxjava-1.1.10.jar:1.1.10]
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) ~[rxjava-1.1.10.jar:1.1.10]
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) ~[rxjava-1.1.10.jar:1.1.10]
    at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1472) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1397) ~[hystrix-core-1.5.10.jar:1.5.10]
Caused by: java.util.concurrent.RejectedExecutionException: Rejected command because thread-pool queueSize is at rejection threshold.

这时候再添加一个配置,表示即使maxQueueSize没有达到,达到queueSizeRejectionThreshold该值后,请求也会被拒绝

hystrix.threadpool.default.queueSizeRejectionThreshold = 50

所以我最终的配置如下:

ribbon:
  ReadTimeout: 60000
  ConnectTimeout: 60000
hystrix:
  threadpool:
    default:
      coreSize: 50
#      maxQueueSize: 0
      queueSizeRejectionThreshold: 50
  command:
    default:
      circuitBreaker:
        requestVolumeThreshold: 1000
      execution:
        timeout:
          enabled: false
        isolation:
          thread:
            timeoutInMilliseconds: 10000

计算机配置

50个线程,每个线程循环请求20次 因为这里请求的是系统A,所以图上的5s完成请求仅仅是指系统A完成请求,系统M发送消息到RabbitMQ是一个异步的过程,所以5s仅仅代表系统A完成业务处理,并不代表系统B也完成了业务处理

50*20

MQ

后台无异常,表数据正确:1000、1000、30000 和 -30000

50个线程,每个线程循环请求60次

50*60

MQ

后台无异常,表数据正确:4000、4000、120000 和 -120000

50个线程,每个线程循环请求100次

50*100

MQ

后台无异常,表数据正确:9000、9000、270000 和 -270000

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏信安之路

轻松理解 Kerbreos 的认证过程

前几天在给人解释 Windows 是如何通过 Kerberos 进行 Authentication 的时候,讲了半天也别把那位老兄讲明白,还差点把自己给绕进去。

1570
来自专栏大魏分享(微信公众号:david-share)

讲真,Ansible 可以管理Windows?

前言: 本文是我和李尧老师一起实验。李尧是红帽高级培训讲师,目前负责红帽中国区员工内部技术培训与认证。 一、Ansible能对windows做什么操作? Ans...

7265
来自专栏散尽浮华

Saltstack自动化操作记录(1)-环境部署

早期运维工作中用过稍微复杂的Puppet,下面介绍下更为简单实用的Saltstack自动化运维的使用。 Saltstack知多少 Saltstack是一种全新的...

26010
来自专栏黑泽君的专栏

网络通信的三要素

5884
来自专栏自动化测试实战

看到就是赚到!Selenium完整框架——告别2017

3759
来自专栏小白鼠

分布式事务数据库事务CAP定理BASE理论分布式事务案例

断电了,该怎么处理?通过日志的方式!在执行事务的时候数据库首先会记录下这个事务的redo操作日志,然后才开始真正操作数据库,在操作之前首先会把日志文件写入磁盘,...

4404
来自专栏沃趣科技

MySQL InnoDB Update和Crash Recovery流程

1、首先介绍了Redo,Undo,Log Sequence Number (LSN),Checkpoint,Rollback Pointer (ROLL_PTR...

5467
来自专栏杨建荣的学习笔记

曲折的10g,11g中EM的安装配置过程(r4笔记第98天)

今天在本地搭了一套oracle环境,首先安装数据库的时候顺带了EM,结果安装好之后想修改监听器的端口,把原本15521的端口换成别的,结果在目录中修改了几个参数...

2753
来自专栏有趣的Python

程序员装机必备爆款软件推荐与配置(windows版)

做机也要做一只全能的机哦 值此新年来临之即,面对两百多个G的c盘。忍痛割爱将电脑系统重装,版本为(win10:1079)之后的所有电脑环境更新,专业软件安装均会...

4393
来自专栏Eugene's Blog

黑客常用的扫描器盒子分类目录文章标签友情链接联系我们

2639

扫码关注云+社区

领取腾讯云代金券