前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >System|分布式|从Dubbo+Zookeeper开始实现分布式存储系统

System|分布式|从Dubbo+Zookeeper开始实现分布式存储系统

作者头像
朝闻君
发布2021-11-22 10:24:52
3350
发布2021-11-22 10:24:52
举报
文章被收录于专栏:用户9199536的专栏

源码已开源。分布式键值存储是很常见的中间件,例如Redis。专业的收口课要求我们以实现一个分布式键值对存储系统,并且除了Zookeeper和RPC框架之外不允许使用其他分布式中间件。

为什么不直接用zookeeper来做键值对存储?

zookeepeer的协议zab是Paxos变体,众所周知,Paxos是一个强一致性协议,需要经过多轮提议才能确定最终的共识。如果使用Zookeeper来进行存储,性能会惨不忍睹。

因此,现在的分布式架构多以Zookeeper作为注册中心存储metadata,涉及性能的data自己处理。在这里,我用Zookeeper+Dubbo RPC框架作为基础平台。

需求规约

基本架构

我们的分布式存储系统具有三个原语,实现最简单的API

  • READ(key)
  • PUT(key,value)
  • DELETE(key,value)

客户端对于Master请求应该根据key转发给对应的数据节点,类似于分库分表。这里的难点在于,如何实现scalability?


Zookeepr集群搭建

基本功能实现(单Client+单Master+单Data)

zookeeper就默认安装,改个zoo.cfg,维持2181端口即可。

首先建立一个Maven项目,在其中添加api模块,其中放置我们RPC的接口文件,需要注意,这里的参数和返回值不能是基类,否则在后面的RPC marshal/unmarshal时似乎会出现问题(例如不是Map而是ConcurrentHashMap)。

然后创建三个spring模块client、master、slave(这里的Slave类指的就是Data节点),pom如下。这里的API模块是为了支持依赖注入,服务通过注入的API完成RPC通信。

