分布式ID常见解决方案

一、分布式系统带来ID生成挑战

在分布式系统中,往往需要对大量的数据如订单、账户进行标识,以一个有意义的有序的序列号来作为全局唯一的ID。

而分布式系统中我们对ID生成器要求又有哪些呢?

  • 全局唯一性:不能出现重复的ID号,既然是唯一标识,这是最基本的要求。
  • 递增:比较低要求的条件为趋势递增,即保证下一个ID一定大于上一个ID,而比较苛刻的要求是连续递增,如1,2,3等等。
  • 高可用高性能:ID生成事关重大,一旦挂掉系统崩溃;高性能是指必须要在压测下表现良好,如果达不到要求则在高并发环境下依然会导致系统瘫痪。

二、业内方案简介

1. UUID方案

优点:

  1. 能够保证独立性,程序可以在不同的数据库间迁移,效果不受影响。
  2. 保证生成的ID不仅是表独立的,而且是库独立的,这点在你想切分数据库的时候尤为重要。

缺点:

  1. 性能问题:UUID太长,通常以36长度的字符串表示,对MySQL索引不利:如果作为数据库主键,在InnoDB引擎下,UUID的无序性可能会引起数据位置频繁变动,严重影响性能。
  2. UUID无业务含义:很多需要ID能标识业务含义的地方不使用。
  3. 不满足递增要求。

2. snowflake方案

snowflake是twitter开源的分布式ID生成系统。 Twitter每秒有数十万条消息的请求,每条消息都必须分配一条唯一的id,这些id还需要一些大致的顺序(方便客户端排序),并且在分布式系统中不同机器产生的id必须不同。

snowflake的结构如下(每部分用-分开):

0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 – 000000000000

第一位为未使用,接下来的41位为毫秒级时间(41位的长度可以使用69年),然后是5位datacenterId和5位workerId(10位的长度最多支持部署1024个节点) ,最后12位是毫秒内的计数(12位的计数顺序号支持每个节点每毫秒产生4096个ID序号)

一共加起来刚好64位,为一个Long型。

snowflake生成的ID整体上按照时间自增排序,并且整个分布式系统内不会产生ID碰撞(由datacenter和workerId作区分),并且效率较高。snowflake的缺点是:

  1. 强依赖时钟,如果主机时间回拨,则会造成重复ID
  2. ID虽然有序,但是不连续

snowflake现在有较好的改良方案,比如美团点评开源的分布式ID框架:leaf,通过使用ZooKeeper解决了时钟依赖问题。

3. 基于数据库方案

利用数据库生成ID是最常见的方案。能够确保ID全数据库唯一。其优缺点如下:

优点:

  1. 非常简单,利用现有数据库系统的功能实现,成本小,有DBA专业维护。
  2. ID单调自增。

缺点:

  1. 不同数据库语法和实现不同,数据库迁移的时候或多数据库版本支持的时候需要处理。
  2. 在单个数据库或读写分离或一主多从的情况下,只有一个主库可以生成。有单点故障的风险。
  3. 在性能达不到要求的情况下,比较难于扩展。
  4. 如果涉及多个系统需要合并或者数据迁移会比较麻烦。
  5. 分表分库的时候会有麻烦。

4.其他方案简介

通过Redis生成ID(主要通过redis的自增函数)、ZooKeeper生成ID、MongoDB的ObjectID等均可实现唯一性的要求。

三、我们在实际应用中使用的方案

1. 方案简介

实际业务中,除了分布式ID全局唯一之外,还有是否趋势/连续递增的要求。根据具体业务需求的不同,有两种可选方案。

一是只保证全局唯一,不保证连续递增。二是既保证全局唯一,又保证连续递增。

2. 基于ZooKeeper和本地缓存的方案

基于zookeeper分布式ID实现方案有很多种,本方案只使用ZooKeeper作为分段节点协调工具。每台服务器首先从zookeeper缓存一段,如1-1000的id。

此时zk上保存最大值1000,每次获取的时候都会进行判断,如果id小于本地最大值,即id<=1000,则更新本地的当前值,如果id大于本地当前值,比如说是1001,则会将从zk再获取下一个id数据段并在本地缓存。获取数据段的时候需要更新zk节点数据,更新的时候使用curator的分布式锁来实现。

由于id是从本机获取,因此本方案的优点是性能非常好。缺点是如果多主机负载均衡,则会出现不连续的id,当然将递增区段设置为1也能保证连续的id,但是效率会受到很大影响。

