专栏首页一个会写诗的程序员的博客分布式系统中的BASE 和 ACID、幂等性、分布式锁、分布式事务与异步消息处理

分布式系统中的BASE 和 ACID、幂等性、分布式锁、分布式事务与异步消息处理

一、BASE 和 ACID

ACID

ACID 四项特性分别为:

原子性(A)。所有的系统都受惠于原子性操作。当我们考虑可用性的时候,没有理由去改变分区两侧操作的原子性。而且满足 ACID 定义的、高抽象层次的原子操作,实际上会简化分区恢复。

一致性(C)。ACID 的 C 指的是事务不能破坏任何数据库规则,如键的唯一性。与之相比,CAP 的 C 仅指单一副本这个意义上的一致性,因此只是 ACID 一致性约束的一个严格的子集。ACID 一致性不可能在分区过程中保持,因此分区恢复时需要重建 ACID 一致性。推而广之,分区期间也许不可能维持某些不变性约束,所以有必要仔细考虑哪些操作应该禁止,分区后又如何恢复这些不变性约束。

隔离性(I)。隔离是 CAP 理论的核心:如果系统要求 ACID 隔离性,那么它在分区期间最多可以在分区一侧维持操作。事务的可串行性(serializability)要求全局的通信,因此在分区的情况下不能成立。只要在分区恢复时进行补偿,在分区前后保持一个较弱的正确性定义是可行的。

持久性(D)。牺牲持久性没有意义,理由和原子性一样,虽然开发者有理由(持久性成本太高)选择 BASE 风格的软状态来避免实现持久性。这里有一个细节,分区恢复可能因为回退持久性操作,而无意中破坏某项不变性约束。但只要恢复时给定分区两侧的持久性操作历史记录,破坏不变性约束的操作还是可以被检测出来并修正的。通常来讲,让分区两侧的事务都满足 ACID 特性会使得后续的分区恢复变得更容易,并且为分区恢复时事务的补偿工作奠定了基本的条件。

BASE, “Basically Available, Soft state, Eventually consistent(基本可用、软状态、最终一致性)”.

CAP理论

2000年7月,Eric Brewer教授提出CAP猜想;2年后,Seth Gilbert和Nancy Lynch从理论上证明了CAP;之后,CAP理论正式成为分布式计算领域的公认定理。 CAP定律说的是在一个分布式计算机系统中,一致性,可用性和分区容错性这三种保证无法同时得到满足,最多满足两个。 CAP:C :Consistency(一致性)A:(Availability)可用性P:(Partition Tolerance)分区容错性

让我们构造一个非常简单的分布式系统。

  • 两台服务器G1和G2
  • 两台服务器可以相互通讯
  • 客户端可以随机访问任何一台服务器
Consistency(一致性)

Gilbert and Lynch 这样描述的一致性.

any read operation that begins after a write operation completes must return that value, or the result of a later write operation

在写操作完成之后的任何读操作都必须返回该值。

客户端向G1服务器发起一个写操作,把变量初始值v0 改为v1,接下来客户端可能向节点G1读取也可能向节点G2读取;

  • 向G1发起一个读操作,得到更改后的值V1。这就是满足了一致性
  • 向G2发起一个读操作,此时G1向G2发送同步消息
  1. 如果同步完成 ,那么读到的结果是v1,这样也满足了一致性
  1. 还未同步完成,这是G2还是v0,这就不满足一致性。
(Partition Tolerance)分区容错

Gilbert and Lynch 这样描述的分区容错.

The network will be allowed to lose arbitrarily many messages sent from one node to another. 从一个节点发送到另外一个节点过程中,允许丢失任意多的消息.

在分布式环境中,节点之间的通信可能出现问题,整个系统就产生所谓的分区。所以我们在设计的时候需要考虑这种情况;剩下来的 A和C满足好,我们就可以说我们的系统有很好的分区容错性。

(Availability)可用性

Gilbert and Lynch 对 availability的描述原文. every request received by a non-failing node in the system must result in a response 系统中非失败节点收到的每个请求都必须导致响应 在可用性系统中,只要服务器没有奔溃,客户端发送请求,服务器必须返回一个相应给客户端。

为什么要CAP不能同时满足?

通过上述的定义和描述知道分区无法避免,p总是要考虑的。为什么c和a无法同时做到呢?其实都是分区惹的祸。

如果我们保证一致性;那么G1写入操作之后,必须保证数据同步给G2之后,G2才能对外提供响应,这显然就没有可用性了。

反之我们保证可用性,那就没法保证一致性。在分布式系统中,我们一般会选择AP而牺牲一致性。牺牲并不意味着不关心一致性,而是首先满足A和P,稍后再解决C的问题。

CAP 理论主张任何基于网络的数据共享系统,都最多只能拥有以下三条中的两条:

数据一致性(C),等同于所有节点访问同一份最新的数据副本; 对数据更新具备高可用性(A); 能容忍网络分区(P)。

于是有了BASE理论。

BASE 理论

eBay的架构师Dan Pritchett源于对大规模分布式系统的实践总结,在ACM上发表文章提出BASE理论,BASE理论是对CAP理论的延伸,核心思想是即使无法做到强一致性(StrongConsistency,CAP的一致性就是强一致性),但应用可以采用适合的方式达到最终一致性(Eventual Consitency)。

(Basically Available)基本可用

在分布式系统出现故障的时候,允许损失部分可用性,即保证核心可用。

(Soft State)软状态

接受一段时间的状态不同步,及中间状态,而改中间状态不影响系统整体可用性。这里的中间状态就是CAP理论中的数据不一致性。

(Eventually Consistent)最终一致性

上面说软状态,然后不可能一直是软状态,必须有个时间期限。在期限过后系统能够保证在没有其他新的更新操作的情况下,数据最终一定能够达到一致的状态,因此所有客户端对系统的数据访问最终都能够获取到最新的值。

总结

CAP是分布式系统设计理论,BASE是CAP理论中AP方案的延伸,对于C我们采用的方式和策略就是保证最终一致性;

二、什么是分布式系统幂等性?

幂等(idempotent、idempotence)是一个数学与计算机学概念,常见于抽象代数中。

在编程中.一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。例如,“getUsername()和setTrue()”函数就是一个幂等函数.更复杂的操作幂等保证是利用唯一交易号(流水号)实现.

什么是幂等性(Idempotence)?