代码语言:javascript
复制
        <dependency>
            <groupId>sjtu</groupId>
            <artifactId>api</artifactId>
            <version>0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>${dubbo.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo-dependencies-zookeeper</artifactId>
            <version>${dubbo.version}</version>
            <type>pom</type>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

property设置为

代码语言:javascript
复制
spring.application.name= dubbo-master
#依赖注入的service在哪个包扫描,这里包名是sjtu.master,其他类比即可
dubboo.scan.base-packages=sjtu.master
# Dubbo Protocol
dubbo.protocol.name=dubbo
## Random port
dubbo.protocol.port=-1
## Dubbo Registry
dubbo.registry.address=zookeeper://127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
dubbo.registry.file = ${user.home}/dubbo-cache/${spring.application.name}/dubbo.cache
## DemoService version
demo.service.version=1.0.0

Application的注解为@EnableAutoConfiguration, 服务类的实现为(SlaveService的方法可以用ConcurrentHashMap简易实现)

代码语言:javascript
复制
@DubboService(version = "${demo.service.version}")
public class SlaveService implements SlaveAPI {
    @Override
    public Boolean PUT(String key, String val) {}
    @Override
    public String READ(String key) {}
    @Override
    public Boolean DELETE(String key) {}
}
@DubboService(version =  "${demo.service.version}")
public class MasterService implements MasterAPI {
    @DubboReference(version = "${demo.service.version}")
    private SlaveAPI slave;
    @Override
    public Boolean PUT(String key, String val){
        Boolean ret =  slave.PUT(key,val);
        return ret;
    }
    @Override
    public String READ(String key){
        String ret =   slave.READ(key);
        return ret;
    }
    @Override
    public Boolean DELETE(String key){
        Boolean ret =  slave.DELETE(key);
        return ret;
    }
}
@EnableAutoConfiguration
public class ClientApplication {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    @DubboReference(version = "${demo.service.version}")
    private MasterAPI master;
    public static void main(String[] args) {
        SpringApplication.run(ClientApplication.class, args);
    }
    @Bean
    public ApplicationRunner runner() {
        return args -> {
            logger.info(master.sayHello("mercyblitz"));
            for(Integer i = 0;i<10;i++){
                logger.info("PUT "+ i + " with "+ master.PUT(i.toString(),i.toString()).toString());
            }
        };
    }
}

通过@DubboReference和@DubboService,Dubbo框架帮助我们从Zookeeper进行服务发现,在执行API对象时,我们实际在对注册的节点进行RPC操作。

以上,一个简单的RPC键值对存储系统就实现了,Master会进行RPC从实际存放数据的Slave节点中获取数据并返回。


负载均衡(单Client+单Master+多Data)

问题在于,我们需要实现均衡,让不同的key请求不同的Data节点, 如果注册多个同版本Data实例,在RPC通信时转发给不同的实例,是否就实现了呢?

在没有设置负载均衡时,默认访问第一个实例,而我们希望我们的访问能够根据key访问对应的实例,所以,这里我们需要进行负载均衡拓展,实现我们自定义的转发策略。需要实现它的doSelect接口,从一组服务列表(List<Invoker>)中选择转发的服务。

代码语言:javascript
复制
@DubboReference(version = "${demo.service.version}", loadbalance = "TOWLoadBalance")
    private SlaveAPI slave;

新增模块loadbalance,这里我们利用Dubbo开放的SPI拓展,在如下文件中写入

负载均衡SPI拓展

代码语言:javascript
复制
TOWLoadBalance=sjtu.loadbalance.TOWLoadBalance

Dubbo将会将我们实现的TOWLoadBalance类注入slave中。

一致性Hash

我们希望在节点数目增加后,请求能最大概率地还访问原本的节点,这就涉及到经典的一致性hash算法了。

利用隐式参数传递在RPC上下文中存储调用的key,送给负载均衡算法供计算。

代码语言:javascript
复制
     @Override
    public Boolean PUT(String key, String val){
        RpcCon @ text.getContext().setAttachment("key", key);
        Boolean ret =  slave.PUT(key,val);
        return ret;
    }

Dubbo已经实现了一致性哈希负载均衡,不过作为作业肯定不能用轮子(还有别的原因)。这里我们用实际节点的地址添加后缀,作为虚拟节点,并计算虚拟节点的hash,然后找到第一个hash大于key的虚拟节点(or环形),在找到虚拟节点后去除后缀就能获得对应的实际节点。

我的实现暂时无法公开,思路和下面的博客类似。

数据迁移

问题在于,一致性hash只能使得请求划分到对应的节点上,然而我们的数据可没法凭空过去。新增的节点没有数据,根本没法处理转移的GET操作。

怎么办?

正常的思路是,加入新的节点后,这个时候的数据划分已经不具备正确性了。我们停机吧。在Data之间迁移数据,等数据被正确划分之后再提供服务。

问题是,停机的时候进行数据迁移,那么随着节点数目的增加,停机的时间会越来越长,既不具备scalability,也不具备availability。

Transfer on Write!

这个算法是我水群的时候灵光一闪想出来的,估计很多实现细节没有考虑,专门用于这种数据分库分表的场景,所以没有什么泛用性。

思路和操作系统的Copy on Write一脉相承,COW指的是,当复制某个对象时,我们没有必要真的复制,读的时候我们完全可以读原本的对象,只有写的时候才需要进行复制。

同理,在迁移数据时,我们没有必要真的迁移,读的时候我们完全可以从之前的节点读取,只有写的时候才需要迁移到新的节点。这本质上是惰性思想。

我们维护一张路由表,存储key->Address的历史条目。

  • Read时,从路由表获取节点,沿用之前的节点。(记忆)
  • Write时,转发给一致性hash计算的节点,并且更新路由表。(迁移)
代码语言:javascript
复制
      if(method.equals("READ")){
            //有数据,沿用之前的服务器
            if(routeMap.containsKey(key))
                return findInvoker(invokers, routeMap.get(key));
                //无数据,直接使用Hash
            else {
                routeMap.put(key,invoke.getUrl().getAddress());
                return invoke;
            }
        }
        else if(method.equals("PUT")||method.equals("DELETE")){
            //有数据
            if(routeMap.containsKey(key)) {
                //节点增加,更新至新的服务器,这个时候delete是个假的delete,但是新服务器没数据所以等效,但是返回false
                if (!routeMap.get(key).equals( expectAddr))
                    routeMap.put(key,  expectAddr);
                return invoke;
            }
            //无数据,直接使用hash
            else {
                routeMap.put(key,   expectAddr);
                return invoke;
            }
        }

这样有什么好处呢?

  1. Cache - GET时直接使用路由表的节点,而不需要通过一致性hash计算,免去了一致性hash的开销,相当于一个cache。
  2. Lazy - 试想一下这种情况,某个数据在新增节点时进行了转移,结果还没有被读取,数据就被更新或者删除了,那么这个转移还有必要么?按照我们的算法,这种情况根本就不需要进行数据转移就可以直接PUT/DELETE!TOW用惰性的方式有效识别了这种额外开销并予以避免。
  3. Availability:均摊开销,我们只需要每次负载均衡时维护路由表即可,对于用户来说,增加节点是无感知的,数据迁移也通过惰性的方式在写的时候进行,并且没有带来额外开销。相比于停机而言,我们的分布式系统增加Data节点几乎毫无代价,可以平稳连续进行

当然也有缺陷

  1. 弱负载均衡性 - 初始时,原先节点的GET负载没有变化,随着写操作的增加,数据才逐渐转移到新增的节点中,最终达到负载均衡。
  2. 有状态 - Master需要维护这样的路由表记忆,而如果采取停机的方式,Master可以根据一致性Hash实时演算,不需要状态

总而言之,TOW负载均衡算法能够正确地转发请求,并且最终实现正确的数据切分。

负载均衡+数据切分


支持并发之分布式锁(多Client+单Master+多Data)

为了处理高并发的情况,我们需要避免多个写请求同时进入临界区,那么简单粗暴的分布式锁肯定是最佳选择。在这个系统中,最合理的分布式锁显然是读写锁。

zookeeper的临时顺序节点能有效帮助。(吐槽一句,Dubbo你实现的ZkClient里面怎么只有临时节点和永久节点,害得我专门copy源码加了个临时顺序节点的API)

顺序节点可以看成线性表,隐性的FIFO队列,因此我们可以套用读写锁的思路。

  • 读者等待写者离开临界区
  • 写者等待读写者离开临界区

这里就表现为: 读锁监听最近一个写锁节点的销毁,写锁监听上一个读写锁节点的销毁。同样为了避免泄露代码,放出参考思路:

在Master执行READ时,拿key粒度的读锁,在Master执行PUT/DELETE时拿写锁,从而避免了Race Condition的发生。

(这个实现果然很经不起推敲,崩溃的时候容易出现各种奇奇怪怪的东西,不过要高鲁棒性的算法那肯定不止这么几行代码)

代码语言:javascript
复制
        Lock lock = new Lock("/rwlock"+key);
        lock.lockWrite();
        RpcContext.getContext().setAttachment("key", key);
        Boolean ret =  slave.PUT(key,val);
        lock.unLockWrite();

数据备份(多Client + 单Master + 多Data Replicas)

我们还需要对Data节点进行多重备份,以便使得Data节点崩溃时存储的K-V数据不会丢失。维护备份数据一致性的方法有很多,在这里我使用基本的2PC协议进行实现。

2PC协议需要引入一个不存储数据的Coordinator节点,我使用Slave节点作为2PC的Coordinator,Data节点作为Participant。这里的Slave和上面出现区别,本身不存储数据,由DataAPI提供数据。

部署时,在property文件中配置group.id,然后在注解Service和Reference的时候增加group属性(这样做其实是不符合Reference规范的,他的意思是同一个接口的不同实现,这里单纯当分组用),group相同的节点视作处于同一2PC协议下。

这里需要注意的是,不能给Slave @DubboService加上group,因为根据Dubbo的规定

2.2.0 以上版本支持,总是只调一个可用组的实现

对于Master调Slave而言,我们需要发现所有的Slave节点;而对于Slave调Data才只需要关注少数。所以Data节点能加上group而Slave不行。

代码语言:javascript
复制
@DubboService(version = "${demo.service.version}",group = "${group.id}")
DataService
@DubboReference(version = "${demo.service.version}",group = "${group.id}")
DataService

@DubboReference(version = "${demo.service.version}", loadbalance = "TOWLoadBalance")
    private SlaveAPI slave;

典型2PC协议

达成2PC协议,要求对三方进行修改,按照上面TOWLoadBalance的方式增加一个自定义LoadBalance以支持同时调用所有引用的服务。(这里是很Naive的实现,非礼勿视)

Slave - 先发出写请求,均返回yes,再发出COMMIT请求

代码语言:javascript
复制
lock.lock();
if(data.DELETE(key))
    data.COMMIT();
lock.unlock();

LoadBalance - 写请求应当能够转发给所有Backup(下面这个代码只能实现转发,如果对方崩溃了不会导致整个函数崩溃,事实上没法做到ABORT,emmmm)

代码语言:javascript
复制
if (invokers.size() > 1) {
     for (int i = 1; i < invokers.size(); i++) {
       invokers.get(i).invoke(invocation);
      }
    }
return invokers.get(0);

Data - 支持事务回退,All-or-nothing

m是数据表,cas指的是Copy-And-Swap,是C++保证异常安全的手段,意思是修改时先在副本上进行修改,修改完成后再进行原子性的swap,从而替换数据。

在这里,PUT操作仅仅产生了一个新的CAS副本,并没有修改内存中的hash表,直到COMMIT时才完成修改。

(这里data的copy and swap其实很蠢,没必要copy整个表,其实只需要copy一个表项就行,不过我没怎么考虑性能,怎么实现方便怎么来就完事儿)

代码语言:javascript
复制
    @Override
    public Boolean COMMIT() {
        m = cas;
        return true;
    }

    @Override
    public Boolean PUT(String key, String val) {
        cas = m;
        cas.put(key, val);
        return true;
    }

Primary/Backup(多Client + 单Master +多Data Primary/Backup)

单纯的维护数据一致性是不够的,我们还需要确定谁是Primary。这里我采用了很naive的方法,列表最前的的就是Primary(不过要当心未同步的backup在index 0的特殊情况,后文处理),其他服务均是Backup。

PrimaryLoadBalance实现

读操作转发给Primary,写操作则调用所有服务,所有Backup的invoke均成功才能执行返回Primary并调用

代码语言:javascript
复制
         if (method.equals("READ")||method.equals("SYNC")) {
            return invokers.get(0);
        } else if (method.equals("PUT") || method.equals("DELETE") || method.equals("COMMIT")) {
            if (invokers.size() > 1) {
                for (int i = 1; i < invokers.size(); i++) {
                    invokers.get(i).invoke(invocation);
                }
            }
            return invokers.get(0);
        }

Primary/Backup

Primary节点负责读写,而Backup仅仅需要负责写。

Hot Fix(动态部署Backup)

即使按照最理想的情况也就是Replication State Machine复制状态机,我写的垃圾2PC实现真的保证了所有Data节点都按照相同的顺序执行了相同的操作(更何况这个2PC菜的起飞),那也仅仅保证了在相同的初态下节点之间能保持同步

如果我们需要动态增加某个group的backup来修复备份数目,那么他的初态是无数据,而其他的节点都已经存储了数据,此时2PC没有办法保证数据的一致性。我们需要让这个backup从primary获取数据进行同步。

同步数据,并且在Primary挂掉后顶替

简易实现,如果需要动态部署,那么配置文件中写入group.hotfix =1,通过hotfix变量判断Data节点是否需要同步。

代码语言:javascript
复制
@Value("${group.hotfix}")
    private String hotfix;

Data在执行写操作前,通过SYNC指令访问自己的Coordinator(Slave)获取Primary的全部数据。由于Slave已经针对写操作加锁,所以不用担心同时出现多个写操作同时进入Check并进入临界区,这里不需要对Data的写操作加锁。

问题在于,Dubbo里的服务发现只能通过接口、组、版本来完成,但是Master对Slave的要求使得Slave不能具备组。事实上,现在是Slave单方面知道Data的组,而反过来一无所知

那么,Data应该如何获取Primary呢?毕竟Slave才是2PC协议的管理者,才知道真正的Primary。这就要谈到之前的设计了,为什么不在Backup创建时就直接同步Primary,而是在写的时候再去同步?

Slave单方面知道Data,但调用Data的时候能够顺带传递参数,使得Data临时知道Slave的地址。在这里我用了RPC上下文,把TOW时获得的Slave地址最终传递给了Data。每次RPC调用都会使得参数失效,因此调用后都需要重置。

代码语言:javascript
复制
@DubboReference(version = "${demo.service.version}",loadbalance = "IPLoadBalance",check=false)
   private SlaveAPI slave;

void Check(){
        if(hotfix.equals("1")) {
            String address = RpcContext.getContext().getAttachment("URL");
            //RpcContext.getContext().setAttachment("URL",address);
            System.out.println("SYNC ");
            System.out.println(address);
            m = slave.SYNC();
            System.out.println(m.toString());
            System.out.println("SYNCED ");
            hotfix = "0";
        }
    }

与此同时,获得了地址的Backup需要找到Slave,这就需要我们继续重写负载均衡策略。这就是单纯从一组服务里筛选出地址相同的服务即可。

代码语言:javascript
复制
public class IPLoadBalance extends AbstractLoadBalance {
    protected <T> Invoker<T> findInvoker(List<Invoker<T>> invokers, String addr){
        for( Invoker<T> invoke: invokers)
            if(invoke.getUrl().getAddress().equals(addr))
                return invoke;
        return null;
    }
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String address = RpcContext.getContext().getAttachment("URL");
        System.out.println(address);
        return findInvoker(invokers,address);
    }
}