实现关键源码如下:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 根据开源项目mycat实现基于zookeeper的递增序列号
 * <p>
 * 只要配置好ZK地址和表名的如下属性
 * MINID 某线程当前区间内最小值
 * MAXID 某线程当前区间内最大值
 * CURID 某线程当前区间内当前值
 *
 * @author wangwanbin
 * @version 1.0
 * @time 2017/9/1
 */
public class ZKCachedSequenceHandler extends SequenceHandler {
    protected static final Logger LOGGER = LoggerFactory.getLogger(ZKCachedSequenceHandler.class);
    private static final String KEY_MIN_NAME = ".MINID";// 1
    private static final String KEY_MAX_NAME = ".MAXID";// 10000
    private static final String KEY_CUR_NAME = ".CURID";// 888
    private final static long PERIOD = 1000;//每次缓存的ID段数量
    private static ZKCachedSequenceHandler instance = new ZKCachedSequenceHandler();

    /**
     * 私有化构造方法,单例模式
     */
    private ZKCachedSequenceHandler() {
    }

    /**
     * 获取sequence工具对象的唯一方法
     *
     * @return
     */
    public static ZKCachedSequenceHandler getInstance() {
        return instance;
    }

    private Map<String, Map<String, String>> tableParaValMap = null;

    private CuratorFramework client;
    private InterProcessSemaphoreMutex interProcessSemaphore = null;

    public void loadZK() {
        try {
            this.client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));
            this.client.start();
        } catch (Exception e) {
            LOGGER.error("Error caught while initializing ZK:" + e.getCause());
        }
    }

    public Map<String, String> getParaValMap(String prefixName) {
        if (tableParaValMap == null) {
            try {
                loadZK();
                fetchNextPeriod(prefixName);
            } catch (Exception e) {
                LOGGER.error("Error caught while loding configuration within current thread:" + e.getCause());
            }
        }
        Map<String, String> paraValMap = tableParaValMap.get(prefixName);
        return paraValMap;
    }

    public Boolean fetchNextPeriod(String prefixName) {
        try {
            Stat stat = this.client.checkExists().forPath(PATH + "/" + prefixName + SEQ);

            if (stat == null || (stat.getDataLength() == 0)) {
                try {
                    client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                            .forPath(PATH + "/" + prefixName + SEQ, String.valueOf(0).getBytes());
                } catch (Exception e) {
                    LOGGER.debug("Node exists! Maybe other instance is initializing!");
                }
            }
            if (interProcessSemaphore == null) {
                interProcessSemaphore = new InterProcessSemaphoreMutex(client, PATH + "/" + prefixName + SEQ);
            }
            interProcessSemaphore.acquire();
            if (tableParaValMap == null) {
                tableParaValMap = new ConcurrentHashMap<>();
            }
            Map<String, String> paraValMap = tableParaValMap.get(prefixName);
            if (paraValMap == null) {
                paraValMap = new ConcurrentHashMap<>();
                tableParaValMap.put(prefixName, paraValMap);
            }
            long now = Long.parseLong(new String(client.getData().forPath(PATH + "/" + prefixName + SEQ)));
            client.setData().forPath(PATH + "/" + prefixName + SEQ, ((now + PERIOD) + "").getBytes());
            if (now == 1) {
                paraValMap.put(prefixName + KEY_MAX_NAME, PERIOD + "");
                paraValMap.put(prefixName + KEY_MIN_NAME, "1");
                paraValMap.put(prefixName + KEY_CUR_NAME, "0");
            } else {
                paraValMap.put(prefixName + KEY_MAX_NAME, (now + PERIOD) + "");
                paraValMap.put(prefixName + KEY_MIN_NAME, (now) + "");
                paraValMap.put(prefixName + KEY_CUR_NAME, (now) + "");
            }
        } catch (Exception e) {
            LOGGER.error("Error caught while updating period from ZK:" + e.getCause());
        } finally {
            try {
                interProcessSemaphore.release();
            } catch (Exception e) {
                LOGGER.error("Error caught while realeasing distributed lock" + e.getCause());
            }
        }
        return true;
    }

    public Boolean updateCURIDVal(String prefixName, Long val) {
        Map<String, String> paraValMap = tableParaValMap.get(prefixName);
        if (paraValMap == null) {
            throw new IllegalStateException("ZKCachedSequenceHandler should be loaded first!");
        }
        paraValMap.put(prefixName + KEY_CUR_NAME, val + "");
        return true;
    }

    /**
     * 获取自增ID
     *
     * @param sequenceEnum
     * @return
     */
    @Override
    public synchronized long nextId(SequenceEnum sequenceEnum) {
        String prefixName = sequenceEnum.getCode();
        Map<String, String> paraMap = this.getParaValMap(prefixName);
        if (null == paraMap) {
            throw new RuntimeException("fetch Param Values error.");
        }
        Long nextId = Long.parseLong(paraMap.get(prefixName + KEY_CUR_NAME)) + 1;
        Long maxId = Long.parseLong(paraMap.get(prefixName + KEY_MAX_NAME));
        if (nextId > maxId) {
            fetchNextPeriod(prefixName);
            return nextId(sequenceEnum);
        }
        updateCURIDVal(prefixName, nextId);
        return nextId.longValue();
    }

    public static void main(String[] args) throws UnsupportedEncodingException {
        long startTime = System.currentTimeMillis();   //获取开始时间
        final ZKCachedSequenceHandler sequenceHandler = getInstance();
        sequenceHandler.loadZK();
        new Thread() {
            public void run() {
                long startTime2 = System.currentTimeMillis();   //获取开始时间
                for (int i = 0; i < 5000; i++) {
                    System.out.println("线程1 " + sequenceHandler.nextId(SequenceEnum.ACCOUNT));
                }
                long endTime2 = System.currentTimeMillis(); //获取结束时间
                System.out.println("程序运行时间1: " + (endTime2 - startTime2) + "ms");
            }
        }.start();
        for (int i = 0; i < 5000; i++) {
            System.out.println("线程2 " + sequenceHandler.nextId(SequenceEnum.ACCOUNT));
        }
        long endTime = System.currentTimeMillis(); //获取结束时间
        System.out.println("程序运行时间2: " + (endTime - startTime) + "ms");
    }
}