Methods can also have the property of “idempotence” in that (aside from error or expiration issues) the side-effects of N > 0 identical requests is the same as for a single request. ——HTTP/1.1规范中幂等性的定义

从定义上看,HTTP方法的幂等性是指一次和多次请求某一个资源应该具有同样的副作用。说白了就是,同一个请求,发送一次和发送N次效果是一样的!幂等性是分布式系统设计中十分重要的概念,而HTTP的分布式本质也决定了它在HTTP中具有重要地位。下面将以HTTP中的幂等性做例子加以介绍。

简单示例

假设有一个从账户取钱的远程API(可以是HTTP的,也可以不是),我们暂时用类函数的方式记为:

bool withdraw(account_id, amount)

withdraw的语义是从account_id对应的账户中扣除amount数额的钱;如果扣除成功则返回true,账户余额减少amount;如果扣除失败则返回false,账户余额不变。

值得注意的是:和本地环境相比,我们不能轻易假设分布式环境的可靠性

所以问题来了,一种典型的情况是withdraw请求已经被服务器端正确处理,但服务器端的返回结果由于网络等原因被掉丢了,导致客户端无法得知处理结果。如果是在网页上,一些不恰当的设计可能会使用户认为上一次操作失败了,然后刷新页面,这就导致了withdraw被调用两次,账户也被多扣了一次钱。如图所示:

解决方案一:采用分布式事务,通过引入支持分布式事务的中间件来保证withdraw功能的事务性。分布式事务的优点是对于调用者很简单,复杂性都交给了中间件来管理。缺点则是一方面架构太重量级,容易被绑在特定的中间件上,不利于异构系统的集成;另一方面分布式事务虽然能保证事务的ACID性质,而但却无法提供性能和可用性的保证。

解决方案二:幂等设计。我们可以通过一些技巧把withdraw变成幂等的,比如:

int create_ticket() 
bool idempotent_withdraw(ticket_id, account_id, amount)

create_ticket的语义是获取一个服务器端生成的唯一的处理号ticket_id,它将用于标识后续的操作

idempotent_withdrawwithdraw的区别在于关联了一个ticket_id,一个ticket_id表示的操作至多只会被处理一次,每次调用都将返回第一次调用时的处理结果。这样,idempotent_withdraw就符合幂等性了,客户端就可以放心地多次调用。

基于幂等性的解决方案中一个完整的取钱流程被分解成了两个步骤:

1.调用create_ticket()获取ticket_id; 2.调用idempotent_withdraw(ticket_id, account_id, amount)

虽然create_ticket不是幂等的,但在这种设计下,它对系统状态的影响可以忽略,加上idempotent_withdraw是幂等的,所以任何一步由于网络等原因失败或超时,客户端都可以重试,直到获得结果。如图所示:

和分布式事务相比,幂等设计的优势在于它的轻量级,容易适应异构环境,以及性能和可用性方面。在某些性能要求比较高的应用,幂等设计往往是唯一的选择。

HTTP的幂等性

下面以HTTP GET、DELETE、PUT、POST四种方法为主进行语义和幂等性的介绍。

HTTP GET方法用于获取资源,不应有副作用,所以是幂等的。比如:GET http://www.bank.com/account/123456,不会改变资源的状态,不论调用一次还是N次都没有副作用。请注意,这里强调的是一次和N次具有相同的副作用,而不是每次GET的结果相同。GET http://www.news.com/latest-news这个HTTP请求可能会每次得到不同的结果,但它本身并没有产生任何副作用,因而是满足幂等性的。

HTTP DELETE方法用于删除资源,有副作用,但它应该满足幂等性。比如:DELETE http://www.forum.com/article/4231,调用一次和N次对系统产生的副作用是相同的,即删掉id为4231的帖子;因此,调用者可以多次调用或刷新页面而不必担心引起错误。

HTTP POST方法用于创建资源,所对应的URI并非创建的资源本身,而是去执行创建动作的操作者,有副作用,不满足幂等性。比如:POST http://www.forum.com/articles的语义是在http://www.forum.com/articles下创建一篇帖子,HTTP响应中应包含帖子的创建状态以及帖子的URI。两次相同的POST请求会在服务器端创建两份资源,它们具有不同的URI;所以,POST方法不具备幂等性。

HTTP PUT方法用于创建或更新操作,所对应的URI是要创建或更新的资源本身,有副作用,它应该满足幂等性。比如:PUT http://www.forum/articles/4231的语义是创建或更新ID为4231的帖子。对同一URI进行多次PUT的副作用和一次PUT是相同的;因此,PUT方法具有幂等性。

对前文示例进行改进

利用Web API的形式实现前面所提到的取款功能。

1、用POST /tickets来实现create_ticket;

2、用PUT /accounts/account_id/ticket_id&amount=xxx来实现idempotent_withdraw。

值得注意的是严格来讲amount参数不应该作为URI的一部分,真正的URI应该是/accounts/account_id/ticket_id,而amount应该放在请求的body中。这种模式可以应用于很多场合,比如:论坛网站中防止意外的重复发帖。

如何防范 POST 重复提交?

HTTP POST 操作既不是安全的,也不是幂等的(至少在HTTP规范里没有保证)。平时开发的项目中可能会出现下面这些情况:

由于用户误操作,多次点击表单提交按钮。 由于网速等原因造成页面卡顿,用户重复刷新提交页面。 黑客或恶意用户使用postman等工具重复恶意提交表单(攻击网站)。 这些情况都会导致表单重复提交,造成数据重复,增加服务器负载,严重甚至会造成服务器宕机。因此有效防止表单重复提交有一定的必要性。

解决方案

通过JavaScript屏蔽提交按钮(不推荐)

通过js代码,当用户点击提交按钮后,屏蔽提交按钮使用户无法点击提交按钮或点击无效,从而实现防止表单重复提交。

ps:js代码很容易被绕过。比如用户通过刷新页面方式,或使用postman等工具绕过前段页面仍能重复提交表单。因此不推荐此方法。

    <%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
        <!DOCTYPE HTML>
        <html>
        <head>
         <title>表单</title>
            <script type="text/javascript">
            //默认提交状态为false
            var commitStatus = false;
            function dosubmit(){
                  if(commitStatus==false){
                //提交表单后,讲提交状态改为true
                  commitStatus = true;
                  return true;
                 }else{
                  return false;
              }
             }
      </script>
     </head>
   
        <body>
            <form action="/path/post" onsubmit="return dosubmit()" method="post">
             用户名:<input type="text" name="username">
            <input type="submit" value="提交" id="submit">
            </form>
        </body>
    </html>