除了hotfix之外,Data节点的所有配置都相同,且hotfix并不影响Primary/Backup,因此动态部署的Backup也能自动成为Primary,和其他节点没有区别。当心,Backup没准直接跑列表最前面去了,因此我们需要记忆primary以防止节点增加时错把新增的backup当成了primary,不然就变成我同步我自己了。

代码语言:javascript
复制
if(primary.equals(""))
            primary = invokers.get(0).getUrl().getAddress();
        //handle加入新节点的情况,沿用之前的primary
        Invoker<T> pri= findInvoker(invokers,primary);
        if(pri == null) {
            //primary 挂了,决议新的
            pri = invokers.get(0);
            primary = invokers.get(0).getUrl().getAddress();
        }

        if (method.equals("READ")||method.equals("SYNC")) {
            return pri;
        } else if (method.equals("PUT") || method.equals("DELETE") || method.equals("COMMIT")) {
                for (int i = 0; i < invokers.size(); i++) {
                    if(!invokers.get(i).getUrl().getAddress().equals(primary))
                    invokers.get(i).invoke(invocation);
                }
            return pri;
        }
        return pri;

分布式系统架构

问题在于,我们可以很明显的发现,在节点与节点之间,有这么几个部分是仅有一条路径的,用图论来表示就是bridge。他们是:

  • Master : 存储历史一致性hash结果的路由表用于惰性数据迁移
  • Slave Coordinator: 存储Primary信息以防止动态部署的Backup自己同步自己