可以看到,由于不需要进行过多的网络消耗,缓存式的zk协调方案性能相当了得,生成10000个id仅需553ms(两个线程耗时较长者) , 平均每个id消耗0.05ms。

3.利用zk的永久自增节点策略实现持续递增ID

使用zk的永久sequence策略创建节点,并获取返回值,然后删除前一个节点,这样既防止zk服务器存在过多的节点,又提高了效率;节点删除采用线程池来统一处理,提高响应速度。

优点:能创建连续递增的ID。

关键实现代码如下:

package com.zb.p2p.utils;

import com.zb.p2p.enums.SequenceEnum;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 基于zk的永久型自增节点PERSISTENT_SEQUENTIAL实现
 * 每次生成节点后会使用线程池执行删除节点任务
 * Created by wangwanbin on 2017/9/5.
 */
public class ZKIncreaseSequenceHandler extends SequenceHandler implements PooledObjectFactory<CuratorFramework> {
    protected static final Logger LOGGER = LoggerFactory.getLogger(ZKCachedSequenceHandler.class);
    private static ZKIncreaseSequenceHandler instance = new ZKIncreaseSequenceHandler();
    private static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);
    private GenericObjectPool genericObjectPool;
    private Queue<Long> preNodes = new ConcurrentLinkedQueue<>();
    private static String ZK_ADDRESS = ""; //192.168.0.65
    private static String PATH = "";//  /sequence/p2p
    private static String SEQ = "";//seq;

    /**
     * 私有化构造方法,单例模式
     */
    private ZKIncreaseSequenceHandler() {
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        config.setMaxTotal(4);
        genericObjectPool = new GenericObjectPool(this, config);
    }

    /**
     * 获取sequence工具对象的唯一方法
     *
     * @return
     */
    public static ZKIncreaseSequenceHandler getInstance(String zkAddress, String path, String seq) {
        ZK_ADDRESS = zkAddress;
        PATH = path;
        SEQ = seq;
        return instance;
    }

    @Override
    public long nextId(final SequenceEnum sequenceEnum) {
        String result = createNode(sequenceEnum.getCode());
        final String idstr = result.substring((PATH + "/" + sequenceEnum.getCode() + "/" + SEQ).length());
        final long id = Long.parseLong(idstr);
        preNodes.add(id);
        //删除上一个节点
        fixedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                Iterator<Long> iterator = preNodes.iterator();
                if (iterator.hasNext()) {
                    long preNode = iterator.next();
                    if (preNode < id) {
                        final String format = "%0" + idstr.length() + "d";
                        String preIdstr = String.format(format, preNode);
                        final String prePath = PATH + "/" + sequenceEnum.getCode() + "/" + SEQ + preIdstr;
                        CuratorFramework client = null;
                        try {
                            client = (CuratorFramework) genericObjectPool.borrowObject();
                            client.delete().forPath(prePath);
                            preNodes.remove(preNode);
                        } catch (Exception e) {
                            LOGGER.error("delete preNode error", e);
                        } finally {
                            if (client != null)
                                genericObjectPool.returnObject(client);
                        }
                    }
                }
            }
        });
        return id;
    }

    private String createNode(String prefixName) {
        CuratorFramework client = null;
        try {
            client = (CuratorFramework) genericObjectPool.borrowObject();
            String result = client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
                    .forPath(PATH + "/" + prefixName + "/" + SEQ, String.valueOf(0).getBytes());
            return result;
        } catch (Exception e) {
            throw new RuntimeException("create zookeeper node error", e);
        } finally {
            if (client != null)
                genericObjectPool.returnObject(client);
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        long startTime = System.currentTimeMillis();   //获取开始时间
        final ZKIncreaseSequenceHandler sequenceHandler = ZKIncreaseSequenceHandler.getInstance("192.168.0.65", "/sequence/p2p", "seq");
        int count = 10;
        final CountDownLatch cd = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            executorService.execute(new Runnable() {
                public void run() {
                    System.out.printf("线程 %s %d \n", Thread.currentThread().getId(), sequenceHandler.nextId(SequenceEnum.ORDER));
                    cd.countDown();
                }
            });
        }
        try {
            cd.await();
        } catch (InterruptedException e) {
            LOGGER.error("Interrupted thread",e);
            Thread.currentThread().interrupt();
        }
        long endTime = System.currentTimeMillis(); //获取结束时间
        System.out.println("程序运行时间: " + (endTime - startTime) + "ms");

    }

    @Override
    public PooledObject<CuratorFramework> makeObject() throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new ExponentialBackoffRetry(1000, 3));
        client.start();
        return new DefaultPooledObject<>(client);
    }

    @Override
    public void destroyObject(PooledObject<CuratorFramework> p) throws Exception {

    }

    @Override
    public boolean validateObject(PooledObject<CuratorFramework> p) {
        return false;
    }

    @Override
    public void activateObject(PooledObject<CuratorFramework> p) throws Exception {

    }

    @Override
    public void passivateObject(PooledObject<CuratorFramework> p) throws Exception {

    }
}