给数据库增加唯一键约束(简单粗暴)

在数据库建表的时候在ID字段添加主键约束,用户名、邮箱、电话等字段加唯一性约束。确保数据库只可以添加一条数据。

数据库加唯一性约束sql:

alter table tableName_xxx add unique key uniq_xxx(field1, field2)

服务器及时捕捉插入数据异常:

        try {
                xxxMapper.insert(user);
            } catch (DuplicateKeyException e) {
                logger.error("user already exist");
            }

通过数据库加唯一键约束能有效避免数据库重复插入相同数据。但无法阻止恶意用户重复提交表单(攻击网站),服务器大量执行sql插入语句,增加服务器和数据库负荷。

利用Session防止表单重复提交(推荐)

实现原理:

服务器返回表单页面时,会先生成一个subToken保存于session,并把该subToen传给表单页面。当表单提交时会带上subToken,服务器拦截器Interceptor会拦截该请求,拦截器判断session保存的subToken和表单提交subToken是否一致。若不一致或session的subToken为空或表单未携带subToken则不通过。

首次提交表单时session的subToken与表单携带的subToken一致走正常流程,然后拦截器内会删除session保存的subToken。当再次提交表单时由于session的subToken为空则不通过。从而实现了防止表单重复提交。

使用:

mvc配置文件加入拦截器配置

<mvc:interceptors>
<mvc:interceptor>
<mvc:mapping path="/**"/>
<bean class="xxx.xxx.interceptor.AvoidDuplicateSubmissionInterceptor"/>
</mvc:interceptor>
</mvc:interceptors>

拦截器

package xxx.xxxx.interceptor;

import xxx.xxx.SubToken;
import org.apache.struts.util.TokenProcessor;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.lang.reflect.Method;

public class AvoidDuplicateSubmissionInterceptor extends
        HandlerInterceptorAdapter {

    public AvoidDuplicateSubmissionInterceptor() {
    }

    @Override
    public boolean preHandle(HttpServletRequest request,
                             HttpServletResponse response, Object handler) throws Exception {
        if (handler instanceof HandlerMethod) {
            HandlerMethod handlerMethod = (HandlerMethod) handler;
            Method method = handlerMethod.getMethod();
            SubToken annotation = method
                    .getAnnotation(SubToken.class);
            if (annotation != null) {
                boolean needSaveSession = annotation.saveToken();
                if (needSaveSession) {
                    request.getSession(false)
                            .setAttribute(
                                    "subToken",
                                    TokenProcessor.getInstance().generateToken(
                                            request));
                }

                boolean needRemoveSession = annotation.removeToken();
                if (needRemoveSession) {
                    if (isRepeatSubmit(request)) {
                        return false;
                    }
                    request.getSession(false).removeAttribute("subToken");
                }
            }
        }
        return true;
    }

    private boolean isRepeatSubmit(HttpServletRequest request) {
        String serverToken = (String) request.getSession(false).getAttribute(
                "subToken");
        if (serverToken == null) {
            return true;
        }
        String clinetToken = request.getParameter("subToken");
        if (clinetToken == null) {
            return true;
        }
        if (!serverToken.equals(clinetToken)) {
            return true;
        }
        return false;
    }
}  

控制层 controller

@RequestMapping("/form")
//开启一个Token
@SubToken(saveToken = true)
public String form() {
  return "/test/form";
}


@RequestMapping(value = "/postForm", method = RequestMethod.POST)
@ResponseBody
//开启Token验证,并且成功之后移除当前Token
@SubToken(removeToken = true)
public String postForm(String userName) {
  System.out.println(System.currentTimeMillis());
  try{
    System.out.println(userName);
    Thread.sleep(1500);//暂停1.5秒后程序继续执行
  }catch (InterruptedException e) {
    e.printStackTrace();
 }
 System.out.println(System.currentTimeMillis());
 return "1";
}

表单页面

<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<html>
<head>
<title>Title</title>
</head>
<body>
<form method="post" action="/postForm">
<input type="text" name="userName">
<input type="hidden" name="subToken" value="${subToken}">
<input type="submit" value="提交">
</form>
</body>
</html>

使用AOP自定义切入实现(分布式锁)

实现原理:

自定义防止重复提交标记(@AvoidRepeatableCommit)。 对需要防止重复提交的Congtroller里的mapping方法加上该注解。 新增Aspect切入点,为@AvoidRepeatableCommit加入切入点。 每次提交表单时,Aspect都会保存当前 key 到 redis(须设置过期时间)。

重复提交时Aspect会判断当前redis是否有该key,若有则拦截。

自定义注解:

        import java.lang.annotation.*;
        
        /**
         * 避免重复提交
         * @author hhz
         * @version
         * @since
         */
        @Target(ElementType.METHOD)
        @Retention(RetentionPolicy.RUNTIME)
        public @interface AvoidRepeatableCommit {
        
            /**
             * 指定时间内不可重复提交,单位毫秒
             * @return
             */
            long timeout()  default 30000 ;
        
        }
自定义切入点Aspect

        /**
         * 重复提交aop
         * @author hhz
         * @version 
         * @since 
         */
        @Aspect
        @Component
        public class AvoidRepeatableCommitAspect {
        
            @Autowired
            private RedisTemplate redisTemplate;
        
            /**
             * @param point
             */
            @Around("@annotation(com.xwolf.boot.annotation.AvoidRepeatableCommit)")
            public Object around(ProceedingJoinPoint point) throws Throwable {
        
                HttpServletRequest request  = ((ServletRequestAttributes)RequestContextHolder.currentRequestAttributes()).getRequest();
                String ip = IPUtil.getIP(request);
                //获取注解
                MethodSignature signature = (MethodSignature) point.getSignature();
                Method method = signature.getMethod();
                //目标类、方法
                String className = method.getDeclaringClass().getName();
                String name = method.getName();
                String ipKey = String.format("%s#%s",className,name);
                int hashCode = Math.abs(ipKey.hashCode());
                String key = String.format("%s_%d",ip,hashCode);
                log.info("ipKey={},hashCode={},key={}",ipKey,hashCode,key);
                AvoidRepeatableCommit avoidRepeatableCommit =  method.getAnnotation(AvoidRepeatableCommit.class);
                long timeout = avoidRepeatableCommit.timeout();
                if (timeout < 0){
                                //过期时间5分钟
                    timeout = 60*5;
                }
                String value = (String) redisTemplate.opsForValue().get(key);
                if (StringUtils.isNotBlank(value)){
                    return "请勿重复提交";
                }
                redisTemplate.opsForValue().set(key, UUIDUtil.uuid(),timeout,TimeUnit.MILLISECONDS);
                //执行方法
                Object object = point.proceed();
                return object;
            }
        
        }