用FP的说法来说,副作用是万恶之源,同样是函数,副作用的函数调用一百次可能有一百个结果,而纯函数的调用只要输入确定,输出始终如一。

这个思想在分布式中同样适用,想要维持100个一致的数据节点难上加难,但提供100个纯函数无状态服务器集群则易如反掌。只要把数据分离出去,Replicas就是板上定钉的事。

分离数据(多Client + 多Master+ 单Router+多Data Primary/Backup)

现在,我们不在Master节点存储数据了。所有负载均衡中进行的读写操作都转为从Router中获取。

和往常一样,我们建立Router的RPC对象,根据下面的代码,需要实现三个原语:CONTAIN/GET/PUT。

代码语言:javascript
复制
if(method.equals("READ")){
            //有数据,沿用之前的服务器
            if(router.CONTAIN(key))
                return findInvoker(invokers, router.GET(key));
                //无数据,直接使用Hash
            else {
                router.PUT(key,invoke.getUrl().getAddress());
                return invoke;
            }
        }
        else if(method.equals("PUT")||method.equals("DELETE")){
            //有数据
            if(router.CONTAIN(key)) {
                //节点增加,更新至新的服务器,这个时候delete是个假的delete,但是新服务器没数据所以等效,但是返回false
                if (!router.GET(key).equals( expectAddr))
                    router.PUT(key,  expectAddr);
                return invoke;
            }
            //无数据,直接使用hash
            else {
                router.PUT(key, expectAddr);
                return invoke;
            }
        }