测试结果如下,生成10000个id消耗=9443ms(两个线程耗时较长者), 平均每个id消耗0.9ms。

这还只是单zk连接的情况下,如果使用连接池来维护多个zk的连接,效率将成倍的提升。

四、结语

分布式ID生成器的实现有很多种。目前各方案也都各有特点。我们可以根据业务的具体要求,选择实现合适的方案。

原文地址:https://blog.csdn.net/m0_37041378/article/details/78125747
Java架构沉思录做了部分语义及样式上的修改。

原文发布于微信公众号 - Java架构沉思录(code-thinker)

原文发表时间:2018-05-26

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java帮帮-微信公众号-技术文章全总结

Mybatis框架复习大纲【面试+提高】

Mybatis框架复习大纲【面试+提高】 1.MyBatis面试题汇总 1.1 JDBC编程有哪些不足之处,MyBatis是如何解决这些问题的? ① 数据库链...

5357
来自专栏情情说

深入浅出MyBatis:「映射器」全了解

上一篇总结了MyBatis的配置,详细说明了各个配置项,其中提到了映射器,它是MyBatis最强大的工具,也是使用最多的工具。

3696
来自专栏安恒网络空间安全讲武堂

从零基础到成功解题之0ctf-ezdoor

2174
来自专栏你不就像风一样

史上超全面的Elasticsearch使用指南

elasticsearch简写es,es是一个高扩展、开源的全文检索和分析引擎,它可以准实时地快速存储、搜索、分析海量的数据。

3K2
来自专栏orientlu

FreeRTOS 任务调度 任务创建

FreeRTOS 的任务调度在 Source/include/task.c 中实现,包含了任务的创建、切换、挂起、延时和删除等所有功能。涉及到的链表组织见文章 ...

2904
来自专栏数据库

高级盲注—floor,rand,group by报错注入

大家好,我是你们的老朋友Alex。最近一直在学习SQL注入,发现了很多很多有趣的东西。我就分享我的一篇有关floor,rand,group by报错注入的笔记吧...

2729
来自专栏MasiMaro 的技术博文

驱动程序的同步处理

驱动程序运行在系统的内核地址空间,而所有进程共享这2GB的虚拟地址空间,所以绝大多数驱动程序是运行在多线程环境中,有的时候需要对程序进行同步处理,使某些操作是严...

1461
来自专栏JAVA高级架构

Java阿里面试题

3451
来自专栏Hongten

python开发_shelve_完整版_博主推荐

=====================================================

832
来自专栏蓝天

strerror线程安全分析

答案是NO,但它有个线程安全的版本:strerror_r。借助Linux的man,即可看到详情:

1403

扫码关注云+社区

领取腾讯云代金券