首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式事务数据库事务CAP定理BASE理论分布式事务案例

分布式事务数据库事务CAP定理BASE理论分布式事务案例

作者头像
spilledyear
发布2018-09-26 14:39:26
2.3K0
发布2018-09-26 14:39:26
举报
文章被收录于专栏:小白鼠小白鼠

分布式事务 分布式事务

数据库事务

断电了,该怎么处理?通过日志的方式!在执行事务的时候数据库首先会记录下这个事务的redo操作日志,然后才开始真正操作数据库,在操作之前首先会把日志文件写入磁盘,那么当突然断电的时候,即使操作没有完成,在重新启动数据库时候,数据库会根据当前数据的情况进行undo回滚或者是redo前滚,这样就保证了数据的强一致性。

CAP定理

  • 一致性(Consistency) : 客户端知道一系列的操作都会同时发生(生效);
  • 可用性(Availability) : 每个操作都必须以可预期的响应结束;
  • 分区容错性(Partition tolerance) : 即使出现单个组件无法可用,操作依然可以完成;

在分布式系统中,在任何数据库设计中,一个Web应用至多只能同时支持上面的两个属性。显然,任何横向扩展策略都要依赖于数据分区。因此,设计人员必须在一致性与可用性之间做出选择。

一般情况下往往为了可用性和分区容错性,忍痛放弃强一致支持,转而追求最终一致性。大部分业务场景下,是可以接受短暂的不一致的。

BASE理论

在分布式系统中,我们往往追求的是可用性,它的重要程序比一致性要高,那么如何实现高可用性呢? 前人已经给我们提出来了另外一个理论,就是BASE理论,它是用来对CAP定理进行进一步扩充的。BASE理论指的是:

  • Basically Available(基本可用)
  • Soft state(软状态)
  • Eventually consistent(最终一致性)

BASE理论是对CAP中的一致性和可用性进行一个权衡的结果,理论的核心思想就是:我们无法做到强一致,但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性(Eventual consistency)。

分布式事务

两段式提交(2PC)

两阶段提交就是使用XA协议的原理:

  • 第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是否可以提交.
  • 第二阶段:事务协调器要求每个数据库提交数据。

其中,如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚它们在此事务中的那部分信息。在CAP中,影响可用性。

优点: 尽量保证了数据的强一致,适合对数据强一致要求很高的关键领域(其实也不能100%保证强一致)。 缺点: 实现复杂,牺牲了可用性,对性能影响较大,不适合高并发高性能场景,如果分布式系统跨接口调用,目前 .NET 界还没有实现方案。

补偿事务(TCC)

CC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段:

  • Try阶段:主要是对业务系统做检测及资源预留;
  • Confirm阶段:主要是对业务系统做确认提交,Try阶段执行成功并开始执行Confirm阶段时,默认Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。
  • Cancel阶段:主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

举个例子,假入 Bob 要向 Smith 转账,思路大概是: 我们有一个本地方法,里面依次调用 1、首先在 Try 阶段,要先调用远程接口把 Smith 和 Bob 的钱给冻结起来。 2、在 Confirm 阶段,执行远程调用的转账的操作,转账成功进行解冻。 3、如果第2步执行成功,那么转账成功,如果第二步执行失败,则调用远程冻结接口对应的解冻方法 (Cancel)。

优点: 跟2PC比起来,实现以及流程相对简单了一些,但数据的一致性比2PC也要差一些。 缺点: 缺点还是比较明显的,在2,3步中都有可能失败。TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用TCC不太好定义及处理。

本地消息表(异步确保)

本地消息表这种实现方式应该是业界使用最多的,其核心思想是将分布式事务拆分成本地事务进行处理,这种思路是来源于ebay。 消息生产方,需要额外建一个消息表,并记录消息发送状态。消息表和业务数据要在一个事务里提交,也就是说他们要在一个数据库里面。然后消息会经过MQ发送到消息的消费方。如果消息发送失败,会进行重试发送。

消息消费方,需要处理这个消息,并完成自己的业务逻辑。此时如果本地事务处理成功,表明已经处理成功了,如果处理失败,那么就会重试执行。如果是业务上面的失败,可以给生产方发送一个业务补偿消息,通知生产方进行回滚等操作。

