专栏首页扎心了老铁springboot使用zookeeper(curator)实现注册发现与负载均衡

springboot使用zookeeper(curator)实现注册发现与负载均衡

最简单的实现服务高可用的方法就是集群化,也就是分布式部署,但是分布式部署会带来一些问题。比如:

1、各个实例之间的协同(锁)

2、负载均衡

3、热删除

这里通过一个简单的实例来说明如何解决注册发现和负载均衡。

1、先解决依赖,这里只给出zk相关的依赖,pom.xml如下

  <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.9.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>2.9.1</version>
  </dependency>

2、ZkClient

这里使用的是curator,curator是对zookeeper的简单封装,提供了一些集成的方法,或者是提供了更优雅的api,举例来说

zk的create(path, mode, acl, data)方法 == curator create().withMode(mode).forPath(path)调用链

package com.dqa.prometheus.client.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;

public class ZkClient {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private CuratorFramework client;
    private String zookeeperServer;
    private int sessionTimeoutMs;
    private int connectionTimeoutMs;
    private int baseSleepTimeMs;
    private int maxRetries;

    public void setZookeeperServer(String zookeeperServer) {
        this.zookeeperServer = zookeeperServer;
    }
    public String getZookeeperServer() {
        return zookeeperServer;
    }
    public void setSessionTimeoutMs(int sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }
    public int getSessionTimeoutMs() {
        return sessionTimeoutMs;
    }
    public void setConnectionTimeoutMs(int connectionTimeoutMs) {
        this.connectionTimeoutMs = connectionTimeoutMs;
    }
    public int getConnectionTimeoutMs() {
        return connectionTimeoutMs;
    }
    public void setBaseSleepTimeMs(int baseSleepTimeMs) {
        this.baseSleepTimeMs = baseSleepTimeMs;
    }
    public int getBaseSleepTimeMs() {
        return baseSleepTimeMs;
    }
    public void setMaxRetries(int maxRetries) {
        this.maxRetries = maxRetries;
    }
    public int getMaxRetries() {
        return maxRetries;
    }

    public void init() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
        client = CuratorFrameworkFactory.builder().connectString(zookeeperServer).retryPolicy(retryPolicy)
                .sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).build();
        client.start();
    }

    public void stop() {
        client.close();
    }

    public CuratorFramework getClient() {
        return client;
    }

    public void register() {
        try {
            String rootPath = "/" + "services";
            String hostAddress = InetAddress.getLocalHost().getHostAddress();
            String serviceInstance = "prometheus" + "-" +  hostAddress + "-";
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(rootPath + "/" + serviceInstance);
        } catch (Exception e) {
            logger.error("注册出错", e);
        }
    }

    public List<String> getChildren(String path) {
        List<String> childrenList = new ArrayList<>();
        try {
            childrenList = client.getChildren().forPath(path);
        } catch (Exception e) {
            logger.error("获取子节点出错", e);
        }
        return childrenList;
    }

    public int getChildrenCount(String path) {
        return getChildren(path).size();
    }

    public List<String> getInstances() {
        return getChildren("/services");
    }

    public int getInstancesCount() {
        return getInstances().size();
    }
}

2、configuration如下

package com.dqa.prometheus.configuration;


import com.dqa.prometheus.client.zookeeper.ZkClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ZkConfiguration {
    @Value("${zookeeper.server}")
    private String zookeeperServer;
    @Value(("${zookeeper.sessionTimeoutMs}"))
    private int sessionTimeoutMs;
    @Value("${zookeeper.connectionTimeoutMs}")
    private int connectionTimeoutMs;
    @Value("${zookeeper.maxRetries}")
    private int maxRetries;
    @Value("${zookeeper.baseSleepTimeMs}")
    private int baseSleepTimeMs;

    @Bean(initMethod = "init", destroyMethod = "stop")
    public ZkClient zkClient() {
        ZkClient zkClient = new ZkClient();
        zkClient.setZookeeperServer(zookeeperServer);
        zkClient.setSessionTimeoutMs(sessionTimeoutMs);
        zkClient.setConnectionTimeoutMs(connectionTimeoutMs);
        zkClient.setMaxRetries(maxRetries);
        zkClient.setBaseSleepTimeMs(baseSleepTimeMs);
        return zkClient;
    }

}

配置文件如下

#============== zookeeper ===================
zookeeper.server=10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181
zookeeper.sessionTimeoutMs=6000
zookeeper.connectionTimeoutMs=6000
zookeeper.maxRetries=3
zookeeper.baseSleepTimeMs=1000

3、注册发现

是通过上面封装的ZkClient中的register方法实现的,调用如下。

package com.dqa.prometheus;

import com.dqa.prometheus.client.zookeeper.ZkClient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.orm.jpa.EntityScan;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableAsync
@EnableScheduling
@EntityScan(basePackages="com.xiaoju.dqa.prometheus.model")
public class Application {
    public static void main(String[] args) {
        ApplicationContext context = SpringApplication.run(Application.class, args);
        ZkClient zkClient = context.getBean(ZkClient.class);
        zkClient.register();
    }
}