已经实现了Data,这个东西只不过是照搬一下,而且因为不需要数据切分,也不需要group。啊,易如反掌,这不就是Copy and Paste么。

然而,问题来了!!

我们的router是在LoadBalance里面实现的,而这个东西本身通过@Reference注入SlaveAPI的RPC对象,也就是说,要在依赖注入之后进行第二次依赖注入。

爷吐了。心情见application名。

尝试了整个下午一无所获,issue也没啥人理,那就算了,我用noob的API配置来做,反正也没有性能需求。以下方式手动获得RPC对象。

代码语言:javascript
复制
ApplicationConfig application = new ApplicationConfig();
        application.setName("NMSLWSND");
        RegistryConfig registry = new RegistryConfig();
        registry.setAddress("zookeeper://127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183");
        ReferenceConfig<RouterAPI> reference = new ReferenceConfig<RouterAPI>();
        reference.setApplication(application);
        reference.setRegistry(registry);
        reference.setInterface(RouterAPI.class);
        reference.setVersion("1.0.0");
        router = reference.get();

现在,路由表是从远端获取的了,然而这有什么好处呢?远端获取还容易失败。很简单,因为数据分离了,我们的Master节点已经是无状态的了,现在的它可以随意集群化,随意负载均衡,反正有读写锁的存在,不同Master之间也能保持并发的正确性。