生产方和消费方定时扫描本地消息表,把还没处理完成的消息或者失败的消息再发送一遍。如果有靠谱的自动对账补账逻辑,这种方案还是非常实用的。

优点: 一种非常经典的实现,避免了分布式事务,实现了最终一致性。在 .NET中 有现成的解决方案。 缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。

MQ事务消息

有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如 RabbitMQ 和 Kafka 都不支持。

以阿里的 RocketMQ 中间件为例,其思路大致为: 第一阶段Prepared消息,会拿到消息的地址。 第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。

Sagas事务模型

该模型其核心思想就是拆分分布式系统中的长事务为多个短事务,或者叫多个本地事务,然后由 Sagas 工作流引擎负责协调,如果整个流程正常结束,那么就算是业务成功完成,如果在这过程中实现失败,那么Sagas工作流引擎就会以相反的顺序调用补偿操作,重新进行业务回滚。

比如我们一次关于购买旅游套餐业务操作涉及到三个操作,他们分别是预定车辆,预定宾馆,预定机票,他们分别属于三个不同的远程接口。可能从我们程序的角度来说他们不属于一个事务,但是从业务角度来说是属于同一个事务的。

案例

以上只是介绍了分布式事务的几种不同解决方案,但上面只是笼统的介绍,每一种方案都有很多的细节,这里以本地消息表这种方式进行详细讲解,并提供案例以便更深入的了解。

假设现在有三个系统:系统A、消息中间件M、系统B,在A 和 B 之间存在分布式事务的需求。 根据分布式事务这篇文章上方案二的理解,大概是这么个流程:

  1. A向M 发送一条消息,告诉M它准备干活了。
  2. M向A回应一条消息,告诉A说:我收到你的消息了,你干吧!(并不一定就真的要回应一条消息给A,可以通过判断等方式达到目的)。
  3. A开始干活,即处理该分布式事务中的A部分业务。这里要分两种情况考虑: (1) A处理业务的过程中出项异常,干活失败。 (2) A处理业务的过程中表现良好,干活成功。 我们要知道的是,不管是哪种情况,A都需要向M发送一条指令:如果A干活失败,A就Rollback,然后向M发送一条Rollback指令,这时候M就会将这条消息从消息中间件中删除,这种情况就不需要和B打交道了,整个流程就相当于结束了;如果A干活成功,就向M发送一条Confirm指令,可以认为这个指令的作用就是改变消息的状态,比如改成Confirm等,只有消息是这个状态,M才能向B投递消息,这是后话,不多说了。
  4. 如果A发送的是Confirm指令,M就向B投递该消息,B收到消息后,就开始干活了。
  5. 如果B干活成功,就向M回应,M这时候可以将这条消息删除或者作废,至此整个分布式事务完成;如果B干活失败,可能就需要调用A的回滚接口,上面没有讨论这种情况,应该挺麻烦吧?

上这种情况,我们都是假设A B 和 M之间不会丢失消息,如果在上面的 3 、5 步骤中发生丢失消息的情况就会出现问题,针对以上情况,有如下解决方案:

  • 针对步骤3 当M收到一条事务型消息后便开始计时,如果到了超时时间也没收到系统A发来的Confirm或Rollback指令的话,就会主动调用系统A提供的事务询问接口询问该系统目前的状态。该接口会返回三种结果: 提交:若获得的状态是“提交”,则将该消息投递给系统B。 回滚:若获得的状态是“回滚”,则直接将条消息丢弃。 处理中:若获得的状态是“处理中”,则继续等待。
  • 针对步骤5 M在等待确认应答超时之后就会重新向B投递消息,直到B返回消费成功响应为止。当然,一般M可以设置消息重试的次数和时间间隔,比如:当第一次投递失败后,每隔五分钟重试一次,一共重试3次。如果重试3次之后仍然投递失败,那么这条消息就需要人工干预。
准备

系统A、系统B、消息中间件使用RabbitMQ、测试工具Jmeter。

不过这个案例并不是完全按照上面说到的那样,主要区别在于:

  1. A向M发一条消息,并没有写一条消息到RabbitMQ,而仅仅是向event里面写了一条记录
  2. 为了防止A向M提交Confirm和Cancle指令时失败,M需要定时去event表里查看哪些消息创建时间大于3s并且还是unfinished状态。然后再根据这些消息的uuid去A的相关业务表查找记录,如果找到了,就置为Confirm,没找到就置为Cancel,然后再根据这两种状态分别进行不同的操作,Cancel就将消息删除;Confirm就发送一条消息到MQ,从而触发系统B的逻辑。
  3. 当B中的业务代码出现问题时,A并没有提供相应的回滚接口。