注册代码说明:

 public void register() {
        try {
            String rootPath = "/" + "services";
            String hostAddress = InetAddress.getLocalHost().getHostAddress();
            String serviceInstance = "prometheus" + "-" +  hostAddress + "-";
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(rootPath + "/" + serviceInstance);
        } catch (Exception e) {
            logger.error("注册出错", e);
        }
    }

1、zk中的注册路径

/services/prometheus-10.93.21.21-00000000001

2、CreateMode有四种,选择EPHEMERAL_SEQUENTIAL的原因是,服务关闭的时候session超时,zk节点会自动删除,同时自增id可以实现锁和负载均衡,下面再说

1、PERSISTENT

持久化目录节点,存储的数据不会丢失。

2、PERSISTENT_SEQUENTIAL

顺序自动编号的持久化目录节点,存储的数据不会丢失,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。

3、EPHEMERAL

临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除。 

4、EPHEMERAL_SEQUENTIAL

临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。

4、负载均衡

     /*
        *   我是第几个实例, 做负载均衡
        * */
        List<String> instanceList = zkClient.getInstances();
        Collections.sort(instanceList);
        String hostAddress = NetFunction.getAddressHost();
        int instanceNo = 0;
        if (hostAddress !=  null) {
            for (int i=0; i<instanceList.size(); i++) {
                if (instanceList.get(i).split("-")[1].equals(hostAddress)) {
                    instanceNo = i;
                }
            }
        } else {
            logger.info("获取本地IP失败");
        }
        logger.info("[分发] 实例总数={}, 我是第{}个实例", instanceCount, instanceNo);
        List<CheckTask> waitingTasks = checkTaskDao.getTasks(taskType, TaskStatus.WAITING.getValue());
        Iterator<CheckTask> waitingIterator = waitingTasks.iterator();
        while (waitingIterator.hasNext()) {
            if (waitingIterator.next().getTaskId().hashCode() % instanceCount != instanceNo) {
                waitingIterator.remove();
            }
        }

说明:

1、例如有3个实例(zkClient.getInstances()),那么通过IP我们把3个实例按照自增id排序分别标号为0,1,2

2、对第一个实例也就是instanceNo=0,只执行taskId.hashCode() % 3 == 0的任务,其他两个实例类似

3、当有一个实例挂掉,2个实例,instanceNo=0只执行taskId.hashCode() % 2 == 0的任务,实现热删除

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • springboot mybatis优雅的添加多数据源

    springboot的原则是简化配置,本文试图不通过xml配置,使用configuration配置数据源,并进行简单的数据访问。 并且配置了多数据源,在开发过程...

    用户1225216
  • springboot与thrift集成实现服务端和客户端

    我们这里用一个简单的小功能来演示一下如何使用springboot集成thrift 这个功能是,判断hdfs路径存在。 1、先解决依赖 <dependencie...

    用户1225216
  • java优雅的使用elasticsearch api

    本文给出一种优雅的拼装elasticsearch查询的方式,可能会使得使用elasticsearch的方式变得优雅起来,使得代码结构很清晰易读。 建立elast...

    用户1225216
  • Spring Cloud 入门教程3、服务消费者(Feign)

    Feign是基于Ribbon封装的HTTP Client工具包,Feign的目标是简化HTTP Client。Feign也确实做到了这一点,使用Feign发起H...

    ken.io
  • Spring Cloud 入门教程3、服务消费者(Feign)

    Feign是基于Ribbon封装的HTTP Client工具包,Feign的目标是简化HTTP Client。Feign也确实做到了这一点,使用Feign发起H...

    wuweixiang
  • spring-boot 速成(9) druid+mybatis 多数据源及读写分离的处理

    按上节继续学习,稍微复杂的业务系统,一般会将数据库按业务拆开,比如产品系统的数据库放在product db中,订单系统的数据库放在order db中...,然后...

    菩提树下的杨过
  • hadoop io 源码阅读

    为什么不直接试用int,string而要采用IntWritable和Text呢?这就涉及到了序列化。 序列化

    用户1621453
  • [SpingBoot guides系列翻译]文件上传

    mkdir -p src/main/java/hello,其实也就是在IntelliJ里面新建一个空的Java项目,然后添加一个main.java.hellop...

    _淡定_
  • 基于Spring Data JPA框架的文章归档实现

    最近在写自己的个人博客系统,框架采用SpringMVC、Spring4.0、Spring Data/JPA组合,本博客就文档归档功能在Spring Data J...

    用户1208223
  • MyBatis实现分页查询

    分页查询在我们生活中非常常见,当我们做一个内容很多,数据量很大的项目的时候,在一个页面显示不过来,我们便会采取分页的思想,在每个页面上显示固定数量的内容,分多个...

    枉凝眉

扫码关注云+社区

领取腾讯云代金券