Master的单点故障已经消除了,现在,单点故障转移到了Router这边。

路由表备份(多Client + 多Master+ 多Router Primary/Backup+多Data Primary/Backup)

按照之前的2PC协议,进行同样的实现,引入RouterCoordinator和RouterData,和之前的Slave按照同样的方法实现。

现在,数据都是多备份的了,节点崩溃也没关系,反正我们还能动态部署并数据同步,让备份维持在安全的数量。

然而,最后再来看一看,2PC引入的Coordinator本身会不会就是原罪呢?如果Coordinator崩溃,如果我们及时重新部署,的确可以恢复服务,但是在崩溃的期间,显然系统是不可用的。上文已经提到了,Coordinator其实保存了Primary节点的地址,虽然很小,但也是不可忽略的,有状态就意味着复杂的数据一致性。

对于维持这种小数据的一致性,其实Zookeeper本身就有了支持。

分离Coordinator元数据(多Client + 多Master+ 多Router Primary/Backup+多Data Primary/Backup + 多Coordinator)

还没有进行备份的数据就只剩下每个2PC中Primary的地址了,只要分离出它,放到zookeeper的节点进行CP的存储,那么Coordinator不就是无状态的么?

继续利用Dubbo提供的Curator客户端,顺便加上非常奇怪但它就是没有的SetContent接口,我们每次先从远端读取primary地址,进行本地计算判断primary是否失效,然后把新决议的primary写回zookeeper。

代码语言:javascript
复制
if (!zkClient.checkExists(nodeName)) {
            zkClient.createPersistent(nodeName);
        }
        primary = zkClient.getContent(nodeName);
        String method = invocation.getMethodName();
        if(primary.isEmpty()) {
            primary = invokers.get(0).getUrl().getAddress();
            zkClient.setContent(nodeName,primary);
        }
        Invoker<T> pri= findInvoker(invokers,primary);
        if(pri == null) {
            //primary 挂了,决议新的
            pri = invokers.get(0);
            primary = invokers.get(0).getUrl().getAddress();
            zkClient.setContent(nodeName,primary);
        }

现在,Coordinator已经无状态了,想要集群化+负载均衡也很容易,此时,如果我们需要修改路由表,负载会均摊到所有coordinator身上。

另一个问题出现了,正常跑的时候还好,一旦把原来的primary枪毙了,性能就下降很多。什么原因呢?

在primary消失的上面这段代码,其实是并发的,所有的请求都可以进入临界区。所有的指令都在执行着setContent这个指令,并且每个setContent都会被接受。我们实际上并不是更新一次primary,而是这段时间内所有请求的primary。对于coordinator集群来说,造成了庞大的并发写,本来就是CP协议的zookeeper自然不堪重负了。