三、关于分布式锁

什么是分布式锁?

什么是锁?

在单进程的系统中,当存在多个线程可以同时改变某个变量(可变共享变量)时,就需要对变量或代码块做同步,使其在修改这种变量时能够线性执行消除并发修改变量。 而同步的本质是通过锁来实现的。为了实现多个线程在一个时刻同一个代码块只能有一个线程可执行,那么需要在某个地方做个标记,这个标记必须每个线程都能看到,当标记不存在时可以设置该标记,其余后续线程发现已经有标记了则等待拥有标记的线程结束同步代码块取消标记后再去尝试设置标记。这个标记可以理解为锁。 不同地方实现锁的方式也不一样,只要能满足所有线程都能看得到标记即可。如 Java 中 synchronize 是在对象头设置标记,Lock 接口的实现类基本上都只是某一个 volitile 修饰的 int 型变量其保证每个线程都能拥有对该 int 的可见性和原子修改,linux 内核中也是利用互斥量或信号量等内存数据做标记。 除了利用内存数据做锁其实任何互斥的都能做锁(只考虑互斥情况),如流水表中流水号与时间结合做幂等校验可以看作是一个不会释放的锁,或者使用某个文件是否存在作为锁等。只需要满足在对标记进行修改能保证原子性和内存可见性即可。

什么是分布式?

分布式的 CAP 理论告诉我们:

任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。

目前很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。基于 CAP理论,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证最终一致性。

分布式场景

此处主要指集群模式下,多个相同服务同时开启.

在许多的场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。很多时候我们需要保证一个方法在同一时间内只能被同一个线程执行。在单机环境中,通过 Java 提供的并发 API 我们可以解决,但是在分布式环境下,就没有那么简单啦。

分布式与单机情况下最大的不同在于其不是多线程而是多进程。 多线程由于可以共享堆内存,因此可以简单的采取内存作为标记存储位置。而进程之间甚至可能都不在同一台物理机上,因此需要将标记存储在一个所有进程都能看到的地方。

什么是分布式锁?

当在分布式模型下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数。 与单机模式下的锁不仅需要保证进程可见,还需要考虑进程与锁之间的网络问题。(我觉得分布式情况下之所以问题变得复杂,主要就是需要考虑到网络的延时和不可靠。。。一个大坑) 分布式锁还是可以将标记存在内存,只是该内存不是某个进程分配的内存而是公共内存如 Redis、Memcache。至于利用数据库、文件等做锁与单机的实现是一样的,只要保证标记能互斥就行。

分布式锁应该具备的条件:

1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行; 2、高可用的获取锁与释放锁; 3、高性能的获取锁与释放锁; 4、具备可重入特性; 5、具备锁失效机制,防止死锁; 6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。

分布式锁的实现方式:

基于数据库实现分布式锁; 基于缓存(Redis等)实现分布式锁; 基于Zookeeper实现分布式锁;

目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。”所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。

在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。有的时候,我们需要保证一个方法在同一时间内只能被同一个线程执行。

Memcached:利用 Memcached 的 add 命令。此命令是原子性操作,只有在 key 不存在的情况下,才能 add 成功,也就意味着线程得到了锁。 Redis:和 Memcached 的方式类似,利用 Redis 的 setnx 命令。此命令同样是原子性操作,只有在 key 不存在的情况下,才能 set 成功。 Zookeeper:利用 Zookeeper 的顺序临时节点,来实现分布式锁和等待队列。Zookeeper 设计的初衷,就是为了实现分布式锁服务的。 Chubby:Google 公司实现的粗粒度分布式锁服务,底层利用了 Paxos 一致性算法。

基于数据库实现分布式锁

基于数据库的实现方式的核心思想是:在数据库中创建一个表,表中包含方法名等字段,并在方法名字段上创建唯一索引,想要执行某个方法,就使用这个方法名向表中插入数据,成功插入则获取锁,执行完成后删除对应的行数据释放锁。

(1)创建一个表:

DROP TABLE IF EXISTS `method_lock`;
CREATE TABLE `method_lock` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `method_name` varchar(64) NOT NULL COMMENT '锁定的方法名',
  `desc` varchar(255) NOT NULL COMMENT '备注信息',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uidx_method_name` (`method_name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';

(2)想要执行某个方法,就使用这个方法名向表中插入数据:

INSERT INTO method_lock (method_name, desc) VALUES ('methodName', '测试的methodName');

因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。

(3)成功插入则获取锁,执行完成后删除对应的行数据释放锁:

delete from method_lock where method_name ='methodName';

注意:这只是使用基于数据库的一种方法,使用数据库实现分布式锁还有很多其他的玩法!

使用基于数据库的这种实现方式很简单,但是对于分布式锁应该具备的条件来说,它有一些问题需要解决及优化:

1、因为是基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能,所以,数据库需要双机部署、数据同步、主备切换;

2、不具备可重入的特性,因为同一个线程在释放锁之前,行数据一直存在,无法再次成功插入数据,所以,需要在表中新增一列,用于记录当前获取到锁的机器和线程信息,在再次获取锁的时候,先查询表中机器和线程信息是否和当前机器和线程相同,若相同则直接获取锁;

3、没有锁失效机制,因为有可能出现成功插入数据后,服务器宕机了,对应的数据没有被删除,当服务恢复后一直获取不到锁,所以,需要在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据;

4、不具备阻塞锁特性,获取不到锁直接返回失败,所以需要优化获取逻辑,循环多次去获取。

5、在实施的过程中会遇到各种不同的问题,为了解决这些问题,实现方式将会越来越复杂;依赖数据库需要一定的资源开销,性能问题需要考虑。

基于Redis实现分布式锁

选用Redis实现分布式锁原因 Redis有很高的性能 Redis命令对此支持较好,实现起来比较方便 在此就不介绍Redis的安装了,具体在Linux和Windows中的安装可以查看我前面的博客。 http://www.cnblogs.com/liuyang0/p/6504826.html

使用命令介绍 SETNX SETNX key val 当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。

expire expire key timeout 为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。

delete delete key 删除key