消息表
CREATE TABLE `event` (
  `uuid` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `status` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '状态',
  `create_time` bigint(20) NOT NULL COMMENT 'event的创建时间',
  `publisher_service_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '发布事件的服务id',
  `type` varchar(30) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'event的类型',
  `json_messages` text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '要发送给消息队列的消息集合',
  PRIMARY KEY (`uuid`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
系统A
public boolean doA(TradeDto tradeDto) throws Exception {
    //1. 生成全局uuid
    String uuid = EventTools.generateUuid();
    //2. 创建消息
    Event event = new Event();
    event.setUuid(uuid);
    //本服务的服务名
    event.setPublisherServiceId("hap-event-demo-trade-service");
    //message包含消息队列的队列名和数据
    //payload必须含有uuid字段!!!
    TradeAmqp payload = new TradeAmqp(uuid, tradeDto.getAmount(), tradeDto.getBuyerId(), tradeDto.getSellerId());
    Event.Message message = new Event.Message("trade2", mapper.writeValueAsString(payload));
    event.setMessages(Collections.singletonList(message));
    event.setType(EVENT_TYPE_TRADE);
    
    // 消息发送之后回调执行业务逻辑
    boolean result =  eventTemplate.execute(() -> {
        TradeRecord record = new TradeRecord();
        record.setBuyerId(tradeDto.getBuyerId());
        record.setSellerId(tradeDto.getSellerId());
        record.setUuid(uuid);
        record.setCreateTime(new Timestamp(System.currentTimeMillis()));
        if (tradeRecordMapper.insert(record) != 1){
            throw new RuntimeException("init event failed");
        }
    },event);
    if (!result){
        throw new RuntimeException("error.trade.create");
    }
    return true;
}


public boolean execute(EventCallback eventCallback, Event event){
    // 开启事物
    DefaultTransactionDefinition def = new DefaultTransactionDefinition();
    def.setPropagationBehavior(DefaultTransactionDefinition.PROPAGATION_REQUIRED);
    TransactionStatus status = transactionManager.getTransaction(def);
    String eventId;
    // 发送事件创建消息
    try {
        // 向event表里写一条记录
        eventId = eventClient.createEvent(event);
    }catch (Exception e){
        logger.info("create event failed {}",e);
        return false;
    }
    if(eventId == null){
        return false;
    }
    try {
        // 执行具体业务,通过Feign调用M的接口
        doSomething(eventCallback);
        // 提交事务
        transactionManager.commit(status);
    } catch (Exception e) {
        logger.info("execute failed {}",e);
        // 异常回滚
        transactionManager.rollback(status);
        try{
            // 从event表里面删除消息,通过Feign调用M的接口
            eventClient.cancelEvent(eventId);
        }catch (Exception e1){
            logger.info("cancel event failed {}",e1);
        }
        return false;
    }
    try{
        // 该表event表里的消息状态为confirm,同时向RabbitMQ发送一条消息,通过Feign调用M的接口
        eventClient.confirmEvent(eventId);
    }catch (Exception e1){
        logger.info("confirm event failed {}",e1);
    }
    return true;

}


// Feign服务调用
@FeignClient(value = "hap-event-store-service")
public interface EventClient {
    @PostMapping("/v1/events")
    String createEvent(@RequestBody Event event);

    @PutMapping("/v1/events/{eventId}/confirm")
    void confirmEvent(@PathVariable("eventId") String eventId);

    @PutMapping("/v1/events/{eventId}/cancel")
    void cancelEvent(@PathVariable("eventId") String eventId);
}
消息中间件M
@Service
@Transactional(noRollbackFor = EventNotExistException.class)
public class EventServiceImpl extends BaseServiceImpl<Event> implements EventService {

  // 向event表里写一条消息,该消息的初始状态为 unfinished
  public String createEvent(Event event) {
    beanTools.setJsonMessages(event);
    event.setStatus(Event.STATUS_INIT);
    event.setCreateTime(System.currentTimeMillis());
    if (insert(event) != 1) {
      throw new HapException("error.event.create");
    }
    return event.getUuid();
  }

  // 系统A提交事务之后,改变event表里的消息状态为confirm,并向RabbitMQ发送一条消息
  public void confirmEvent(String uuid) {
    Event event = selectByPrimaryKey(uuid);
    if (event == null) {
      throw new EventNotExistException(uuid);
    }
    Event temp = new Event();
    temp.setUuid(uuid);
    temp.setStatus(Event.STATUS_CONFIRM);
    if (updateByPrimaryKeySelective(temp) != 1) {
      throw new HapException("error.event.confirm");
    }

    beanTools.setMessageList(event);
    // 这里仅仅是向队列里面插入一个元素,真正向RabbitMQ发送消息的逻辑是通过定时任务完成得
    MsgPublishExecuter.getQueue().offer(event);
  }

   // 从event表删除消息
  public void cancelEvent(String uuid) {
     if (deleteByPrimaryKey(uuid) != 1) {
      if (selectByPrimaryKey(uuid) == null) {
        throw new EventNotExistException(uuid);
      } else {
        throw new HapException("error.event.cancel");
      }
    }
  }

}

有关于MsgPublishExecuter代码

@Component
public class MsgPublishExecuter {

  private static final long QUERY_INTERVAL =  100;

  // 该队列线程安全,FIFO
  private static ConcurrentLinkedQueue<Event> queue = new ConcurrentLinkedQueue<>();

  private MsgPublishExecTaskImpl msgPublishExecTask;

  @Autowired
  public MsgPublishExecuter(MsgPublishExecTaskImpl msgPublishExecTask) {
    this.msgPublishExecTask = msgPublishExecTask;
  }

  static ConcurrentLinkedQueue<Event> getQueue() {
    return queue;
  }
  
  // 每100毫秒执行一次
  @Scheduled(fixedDelay = QUERY_INTERVAL)
  public void scheduledQueryJob() {
    while (!queue.isEmpty()) {
      // 发送消息到RabbitMQ,最后通过RabbitTemplate发送
      msgPublishExecTask.publishMsg(queue.poll());
    }
  }
}

有关于向RabbitMQ发送消息和防系统A到M消息丢失措施

@Configuration
@EnableKafka
public class EventConfig {

  private static final int MAX_PUBLISH_THREAD_NUM =  2;
  private static final int MAX_QUERY_THREAD_NUM =  2;
  public static final String PUBLISHER_STATUS_FINISHED = "finished";
  public static final String PUBLISHER_STATUS_CANCEL = "canceled";

  // 该线程池负责维护 发送消息到MQ的线程
  @Bean(name = "publish-executor")
  public AsyncTaskExecutor publishTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setThreadNamePrefix("publish-executor");
    executor.setMaxPoolSize(MAX_PUBLISH_THREAD_NUM);
    return executor;
  }

  // 该线程池负责维护 根据消息向上游系统查询消息状态的线程,主要是从msg_record表查询
  @Bean(name = "queryStatus-executor")
  public AsyncTaskExecutor queryStatusTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setThreadNamePrefix("queryStatus-executor");
    executor.setMaxPoolSize(MAX_QUERY_THREAD_NUM);
    return executor;
  }

}



@EnableBinding
@Component
public class MsgPublishExecTaskImpl implements PublishMsgListener{

  private PublishEventService publishEventService;

  @Autowired
  public MsgPublishExecTaskImpl(PublishEventService publishEventService) {
    this.publishEventService = publishEventService;
  }

   // 异步执行,publish-executor是一个AsyncTaskExecutor对象
  @Async("publish-executor")
  public void publishMsg(Event event) {
    publishEventService.publishMsg (event,this);
  }

  @Override
  public void onFailure(Event event) {
    MsgPublishExecuter.getQueue().offer(event);
  }
}


// 这个其实就相当于是上面提到的补救措施,即系统A向M发送消息Confirm和Cancel指令失败的情况
@Async("queryStatus-executor")
void execQuery(Event event) {
StatusQueryExecuter.eventNum.decrementAndGet();
logger.info("execQuery event: {}", event);
EventRecord result = null;
try {
    StatusQuery query= HystrixFeign
            .builder()
            .client(client)
            .decoder(new JsonDecoder())
            .target(StatusQuery.class, "http://" + event.getPublisherServiceId());
    result = query.getEventRecord(event.getUuid(), event.getType());
} catch (Exception e) {
    logger.warn("execQuery error, event.publisherServiceId: {} event.uuid: {} event.type: {}",
            event.getPublisherServiceId(), event.getUuid(), event.getType());
}
if (result == null) {
    return;
}

logger.info("execQuery result: {}", result);
if (EventConfig.PUBLISHER_STATUS_FINISHED.equals(result.getStatus())) {
    eventService.confirmEvent(event.getUuid());
}
if (EventConfig.PUBLISHER_STATUS_CANCEL.equals(result.getStatus())) {
    eventService.cancelEvent(event.getUuid());
}
}
系统B

系统B主要是监听RabbitMQ中的消息,要做的也很简单, 每条消息中包含了买方和卖方的用户ID、金额,每次交易向买方账户见钱、卖方账户加钱

@Topic(value = "trade2", retryTimes = 5, retryInterval = 10000)
@Override
public void receive(String msg) throws Exception{
    logger.info("=== 消息队列接收信息 : {}", msg);
    TradeAmqp tradeAmqp = mapper.readValue(msg, TradeAmqp.class);
    User seller = userMapper.selectByPrimaryKey(tradeAmqp.getSellerId());
    seller.setAccount(seller.getAccount() + tradeAmqp.getTradeAmount());
    logger.info("=== 用户 : {} 账户增加 : {}", seller.getName(), tradeAmqp.getTradeAmount());
    userMapper.updateByPrimaryKeySelective(seller);

    User buyer = userMapper.selectByPrimaryKey(tradeAmqp.getBuyerId());
    buyer.setAccount(buyer.getAccount() - tradeAmqp.getTradeAmount());
    logger.info("=== 用户 : {} 账户减少 : {}", buyer.getName(), tradeAmqp.getTradeAmount());
    userMapper.updateByPrimaryKeySelective(buyer);
    logger.info("AtomicInteger   {}", a.incrementAndGet());
}
测试

在测试前,先看看表里面的初始化信息

消息表

系统A业务表

系统B业务表

从上图中可以看出,初始化的时候系统B中买方和卖方的账户余额都是0。

接下来通过Jmeter进行测试:

单线程测试

只开一个线程,循环执行1000次请求,查看结果

单线程测试

RabbitMQ

但系统A后台报错了,太长了,所以删掉了一些。从日志中可以看出,好像在服务间调用的时候,即系统A调用系统M接口,向消息表写数据的时候,好像出现了问题!

2018-08-26 16:39:54.377  INFO [hap-event-demo-trade-service,6dda6aa15fe370c8,6dda6aa15fe370c8,false] 10044 --- [nio-9021-exec-6] com.hand.hap.cloud.event.EventTemplate   : confirm event failed {}

com.netflix.hystrix.exception.HystrixRuntimeException: EventClient#confirmEvent(String) timed-out and no fallback available.
    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:822) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:807) ~[hystrix-core-1.5.10.jar:1.5.10]
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140) ~[rxjava-1.1.10.jar:1.1.10]
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) ~[rxjava-1.1.10.jar:1.1.10]
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) ~[rxjava-1.1.10.jar:1.1.10]
    at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1472) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1397) ~[hystrix-core-1.5.10.jar:1.5.10]
Caused by: java.util.concurrent.TimeoutException: null
    at com.netflix.hystrix.AbstractCommand.handleTimeoutViaFallback(AbstractCommand.java:997) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand.access$500(AbstractCommand.java:60) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand$12.call(AbstractCommand.java:609) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand$12.call(AbstractCommand.java:601) ~[hystrix-core-1.5.10.jar:1.5.10]
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140) ~[rxjava-1.1.10.jar:1.1.10]
    ... 16 common frames omitted

2018-08-26 16:39:55.393  INFO [hap-event-demo-trade-service,3e17e98019352cb8,3e17e98019352cb8,false] 10044 --- [nio-9021-exec-9] com.hand.hap.cloud.event.EventTemplate   : create event failed {}

注意上面两条非常重要的日志:confirm event failed {}、create event failed {} ,都是调用系统M接口时出了问题。这时候再观察数据库里面的记录

SELECT count(0) FROM event;        -- 1000条
SELECT count(0) FROM trade_record;    -- 913条

SELECT * FROM user; 
1   xiaohong    29970
2   xiaoming    -29970

从上面我们可以发现两个问题

  1. 消息表有1000条记录,系统A却只有913条记录,正确的结果应该是系统A和消息表都是1000条记录。
  2. 从user表中的账户余额来看,1000次请求,每次30,正确的结果应该是30000 和 -30000,结果却少了30,也就是少了一次计算。

下面来分析来分析一下这两个问题:

  • 针对对1个问题
try {
    eventId = eventClient.createEvent(event);
}catch (Exception e){
    logger.info("create event failed {}",e);
    return false;
}

消息表里面的记录是1000,说明系统M的针对系统A发过来的每个创建消息的请求都执行成功了;系统A少了87条记录,说明系统M在成功插入消息后将结果返回给系统A的时候出问题了,原因可能就是日志上显示的那样,timed-out and no fallback available,导致系统A接下来的逻辑不能正常执行,结果A丢了一些记录。

  • 针对对2个问题: 消息表里有1000条记录,每次30,正确结果应该是30000和-30000,实际上却是是29970和-29970,买方和卖方的钱对不上,少计算了一次。买卖双方账户余额完全是由系统B的业务逻辑控制,而触发系统B执行该段业务逻辑的条件是系统M中的消息状态是confirm,现在系统M有1000条记录,系统B却只计算了999次,并且观察系统B的后台日志并没有异常发生,说明有可能系统 M中存在一条消息的状态值不是confirm
SELECT * FROM event WHERE status != 'confirm';
256012c6df01473692ae92f7f058e8a9    unfinished  1535272794404   hap-event-demo-trade-service    trade   [{"topicName":"trade2","payload":"{\"uuid\":\"256012c6df01473692ae92f7f058e8a9\",\"tradeAmount\":30.0,\"buyerId\":2,\"sellerId\":1}"}]

从上图中可以看出,确实有这么一条消息,它的状态为unfinished,说明系统M只向RabbitMQ发送了999条消息,因为只有状态为confirm的消息记录才会publish到RabbitMQ,从而导致系统B只消费了999条消息。而之所以系统M只向RabbitMQ发送999条消息,主要原因在于系统A在向系统M发送Confirm指令的时候出了问题,导致M中的消息状态不能正常变为confirm,从而导致M无法向RabbitMQ推送消息。

经过这么一个简单测试,我们就发现了这个系统存在非常大的漏洞,这还仅仅是在单线程情况下。如果你看的仔细一些,就会发现一些问题,系统A在向系统M发送Confirm指令的时候出了问题,这不是我们上面考虑到的一种情况吗?消息丢失本来就可能发生,这时M根据A提供的接口,在系统A找到记录然后将M中的那条消息记录更新为confirm不就可以吗?这样想是没有问题的,但问题的关键在于,系统M根据那条状态为unfinished消息的uuid去系统A里面找,根本找不到啊!不信我们可以看看系统M中的后台日志,每5s一次的查询:

M调用A接口查消息

可以看到,日志中提示消息丢失了,所以问题的关键还是在于系统A,我觉得 在往系统M插一条消息和往系统A里面插入一条记录这两个操作应该要在一个事务里面完成,但现在A和M是两个系统,这又涉及到分布式事务的问题,所以是否可以将消息表放到系统A中?然后这两个操作在一个事务中提交

实际情况可能会比这复杂很多吧,回到上面的报错日志:

com.netflix.hystrix.exception.HystrixRuntimeException: EventClient#confirmEvent(String) timed-out and no fallback available.

这个错误可能是因为服务调用超时导致的,基本是出现在Hystrix熔断器,在系统M中添加了以下的配置,不过这一块我也并不是很清楚

ribbon:
  ReadTimeout: 60000
  ConnectTimeout: 60000
hystrix:
  command:
    default:
      execution:
        timeout:
          enabled: false
        isolation:
          thread:
            timeoutInMilliseconds: 10000

修改之后,重修做了几次测:1000、3000、5000,还是照样会有这个问题,todo。

多线程测试

这里开50个线程,每个线程循环请求20次

多线程测试

结果好像惨不忍睹,大量的 Hystrix circuit short-circuited and is OPEN

Caused by: java.lang.RuntimeException: Hystrix circuit short-circuited and is OPEN
    at com.netflix.hystrix.AbstractCommand.handleShortCircuitViaFallback(AbstractCommand.java:979) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand.applyHystrixSemantics(AbstractCommand.java:557) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand.access$200(AbstractCommand.java:60) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand$4.call(AbstractCommand.java:419) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand$4.call(AbstractCommand.java:413) ~[hystrix-core-1.5.10.jar:1.5.10]
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:46) ~[rxjava-1.1.10.jar:1.1.10]
    ... 90 common frames omitted

在网上找了一下资料记一次feign的问题排查引起这个问题的原因是在一个滚动窗口内,失败了二十个(默认),就会发生短路,短路时间默认为5秒,5秒之内拒绝所有的请求,之后开始运行 解决办法:设置熔断器失败的个数,默认为20个,这里我给了1000个,只有超过1000个才会发生短路

hystrix.command.default.circuitBreaker.requestVolumeThreshold=1000

配置之后,重新测试,发现又报这种错

com.netflix.hystrix.exception.HystrixRuntimeException: EventClient#createEvent(Event) could not be queued for execution and no fallback available
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3931cca2 rejected from java.util.concurrent.ThreadPoolExecutor@13ed04ed[Running, pool size = 10, active threads = 10, queued tasks = 0, completed tasks = 116]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) ~[na:1.8.0_101]
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) [na:1.8.0_101]
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) [na:1.8.0_101]
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) ~[na:1.8.0_101]

不过这个RejectedExecutionException好像是线程池里面的抛出的一个异常,后面那些就是线程池的参数。 Fegin默认的核心线程数是10个,默认的阻塞队列个数maxQueueSize是-1,这里我们开了50个线程,它的最大线程数应该是小于50,所以会抛RejectedExecutionException,添加以下配置

hystrix.threadpool.default.coreSize=50

但是我这里有个疑问,根据线程池的特点,在coreSize不够的情况下,不是可以放到阻塞队列吗?但是我在这里配置阻塞队列的个数好像作用不大,例如,我以下的配置是没用的,还是会报错

hystrix.threadpool.default.coreSize=30
hystrix.threadpool.default.maxQueueSize=20

发现又会报以下错:

com.netflix.hystrix.exception.HystrixRuntimeException: EventClient#confirmEvent(String) could not be queued for execution and no fallback available.
    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:822) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:807) ~[hystrix-core-1.5.10.jar:1.5.10]
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140) ~[rxjava-1.1.10.jar:1.1.10]
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) ~[rxjava-1.1.10.jar:1.1.10]
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) ~[rxjava-1.1.10.jar:1.1.10]
    at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1472) ~[hystrix-core-1.5.10.jar:1.5.10]
    at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1397) ~[hystrix-core-1.5.10.jar:1.5.10]
Caused by: java.util.concurrent.RejectedExecutionException: Rejected command because thread-pool queueSize is at rejection threshold.

这时候再添加一个配置,表示即使maxQueueSize没有达到,达到queueSizeRejectionThreshold该值后,请求也会被拒绝

hystrix.threadpool.default.queueSizeRejectionThreshold = 50

所以我最终的配置如下:

ribbon:
  ReadTimeout: 60000
  ConnectTimeout: 60000
hystrix:
  threadpool:
    default:
      coreSize: 50
#      maxQueueSize: 0
      queueSizeRejectionThreshold: 50
  command:
    default:
      circuitBreaker:
        requestVolumeThreshold: 1000
      execution:
        timeout:
          enabled: false
        isolation:
          thread:
            timeoutInMilliseconds: 10000

计算机配置

50个线程,每个线程循环请求20次 因为这里请求的是系统A,所以图上的5s完成请求仅仅是指系统A完成请求,系统M发送消息到RabbitMQ是一个异步的过程,所以5s仅仅代表系统A完成业务处理,并不代表系统B也完成了业务处理

50*20

MQ

后台无异常,表数据正确:1000、1000、30000 和 -30000

50个线程,每个线程循环请求60次

50*60

MQ

后台无异常,表数据正确:4000、4000、120000 和 -120000

50个线程,每个线程循环请求100次

50*100

MQ

后台无异常,表数据正确:9000、9000、270000 和 -270000

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.08.26 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 数据库事务
  • CAP定理
  • BASE理论
  • 分布式事务
  • 案例
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档