在OS中也有类似的概念,对于同一缓存行的高度竞争会导致锁的性能大量下降,从而影响了scalability。

解决方法很简单,在setContent操作前后加锁。

最终架构

通过一堆花里胡哨的分离,最后从单Client+ 单Master+两Data的小系统变成了这张复杂的架构图,使用敏捷开发里强调的增量式开发。

全程实际上就是在不断地进行迭代,数据和服务进行分离,数据通过一致性协议完成备份,服务通过集群化完成备份,从而消除单点故障。

源代码如下,附带实验报告与文档。

演示视频如下,名字忽略就好反正没啥隐私,懒得打包主要太累,就直接IDEA运行了。


Issue 1: 解决单invoker时不触发负载均衡,导致路由表或Primary无法维护的问题

优先级:高,影响到了可缩性,限制数据集群数目下限至少是3(1个缓冲)

这个问题如果一直只有一个服务或者多个服务都不会出现,问题在于从一个服务增长到多个服务的情况,之前没有存储信息,后面突然要用,就会有问题。

Dubbo为了性能贴心地给我们做了集群节点数目为1的优化,然而我们并不想要他帮我们跳过,所以我们要重写框架。

服务发现源码分析

读Dubbo框架源码,Dubbo在单invoker情况下进行了优化。

代码语言:javascript
复制
public abstract class AbstractLoadBalance implements LoadBalance {
     public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        } else {
            return invokers.size() == 1 ? (Invoker)invokers.get(0) : this.doSelect(invokers, url, invocation);
        }
    }

我们在自定义LoadBalance实现中进行重载,结果还是不行

代码语言:javascript
复制
    @Override
    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        } else {
            return this.doSelect(invokers, url, invocation);
        }
    }

这里就要讲Dubbo服务发现的原理了,Directory对应的是zookeeper中注册的服务列表,Router是按照条件筛选服务,LoadBalance是按照算法选择服务。

这里的Cluster就是把一组服务伪装成一个服务,也就是按照我们上面定义的Directory、Router、LoadBalance等规则选出Invoker,作为集群被调用的Invoker。

集群架构

我们来看一看Cluster是怎样通过这些规则进行计算的。

Directory:根据URL从Router Chain筛选Invoker

代码语言:javascript
复制
public abstract class AbstractDirectory<T> implements Directory<T> {
    protected RouterChain<T> routerChain;
}
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    private void refreshInvoker(List<URL> invokerUrls) {
            List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList(newUrlInvokerMap.values()));
            this.routerChain.setInvokers(newInvokers);
            this.invokers = this.multiGroup ? this.toMergeInvokerList(newInvokers) : newInvokers;
            this.urlInvokerMap = newUrlInvokerMap;
        }
    }

RouterChain: 每个路由依次进行筛选

代码语言:javascript
复制
    public List<Invoker<T>> route(URL url, Invocation invocation) {
        List<Invoker<T>> finalInvokers = this.invokers;
        Router router;
        for(Iterator var4 = this.routers.iterator(); var4.hasNext(); finalInvokers = router.route(finalInvokers, url, invocation)) {
            router = (Router)var4.next();
        }
        return finalInvokers;
    }

Router:根据配置项进行筛选

代码语言:javascript
复制
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
                Iterator var5 = invokers.iterator();
                while(var5.hasNext()) {
                    Invoker<T> invoker = (Invoker)var5.next();
                    if (this.matchThen(invoker.getUrl(), url)) {
                        result.add(invoker);
                    }

Cluster : 根据Directory创建集群Invoker,通过directory/router获取invokers

代码语言:javascript
复制
public class FailoverCluster extends AbstractCluster { 
    public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker(directory);
    }

ClusterInvoker:doInvoke->AbstractInvoker的select

代码语言:javascript
复制
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
   
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
      Invoker<T> invoker = this.select(loadbalance, invocation, copyInvokers, invoked);
    }
}

AbstractInvoker:select->doselect->loadBalance的select