在使用Redis实现分布式锁的时候,主要就会使用到这三个命令。

实现 使用的是jedis来连接Redis。

实现思想 获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断。 获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。 释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。 分布式锁的核心代码如下:

package redis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.exceptions.JedisException;

import java.util.List;
import java.util.UUID;


public class DistributedLock {
    private final JedisPool jedisPool;

    public DistributedLock(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    /**
     * 加锁
     * @param locaName  锁的key
     * @param acquireTimeout  获取超时时间
     * @param timeout   锁的超时时间
     * @return 锁标识
     */
    public String lockWithTimeout(String locaName,
                                  long acquireTimeout, long timeout) {
        Jedis conn = null;
        String retIdentifier = null;
        try {
            // 获取连接
            conn = jedisPool.getResource();
            // 随机生成一个value
            String identifier = UUID.randomUUID().toString();
            // 锁名,即key值
            String lockKey = "lock:" + locaName;
            // 超时时间,上锁后超过此时间则自动释放锁
            int lockExpire = (int)(timeout / 1000);

            // 获取锁的超时时间,超过这个时间则放弃获取锁
            long end = System.currentTimeMillis() + acquireTimeout;
            while (System.currentTimeMillis() < end) {
                if (conn.setnx(lockKey, identifier) == 1) {
                    conn.expire(lockKey, lockExpire);
                    // 返回value值,用于释放锁时间确认
                    retIdentifier = identifier;
                    return retIdentifier;
                }
                // 返回-1代表key没有设置超时时间,为key设置一个超时时间
                if (conn.ttl(lockKey) == -1) {
                    conn.expire(lockKey, lockExpire);
                }

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        } catch (JedisException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
        return retIdentifier;
    }

    /**
     * 释放锁
     * @param lockName 锁的key
     * @param identifier    释放锁的标识
     * @return
     */
    public boolean releaseLock(String lockName, String identifier) {
        Jedis conn = null;
        String lockKey = "lock:" + lockName;
        boolean retFlag = false;
        try {
            conn = jedisPool.getResource();
            while (true) {
                // 监视lock,准备开始事务
                conn.watch(lockKey);
                // 通过前面返回的value值判断是不是该锁,若是该锁,则删除,释放锁
                if (identifier.equals(conn.get(lockKey))) {
                    Transaction transaction = conn.multi();
                    transaction.del(lockKey);
                    List<Object> results = transaction.exec();
                    if (results == null) {
                        continue;
                    }
                    retFlag = true;
                }
                conn.unwatch();
                break;
            }
        } catch (JedisException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
        return retFlag;
    }
}

基于ZooKeeper实现分布式锁

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

ZooKeeper的架构通过冗余服务实现高可用性。因此,如果第一次无应答,客户端就可以询问另一台ZooKeeper主机。ZooKeeper节点将它们的数据存储于一个分层的命名空间,非常类似于一个文件系统或一个前缀树结构。客户端可以在节点读写,从而以这种方式拥有一个共享的配置服务。更新是全序的。

基于ZooKeeper分布式锁的流程 在zookeeper指定节点(locks)下创建临时顺序节点node_n 获取locks下所有子节点children 对子节点按节点自增序号从小到大排序 判断本节点是不是第一个子节点,若是,则获取锁;若不是,则监听比该节点小的那个节点的删除事件 若监听事件生效,则回到第二步重新进行判断,直到获取到锁 具体实现 下面就具体使用java和zookeeper实现分布式锁,操作zookeeper使用的是apache提供的zookeeper的包。

通过实现Watch接口,实现process(WatchedEvent event)方法来实施监控,使CountDownLatch来完成监控,在等待锁的时候使用CountDownLatch来计数,等到后进行countDown,停止等待,继续运行。 以下整体流程基本与上述描述流程一致,只是在监听的时候使用的是CountDownLatch来监听前一个节点。

zookeeper 分布式锁源代码:

package zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;


public class DistributedLock implements Lock, Watcher {
    private ZooKeeper zk = null;
    // 根节点
    private String ROOT_LOCK = "/locks";
    // 竞争的资源
    private String lockName;
    // 等待的前一个锁
    private String WAIT_LOCK;
    // 当前锁
    private String CURRENT_LOCK;
    // 计数器
    private CountDownLatch countDownLatch;
    private int sessionTimeout = 30000;
    private List<Exception> exceptionList = new ArrayList<Exception>();

    /**
     * 配置分布式锁
     * @param config 连接的url
     * @param lockName 竞争资源
     */
    public DistributedLock(String config, String lockName) {
        this.lockName = lockName;
        try {
            // 连接zookeeper
            zk = new ZooKeeper(config, sessionTimeout, this);
            Stat stat = zk.exists(ROOT_LOCK, false);
            if (stat == null) {
                // 如果根节点不存在,则创建根节点
                zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    // 节点监视器
    public void process(WatchedEvent event) {
        if (this.countDownLatch != null) {
            this.countDownLatch.countDown();
        }
    }

    public void lock() {
        if (exceptionList.size() > 0) {
            throw new LockException(exceptionList.get(0));
        }
        try {
            if (this.tryLock()) {
                System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁");
                return;
            } else {
                // 等待锁
                waitForLock(WAIT_LOCK, sessionTimeout);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if (lockName.contains(splitStr)) {
                throw new LockException("锁名有误");
            }
            // 创建临时有序节点
            CURRENT_LOCK = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(CURRENT_LOCK + " 已经创建");
            // 取所有子节点
            List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
            // 取出所有lockName的锁
            List<String> lockObjects = new ArrayList<String>();
            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if (_node.equals(lockName)) {
                    lockObjects.add(node);
                }
            }
            Collections.sort(lockObjects);
            System.out.println(Thread.currentThread().getName() + " 的锁是 " + CURRENT_LOCK);
            // 若当前节点为最小节点,则获取锁成功
            if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
                return true;
            }

            // 若不是最小节点,则找到自己的前一个节点
            String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
            WAIT_LOCK = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        return false;
    }

    public boolean tryLock(long timeout, TimeUnit unit) {
        try {
            if (this.tryLock()) {
                return true;
            }
            return waitForLock(WAIT_LOCK, timeout);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    // 等待锁
    private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
        Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);

        if (stat != null) {
            System.out.println(Thread.currentThread().getName() + "等待锁 " + ROOT_LOCK + "/" + prev);
            this.countDownLatch = new CountDownLatch(1);
            // 计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁
            this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
            this.countDownLatch = null;
            System.out.println(Thread.currentThread().getName() + " 等到了锁");
        }
        return true;
    }

    public void unlock() {
        try {
            System.out.println("释放锁 " + CURRENT_LOCK);
            zk.delete(CURRENT_LOCK, -1);
            CURRENT_LOCK = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public Condition newCondition() {
        return null;
    }

    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }


    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        public LockException(String e){
            super(e);
        }
        public LockException(Exception e){
            super(e);
        }
    }
}

四、分布式事务与异步消息处理

一、分布式事务前奏

  • 事务:事务是由一组操作构成的可靠的独立的工作单元,事务具备ACID的特性,即原子性、一致性、隔离性和持久性。
  • 本地事务:当事务由资源管理器本地管理时被称作本地事务。本地事务的优点就是支持严格的ACID特性,高效,可靠,状态可以只在资源管理器中维护,而且应用编程模型简单。但是本地事务不具备分布式事务的处理能力,隔离的最小单位受限于资源管理器。
  • 全局事务:当事务由全局事务管理器进行全局管理时成为全局事务,事务管理器负责管理全局的事务状态和参与的资源,协同资源的一致提交回滚。
  • TX协议:应用或者应用服务器与事务管理器的接口。
  • XA协议:全局事务管理器与资源管理器的接口。XA是由X/Open组织提出的分布式事务规范。该规范主要定义了全局事务管理器和局部资源管理器之间的接口。主流的数据库产品都实现了XA接口。XA接口是一个双向的系统接口,在事务管理器以及多个资源管理器之间作为通信桥梁。之所以需要XA是因为在分布式系统中从理论上讲两台机器是无法达到一致性状态的,因此引入一个单点进行协调。由全局事务管理器管理和协调的事务可以跨越多个资源和进程。全局事务管理器一般使用XA二阶段协议与数据库进行交互。
  • AP:应用程序,可以理解为使用DTP(Data Tools Platform)的程序。
  • RM:资源管理器,这里可以是一个DBMS或者消息服务器管理系统,应用程序通过资源管理器对资源进行控制,资源必须实现XA定义的接口。资源管理器负责控制和管理实际的资源。
  • TM:事务管理器,负责协调和管理事务,提供给AP编程接口以及管理资源管理器。事务管理器控制着全局事务,管理事务的生命周期,并且协调资源。
  • 两阶段提交协议:XA用于在全局事务中协调多个资源的机制。TM和RM之间采取两阶段提交的方案来解决一致性问题。两节点提交需要一个协调者(TM)来掌控所有参与者(RM)节点的操作结果并且指引这些节点是否需要最终提交。两阶段提交的局限在于协议成本,准备阶段的持久成本,全局事务状态的持久成本,潜在故障点多带来的脆弱性,准备后,提交前的故障引发一系列隔离与恢复难题。
  • BASE理论:BA指的是基本业务可用性,支持分区失败,S表示柔性状态,也就是允许短时间内不同步,E表示最终一致性,数据最终是一致的,但是实时是不一致的。原子性和持久性必须从根本上保障,为了可用性、性能和服务降级的需要,只有降低一致性和隔离性的要求。
  • CAP定理:对于共享数据系统,最多只能同时拥有CAP其中的两个,任意两个都有其适应的场景,真是的业务系统中通常是ACID与CAP的混合体。分布式系统中最重要的是满足业务需求,而不是追求高度抽象,绝对的系统特性。C表示一致性,也就是所有用户看到的数据是一样的。A表示可用性,是指总能找到一个可用的数据副本。P表示分区容错性,能够容忍网络中断等故障。
  • 柔性事务中的服务模式:
    1. 可查询操作:服务操作具有全局唯一的标识,操作唯一的确定的时间。
    2. 幂等操作:重复调用多次产生的业务结果与调用一次产生的结果相同。一是通过业务操作实现幂等性,二是系统缓存所有请求与处理的结果,最后是检测到重复请求之后,自动返回之前的处理结果。
    3. TCC操作:Try阶段,尝试执行业务,完成所有业务的检查,实现一致性;预留必须的业务资源,实现准隔离性。Confirm阶段:真正的去执行业务,不做任何检查,仅适用Try阶段预留的业务资源,Confirm操作还要满足幂等性。Cancel阶段:取消执行业务,释放Try阶段预留的业务资源,Cancel操作要满足幂等性。TCC与2PC(两阶段提交)协议的区别:TCC位于业务服务层而不是资源层,TCC没有单独准备阶段,Try操作兼备资源操作与准备的能力,TCC中Try操作可以灵活的选择业务资源,锁定粒度。TCC的开发成本比2PC高。实际上TCC也属于两阶段操作,但是TCC不等同于2PC操作。
    4. 可补偿操作:Do阶段:真正的执行业务处理,业务处理结果外部可见。Compensate阶段:抵消或者部分撤销正向业务操作的业务结果,补偿操作满足幂等性。约束:补偿操作在业务上可行,由于业务执行结果未隔离或者补偿不完整带来的风险与成本可控。实际上,TCC的Confirm和Cancel操作可以看做是补偿操作。

二、柔性事务解决方案架构

在电商领域等互联网场景下,传统的事务在数据库性能和处理能力上都暴露出了瓶颈。柔性事务有两个特性:基本可用和柔性状态。所谓基本可用是指分布式系统出现故障的时候允许损失一部分的可用性。柔性状态是指允许系统存在中间状态,这个中间状态不会影响系统整体的可用性,比如数据库读写分离的主从同步延迟等。柔性事务的一致性指的是最终一致性。

(一)、基于可靠消息的最终一致性方案概述

  • 实现:业务处理服务在业务事务提交之前,向实时消息服务请求发送消息,实时消息服务只记录消息数据,而不是真正的发送。业务处理服务在业务事务提交之后,向实时消息服务确认发送。只有在得到确认发送指令后,实时消息服务才会真正发送。
  • 消息:业务处理服务在业务事务回滚后,向实时消息服务取消发送。消息发送状态确认系统定期找到未确认发送或者回滚发送的消息,向业务处理服务询问消息状态,业务处理服务根据消息ID或者消息内容确认该消息是否有效。被动方的处理结果不会影响主动方的处理结果,被动方的消息处理操作是幂等操作。
  • 成本:可靠的消息系统建设成本,一次消息发送需要两次请求,业务处理服务需要实现消息状态回查接口。
  • 优点:消息数据独立存储,独立伸缩,降低业务系统和消息系统之间的耦合。对最终一致性时间敏感度较高,降低业务被动方的实现成本。兼容所有实现JMS标准的MQ中间件,确保业务数据可靠的前提下,实现业务的最终一致性,理想状态下是准实时的一致性。

(二)、TCC事务补偿型方案

  • 实现:一个完整的业务活动由一个主业务服务于若干的从业务服务组成。主业务服务负责发起并完成整个业务活动。从业务服务提供TCC型业务操作。业务活动管理器控制业务活动的一致性,它登记业务活动的操作,并在业务活动提交时确认所有的TCC型操作的Confirm操作,在业务活动取消时调用所有TCC型操作的Cancel操作。
  • 成本:实现TCC操作的成本较高,业务活动结束的时候Confirm和Cancel操作的执行成本。业务活动的日志成本。
  • 使用范围:强隔离性,严格一致性要求的业务活动。适用于执行时间较短的业务,比如处理账户或者收费等等。
  • 特点:不与具体的服务框架耦合,位于业务服务层,而不是资源层,可以灵活的选择业务资源的锁定粒度。TCC里对每个服务资源操作的是本地事务,数据被锁住的时间短,可扩展性好,可以说是为独立部署的SOA服务而设计的。

(三)、最大努力通知型

  • 实现:业务活动的主动方在完成处理之后向业务活动的被动方发送消息,允许消息丢失。业务活动的被动方根据定时策略,向业务活动的主动方查询,恢复丢失的业务消息。
  • 约束:被动方的处理结果不影响主动方的处理结果。
  • 成本:业务查询与校对系统的建设成本。
  • 使用范围:对业务最终一致性的时间敏感度低。跨企业的业务活动。
  • 特点:业务活动的主动方在完成业务处理之后,向业务活动的被动方发送通知消息。主动方可以设置时间阶梯通知规则,在通知失败后按规则重复通知,知道通知N次后不再通知。主动方提供校对查询接口给被动方按需校对查询,用户恢复丢失的业务消息。
  • 适用范围:银行通知,商户通知。

三、基于可靠消息的最终一致性方案详解

(一)、消息发送一致性

消息中间件在分布式系统中的核心作用就是异步通讯、应用解耦和并发缓冲(也叫作流量削峰)。在分布式环境下,需要通过网络进行通讯,就引入了数据传输的不确定性,也就是CAP理论中的分区容错性。

消息发送一致性是指产生消息的业务动作与消息发送一致,也就是说如果业务操作成功,那么由这个业务操作所产生的消息一定要发送出去,否则就丢失。

处理方式一

public void completeOrderService() {
    // 处理订单
    order.process();

    // 发送会计原始凭证消息
    pipe.sendAccountingVouchetMessage();
}

在上面的情况中,如果业务操作成功,执行的消息发送之前应用发生故障,消息发送不出去,导致消息丢失,将会产生订单系统与会计系统的数据不一致。如果消息系统或者网络异常,也会导致消息发送不出去,也会造成数据不一致。

处理方式二

public void completeOrderService() {
    // 发送会计原始凭证消息
    pipe.sendAccountingVouchetMessage();

    // 处理订单
    order.process();
}

如果将上面的两个操作调换一下顺序,这种情况就会更加不可控了,消息发出去了业务订单可能会失败,会造成订单系统与业务系统的数据不一致。那么JMS标准中的XA协议是否可以保障发送的一致性?

  • JMS协议标准的API中,有很多以XA开头的接口,其实就是前面讲到的支持XA协议(基于两阶段提交协议)的全局事务型接口。 XAConnection.class XAConnectionFactory.class XAQueueConnection.class XAQueueConnectionFactory.class XASession.class XATopicConnection.class XATopicConnectionFactory.class XATopicSession.class
  • JMS中的XA系列的接口可以提供分布式事务的支持。但是引用XA方式的分布式事务,就会带来很多局限性。
    1. 要求业务操作的资源必须支持XA协议,但是并不是所有的资源都支持XA协议。
    2. 两阶段提交协议的成本。
    3. 持久化成本等DTP模型的局限性,例如:全局锁定、成本高、性能低。
    4. 使用XA协议违背了柔性事务的初衷。

(二)、保证消息一致的变通做法

  1. 发送消息:主动方现将应用把消息发给消息中间件,消息状态标记为“待确认”状态。
  2. 消息中间件收到消息后,把消息持久化到消息存储中,但是并不影响被动方投递消息。
  3. 消息中间件返回消息持久化结果,主动方根据返回的结果进行判断如何进行业务操作处理:
    1. 失败:放弃执行业务操作处理,结束,必要时向上层返回处理结果。
    2. 成功:执行业务操作处理。
  4. 业务操作完成后,把业务操作结果返回给消息中间件。
  5. 消息中间件收到业务操作结构后,根据业务结果进行处理:
    1. 失败:删除消息存储中的消息,结束。
    2. 成功:更新消息存储中的消息状态为“待发送”,然后执行消息投递。
  6. 前面的正向流程都成功之后,向被动方应用投递消息。

但是在上面的处理流程中,任何一个环节都有可能出现问题。

(三)、常规MQ消息处理流程和特点

  • 常规的MQ队列处理流程无法实现消息的一致性。
  • 投递消息的本质就是消息消费,可以细化。

(四)、消息重复发送问题和业务接口幂等性设计

image

对于未确认的消息,采用按规则重新投递的方式进行处理。对于以上流程,消息重复发送会导致业务处理接口出现重复调用的问题。消息消费过程中消息重复发送的主要原因就是消费者成功接收处理完消息后,消息中间件没有及时更新投递状态导致的。如果允许消息重复发送,那么消费方应该实现业务接口的幂等性设计。

(五)、本地消息服务方案

  • 实现思路:
    1. 主动方应用系统通过业务操作完成业务数据的操作,在准备发送消息的时候将消息存储在主动方应用系统一份,另一份发送到实时消息服务
    2. 被动方应用系统监听实时消息系统中的消息,当被动方完成消息处理后通过调用主动方接口完成消息确认
    3. 主动方接收到消息确认以后删除消息数据。
    4. 通过消息查询服务查询到消息被接收之后再规定的时间内没有返回ACK确认消息就通过消息恢复系统重新发送消息。
  • 优点:
    1. 消息的时效性比较高
    2. 从应用设计的角度实现了消息数据的可靠性,消息数据的可靠性不依赖于MQ中间件,弱化了对MQ中间件特性的依赖。
    3. 方案轻量级,容易实现。
  • 缺点:
    1. 与具体的业务场景绑定,耦合性强,不可以共用。
    2. 消息数据与业务数据同步,占用业务系统资源。
    3. 业务系统在使用关系型数据库的情况下消息服务性能会受到关系型数据库的并发性能限制。

(六)、独立消息服务方案

  • 实现思路:
    1. 预发送消息:主动方应用系统预发送消息,由消息服务子系统存储消息,如果存储失败,那么也就无法进行业务操作。如果返回存储成功,然后执行业务操作。
    2. 执行业务操作:执行业务操作如果成功的时候,将业务操作执行成功的状态发送到消息服务子系统。消息服务子系统修改消息的标识为“可发送”状态。
    3. 发送消息到实时消息服务:当消息的状态发生改变的时候,立刻将消息发送到实时消息服务中。接下来,消息将会被消息业务的消费端监听到,然后被消费。
    4. 消息状态子系统:相当于定时任务系统,在消息服务子系统中定时查找确认超时的消息,在主动方应用系统中也去定时查找没有处理成功的任务,进行相应的处理。
    5. 消息消费:当消息被消费的时候,向实时消息服务发送ACK,然后实时消息服务删除消息。同时调用消息服务子系统修改消息为“被消费”状态。
    6. 消息恢复子系统:当消费方返回消息的时候,由于网络中断等其他原因导致消息没有及时确认,那么需要消息恢复子系统定时查找出在消息服务子系统中没有确认的消息。将没有被确认的消息放到实时消息服务中,进行重做,因为被动方应用系统的接口是幂等的。
  • 优点:
    1. 消息服务独立部署,独立维护,独立伸缩。
    2. 消息存储可以按需选择不同的数据库来集成实现。
    3. 消息服务可以被相同的的使用场景使用,降低重复建设服务的成本。
    4. 从分布式服务应用设计开发角度实现了消息数据的可靠性,消息数据的可靠性不依赖于MQ中间件,弱化了对MQ中间件特性的依赖。
    5. 降低了业务系统与消息系统之间的耦合,有利于系统的扩展维护。
  • 缺点:
    1. 一次消息发送需要两次请求。
    2. 主动方应用系统需要实现业务操作状态的校验与查询接口。

(七)、消息服务子系统的设计实现

示例消息数据表:

名称

数据类型

允许空

默认值

属性

释义

uuid

varchar(50)

No

unique

UUID

version

int(11)

No

0

版本号

editer

varchar(100)

Yes

NULL

修改者

creater

varchar(100)

Yes

NULL

创建者

edit_time

datetime

Yes

0000-00-00 00:00:00

最后修改时间

create_time

datetime

No

0000-00-00 00:00:00

创建时间

msg_id

varchar(50)

No

消息ID

msg_body

longtext

No

消息内容

msg_date_type

varchar(50)

Yes

消息数据类型

consumer_queue

varchar(100)

No

消费队列

send_times

int(6)

No

0

消息重发次数

is_dead

varchar(20)

No

是否死亡

status

varchar(20)

No

状态

remark

varchar(200)

Yes

备注

field0

varchar(200)

Yes

扩展字段0

field1

varchar(200)

Yes

扩展字段1

field2

varchar(200)

Yes

扩展字段2

参考资料

https://blog.csdn.net/wuzhiwei549/article/details/80692278 https://www.i3geek.com/archives/841 https://www.cnblogs.com/seesun2012/p/9214653.html https://github.com/yangliu0/DistributedLock https://www.cnblogs.com/liuyang0/p/6744076.html https://www.cnblogs.com/liuyang0/p/6800538.html https://mwhittaker.github.io/blog/an_illustrated_proof_of_the_cap_theorem/ https://www.infoq.cn/article/cap-twelve-years-later-how-the-rules-have-changed https://www.cnblogs.com/bluemiaomiao/p/11216380.html

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 访问者模式,递归遍历树节点

    国内第一Kotlin 开发者社区公众号,主要分享、交流 Kotlin 编程语言、Spring Boot、Android、React.js/Node.js、函数式...

    一个会写诗的程序员
  • 《Kotlin 程序设计》第九章 Kotlin与Java混合调用

    虽然 Kotlin 的开发很方便,但当你与他人协作时,总会碰到 Java 与 Kotlin 代码共存的代码项目。本章就教你如何优雅的实现 Kotlin 与 Ja...

    一个会写诗的程序员
  • 《Kotin 极简教程》第12章 使用 Kotlin 集成Gradle 开发 第12章 使用 Kotlin 集成Gradle 开发《Kotlin极简教程》正式上架:

    由于 Kotlin 具有丰富的功能,如一等函数和扩展方法等,因此它可以保留和改进 Gradle 构建脚本的最佳部分——包括简明的声明式语法以及轻松制作 DSL ...

    一个会写诗的程序员
  • 常用插件备份

    阿炬
  • 三版Google Atlas机器人的进化史,它是如何走上逆天之路的

    2016年2月23日,谷歌在YouTube公布了旗下波士顿动力(Boston Dynamics)研发的新一代人性机器人:ATLAS。ATLAS的出现引起了业内的...

    机器人网
  • 被机器狗挑逗一番后,波士顿动力的明星机器人Atlas倒在谢幕之时

    问耕 发自 凹非寺 量子位 报道 | 公众号 QbitAI Atlas侬晓得伐? 这个英文单词背后有各种不同的指代,这里我们指的是波士顿动力(Boston Dy...

    量子位
  • 最新的微信域名屏蔽检测API接口分享(含PHP、JAVA、Python等文档调用代码)

    把上面的微信域名检测api接口的"http://www.baidu.com"替换成需要检测的链接即可使用!

    南昌谢大脚
  • Airtest Project入门

    Airtest Project是网易出品的一款自动化解决方案,它适用于任意游戏引擎和应用的自动化测试,支持Android和Windows。它不需要依赖被测对象的...

    测试邦
  • RabbitMQ扩展之消费者消息预读取

    AMQP 0-9-1协议中定义了basic.qos方法用于限制信道或者连接上的未确认消息数量,这个消息数据量命名为prefetch_count。不幸的是,信道其...

    Throwable
  • 如何利用云安全运营中心监测数据泄露

    ➢5月,三星手机厂商多个内部项目代码泄露,包括SmartThings敏感的源代码、证书和密钥。

    王录华

扫码关注云+社区

领取腾讯云代金券