代码语言:javascript
复制
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {

    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
                Invoker<T> invoker = this.doSelect(loadbalance, invocation, invokers, selected);
                if (sticky) {
                    this.stickyInvoker = invoker;
                }
           }
    private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        } else if (invokers.size() == 1) {
            return (Invoker)invokers.get(0);
        } else {
            Invoker<T> invoker = loadbalance.select(invokers, this.getUrl(), invocation);

可以看出,关键在于AbstractInvoker的doSelect这一步,将单个服务的集群直接跳过了。我们需要重写Abstract Invoker,Dubbo是开源的,如果直接fork源码那这一步轻而易举,问题是maven项目里这些都是只读的。

WDNMD这个doSelect居然是个Pirvate!!

WDNMD这就是滥用继承么?写个Private强制继承实现?Protected会死么?

我们首先更换集群策略为FailureBack,只执行一次操作不重连,适合幂等操作,便于实现。原本的select方法无法重写,因此我们直接访问loadbalance,相当于单纯做转发,其他没法访问的代码就全部删除了。

代码语言:javascript
复制
 public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        this.checkInvokers(invokers, invocation);
        Invoker invoker = loadbalance.select(invokers, this.getUrl(), invocation);

        try {
            return invoker.invoke(invocation);
        } catch (Throwable var6) {
            if (var6 instanceof RpcException && ((RpcException)var6).isBiz()) {
                throw (RpcException)var6;
            } else {
                throw new RpcException(var6 instanceof RpcException ? ((RpcException)var6).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + this.getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + var6.getMessage(), var6.getCause() != null ? var6.getCause() : var6);
            }
        }
    }

等等,为什么还不行!!!!!!!!!!!!!!!!!!!!!!!!!!

难道是因为Cluster的dojoin一开始就没有被调用么?你为什么这么熟练啊,为什么一个边界检测能检查这么多次啊?测试的工资别发那么高行不行?天天测边界测个锤子。

经过了化身恶魔之后,我继续看哪里还有边界检测,答案是在引用监听里面。一开始的时候URL只有一条就没有集群。

代码语言:javascript
复制
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
           @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
                  private T createProxy(Map<String, String> map) {
           if (urls.size() == 1) {
                invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
            }  
           else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                    if (UrlUtils.isRegistry(url)) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // for multi-subscription scenario, use 'zone-aware' policy by default
                    String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
                    // The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                    invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
                }

我选择死亡。我不走这条路了。我宣布,需要维护数据的集群节点不得小于2.

没法改maven项目的源码好气啊,这就是框架的痛么。


Issue 2: 解决2PC协议是异步invoke,导致backup故障primary依旧能正常调用的问题

优先级: 中,反正都已经发现服务了,调用失败概率很小,corner case摸了摸了

思路: 同步化,捕获调用异常,由Slave Abort请求

Issue 3: 解决Zookeeper集群在setContent时并发getContent,以返回旧版本的问题

优先级: 中,不搭zk集群完事儿

思路: 加细粒度锁,限制getContent时机防止脏读

Issue 4: 解决读写分离/按地址访问不太符合Load Balance的Balance的问题

优先级: 低,语义问题摸了摸了

思路:算法大概前置到路由层,不过因为这里路由没工作,为了实现简单我就统一扔这里了。

思路: Cluster里有BroadCast类型,大概写操作放那里?不过这也不是广播,难顶。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 需求规约
  • Zookeepr集群搭建
  • 基本功能实现(单Client+单Master+单Data)
  • 负载均衡(单Client+单Master+多Data)
  • Transfer on Write!
  • 支持并发之分布式锁(多Client+单Master+多Data)
  • 数据备份(多Client + 单Master + 多Data Replicas)
  • Primary/Backup(多Client + 单Master +多Data Primary/Backup)
  • Hot Fix(动态部署Backup)
  • 分离数据(多Client + 多Master+ 单Router+多Data Primary/Backup)
  • 路由表备份(多Client + 多Master+ 多Router Primary/Backup+多Data Primary/Backup)
  • 分离Coordinator元数据(多Client + 多Master+ 多Router Primary/Backup+多Data Primary/Backup + 多Coordinator)
  • 最终架构
  • 服务发现源码分析
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档