前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >50亿加密手机号md5快速存储及检索,rocksDB、redis等探索

50亿加密手机号md5快速存储及检索,rocksDB、redis等探索

作者头像
天涯泪小武
发布2019-07-03 11:38:54
3.3K0
发布2019-07-03 11:38:54
举报
文章被收录于专栏:SpringCloud专栏SpringCloud专栏

首先需求比较简单,将所有的号码段(如130、131、132)的全部手机号的md5和其对应的手机号存起来,将来传入一批手机号的md5,能迅速给出对应的明文手机号。这样的存储业务在dsp系统中会有类似的场景,可能放的是imei号,cookie等。

一个手机号是11位,一个号段如130的全部手机号有1亿个,即1300000000-13099999999。将他们全部存起来,key为md5(手机号),value为手机号。将来,查询就是拿几十万个手机号的md5,迅速给出对应的明文手机号。

听起来很简单是吗,简单的key-value存储而已。可选的存储组件也有很多,譬如redis、levelDB、rocksDB,还有一些第三方组件,如360的pika等,都是一些专门存储key-value型的。主要的数据结构也无非是2种,LSM树和B+树。LSM写入快,B+树是innerdb的默认索引,读取快,写入较慢。

我们的场景其实对读取要求性能较高,写入的话倒是次要,因为一次写入,永久不更新了。而且面临的是随机读取,并不是顺序、倒序或范围查询,大部分的数据库面临随机读时,都很慢。通过他们的数据结构就能知道。

我们可以来简单分析一下各个数据库。rocksDB可以看做levelDB的升级版,levelDB就排除在外。

pika是对redis的一些封装,在某些特定场景下优于redis,某些场景下不如redis,具体可以去看一下它的官方文档。pika存在的目的是想优化内存,将部分冷数据放入rocksdb的硬盘存储。争取在速度和内存占用上达到个平衡。

redis本身速度极快,是基于内存的单线程key-value存储器。在使用pipeline和硬件较好的情况下,理论峰值可达到10万/s的读取,写入速度也可达到4-5万/s。读写都非常迅速,在合理的空间占用下,可以达到最优的性能表现。

但redis很明显的问题就是占内存巨大,这50亿数据进去,以最坏的情况来讲,不做任何处理,全部以key-value来存储原文,几百个G内存就没了。

此时,我们先来看看rocksDB的表现。

rocksDB具体的工作原理就不详解,网上很多资料。怎么先写入内存,怎么批量刷磁盘,怎么索引等等都看一下。

由于担心几十亿数据都存入一个rocksDB库,可能导致大量的hash冲突,和检索时的性能极慢,我决定建起由10个rocksDB组成的集群,每个里面放5亿数据。

直接上代码吧,新建一个Springboot项目。pom.xml里引入rocksDB依赖

代码语言:javascript
复制
 <!--rocksdb start-->
        <dependency>
            <groupId>org.rocksdb</groupId>
            <artifactId>rocksdbjni</artifactId>
            <version>5.18.3</version>
        </dependency>
        <!--rocksdb end-->

配置一下rocksDB:

代码语言:javascript
复制
@Configuration
public class DbInitConfig {
    @Value("${rock.readOnly}")
    private Boolean readOnly;

    public static int TOTAL_ROCKS = 10;

    private Options options() {
        //RocksDB.loadLibrary();
        Options options = new Options().setCreateIfMissing(true);
        options.setMaxBackgroundCompactions(16);
        options.setNewTableReaderForCompactionInputs(true);
        //为压缩的输入,打开RocksDB层的预读取
        options.setCompactionReadaheadSize(128 * SizeUnit.KB);
        options.setNewTableReaderForCompactionInputs(true);

        Filter bloomFilter = new BloomFilter(10);
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(64 * SizeUnit.KB)
                .setFilter(bloomFilter)
                .setCacheNumShardBits(6)
                .setBlockSizeDeviation(5)
                .setBlockRestartInterval(10)
                .setCacheIndexAndFilterBlocks(true)
                .setHashIndexAllowCollision(false)
                .setBlockCacheCompressedSize(64 * SizeUnit.KB)
                .setBlockCacheCompressedNumShardBits(10);


        options.setTableFormatConfig(tableConfig);

        return options;
    }

    @Primary
    @Bean(name = "RockBean0")
    public RocksDB rocksDB() {
        try {
            if (readOnly) {
                return RocksDB.openReadOnly(options(), "./wu0");
            } else {
                return RocksDB.open(options(), "./wu0");
            }
        } catch (RocksDBException e) {
            e.printStackTrace();
            return null;
        }
    }

    @Bean(name = "RockBean1")
    public RocksDB rocksDB1() {
        try {
            if (readOnly) {
                return RocksDB.openReadOnly(options(), "./wu1");
            } else {
                return RocksDB.open(options(), "./wu1");
            }
        } catch (RocksDBException e) {
            e.printStackTrace();
            return null;
        }
    }

    @Bean(name = "RockBean2")
    public RocksDB rocksDB2() {
        try {
            if (readOnly) {
                return RocksDB.openReadOnly(options(), "./wu2");
            } else {
                return RocksDB.open(options(), "./wu2");
            }
        } catch (RocksDBException e) {
            e.printStackTrace();
            return null;
        }
    }
……………………省略其他

新建一个接口类,用来定义rocksDB的基本操作:

代码语言:javascript
复制
package com.example.demo.db;

import java.util.List;
import java.util.Map;

/**
 * key-value型DB数据库操作接口
 * @author wuweifeng wrote on 2018/3/26.
 */
public interface DbStore {
    /**
     * 数据库key value
     *
     * @param key
     *         key
     * @param value
     *         value
     */
    void put(String key, String value);

    /**
     * get By Key
     *
     * @param key
     *         key
     * @return value
     */
    String get(String key);

    Map<String, String> multiGet(List<String> key);
    
    /**
     * remove by key
     *
     * @param key
     *         key
     */
    void remove(String key);
}

实现这个接口

代码语言:javascript
复制
@Component
public class RocksDbStoreImpl implements DbStore {

    @Resource
    private List<RocksDB> rocksDBS;

    private RocksDB getRocksDB(String key) {
        int code = key.hashCode() % TOTAL_ROCKS;

        return getRocksDB(code);
    }

    private RocksDB getRocksDB(int code) {
        if (code < 0) {
            code = -code;
        }
        return rocksDBS.get(code);
    }

    @Override
    public void put(String key, String value) {
        try {
            RocksDB rocksDB = getRocksDB(key);
            rocksDB.put(key.getBytes(Const.CHARSET), value.getBytes(Const.CHARSET));
        } catch (RocksDBException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String get(String key) {
        try {
            byte[] bytes = getRocksDB(key).get(key.getBytes(Const.CHARSET));
            if (bytes != null) {
                return new String(bytes, Const.CHARSET);
            }
            return null;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public Map<String, String> multiGet(List<String> keys) {
        try {
            MultiValueMap<Integer, String> multiValueMap = new LinkedMultiValueMap<>();

            for (String single : keys) {
                int code = single.hashCode() % 10;
                multiValueMap.add(code, single);
            }

            Map<String, String> results = new HashMap<>(keys.size());

            for (int key : multiValueMap.keySet()) {
                List<String> oneList = ((LinkedMultiValueMap<Integer, String>) multiValueMap).get(key);
                List<byte[]> oneKeyList = new ArrayList<>();
                for (String s : oneList) {
                    oneKeyList.add(s.getBytes(Const.CHARSET));
                }

                Map<byte[], byte[]> valueMap = getRocksDB(key).multiGet(oneKeyList);

                Map<String, String> oneResult = new HashMap<>(oneList.size());
                for (Map.Entry<byte[], byte[]> entry : valueMap.entrySet()) {
                    oneResult.put(new String(entry.getKey()), new String(entry.getValue()));
                }

                results.putAll(oneResult);
            }

            return results;
        } catch (RocksDBException | UnsupportedEncodingException e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public Map<String, String> multiGetFromOne(List<String> keys) {
        try {
            int code = keys.get(0).hashCode() % 10;

            Map<String, String> results = new HashMap<>(keys.size());

            List<byte[]> oneKeyList = new ArrayList<>();
            for (String s : keys) {
                oneKeyList.add(s.getBytes(Const.CHARSET));
            }

            Map<byte[], byte[]> valueMap = getRocksDB(code).multiGet(oneKeyList);

            for (Map.Entry<byte[], byte[]> entry : valueMap.entrySet()) {
                results.put(new String(entry.getKey()), new String(entry.getValue()));
            }

            return results;
        } catch (RocksDBException | UnsupportedEncodingException e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public void remove(String key) {
        try {
            getRocksDB(key).delete(getRocksDB(key).get(key.getBytes(Const.CHARSET)));
        } catch (RocksDBException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

}

其中里面的CHARSET就是“utf-8”;

这个实现类里面主要完成的就是根据key,进行分片,来决定该key的读写应该在哪个rocksDB库上。

其中稍微有点麻烦的是multiGet,由于multiGet是批量从一个rocksDB上读取对应的key集合。所以需要先判断这一批key分别要从哪个DB去读取,之后分别读取后再组合起来返回即可。

下面看多线程写入rocksDB的代码

代码语言:javascript
复制
package com.example.demo.controller;

import com.example.demo.ImportToRocks;
import com.example.demo.db.DbStore;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author wuweifeng wrote on 2019/6/17.
 */
@Service
public class InsertService {
    @Resource
    private DbStore dbStore;

    private ExecutorService executorService = Executors.newFixedThreadPool(64);

    @SuppressWarnings("AlibabaThreadPoolCreation")
    public void insert(long number) {
        System.out.println("--------------------------------------当前号段" + number);

        //每个号段一亿个数
        int totalCount = 100000000;

        //int totalCount = 1000000;
        //每次每个线程写入1万个
        int batchSize = 10000;

        number = number * totalCount;

        //这一亿个数,分多少次写入
        int loopCount = totalCount / batchSize;

        CountDownLatch countDownLatch = new CountDownLatch(loopCount);
        for (int i = 0; i < loopCount; i++) {
            List<String> tempList = new ArrayList<>();
            for (long j = number + batchSize * i; j < number + (batchSize * (i + 1)); j++) {
                tempList.add(j + "");
            }

            executorService.execute(new ImportToRocks(dbStore, tempList, countDownLatch));
        }

        try {
            countDownLatch.await();
            System.out.println("-----------------------------插入完毕-------------------------");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void insertAll() {
        //134L,
        long[] array = {135L, 136L, 137L, 138L, 139L,147L, 148L, 150L, 151L, 152L, 157L, 158L, 159L, 165L, 172L, 178L, 182L, 183L,
                184L, 187L, 188L, 198L,
                130L, 131L, 132L, 145L, 146L, 155L, 156L, 166L, 171L, 175L, 176L, 185L, 186L, 133L, 149L, 153L, 173L, 174L, 177L, 180L,
                181L, 189L, 199L, 170L};
        CountDownLatch countDownLatch = new CountDownLatch(array.length);
        for (long num : array) {
            insert(num);
            countDownLatch.countDown();
        }
        try {
            countDownLatch.await();
            System.out.println("-----------------------------插入完毕-------------------------");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

这里定义个线程池,每个线程每次写入1万个。里面有两个方法,一个是插入一个号段的,另一个是插入全部号段的。根据情况调用即可。

代码语言:javascript
复制
package com.example.demo;

import com.example.demo.db.DbStore;

import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @author wuwf
 */
public class ImportToRocks implements Runnable {
    private DbStore dbStore;
    private List<String> list;
    private  CountDownLatch countDownLatch;

    public ImportToRocks(DbStore dbStore, List<String> list, CountDownLatch countDownLatch) {
        this.dbStore = dbStore;
        this.list = list;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        Long time = System.currentTimeMillis();
        System.out.println("进入线程");
        for (String s : list) {
            dbStore.put(CommonUtil.md5(s), s);
        }
        System.out.println(Thread.currentThread().getName() + "插入耗时:" + (System.currentTimeMillis() - time));
        countDownLatch.countDown();
    }
}

这个线程的操作很简单,就是写入md5->num的键值对。

代码语言:javascript
复制
package com.example.demo.controller;

import com.example.demo.db.DbStore;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.*;

/**
 * @author wuweifeng wrote on 2019/6/17.
 */
@RestController
@RequestMapping
public class IndexController {
    @Resource
    private DbStore dbStore;
    @Resource
    private InsertService insertService;

    @RequestMapping("fetch")
    public Map<String, String> query(String content) {
        String[] array = content.split(",");
        Map<String, String> map = new HashMap<>();
        for (String s : array) {
            map.put(s, dbStore.get(s));
        }
        return map;
    }

    @RequestMapping("batch")
    public Map<String, String> batch(String content) {
        String[] array = content.split(",");
        List<String> list = Arrays.asList(array);

        return dbStore.multiGet(list);
    }

    @RequestMapping("insert")
    public String insert(long number) {
        insertService.insert(number);

        return "1";
    }

    @RequestMapping("insertAll")
    public String insertAll() {
        insertService.insertAll();
        return "1";
    }
}

定义个简单的Controller可以用来做控制插入和批量读取。

之后我在本机(4核8G MacBook pro+固态硬盘)和服务器(32核250G内存+普通机械硬盘)分别进行了简单的测试。写入速度还是不错。

本机的话,我修改为8个线程,可以看到,0.5秒左右单线程插入1万个。共计8个线程。基本上相当于每秒插入16万个。

在32核服务器上,我开启了64个线程,由于是机械硬盘,单次写入速度下降明显。

大概5-7秒左右,单个线程完成了1万个值的插入。共64个线程同时执行。也就7秒插入64万个。每秒不到10万。cpu占用率在600-900%左右,比较稳定,内存占用也很稳定。

普通机械硬盘空间占用就不考虑了,比内存肯定是省钱很多。但读取性能较差,我们来看一下从服务器读取的速度:

由于服务端采用的是单线程读取的方式,根据key的code取模后从对应的rocksDB获取,之后再拼装。

我测试了一下,读取1万个,2万个的耗时

在首次读取1万次个时,耗时约17秒。再次读取同样的一批时耗时2.5秒。2万个第一次是17.2秒,再次读取耗时5秒。其实很明显,这样的速度是比较弱的。原因也比较明确,我查看过它的存储,几个亿共分了4层,那么查询一个key,就意味着4次磁盘IO,单次IO在5-10ms。多个线程同时从不同的rocksDB库里获取,一秒也就几百个就极限了。倘若换成固态硬盘,那么性能会上升一个数量级。至于读取过一次后,就入内存的缓存,所以第二次就快一些。

通过实验,发现这样的随机读取速度,并不能满足日常的使用。

之后,我采用了全部redis的方式。

这里肯定有人会问了,几百G的空间占用,你怎么解决。

ok,其实用redis,主要解决的问题,就是内存压缩,将本来要250-300G存下的东西,压缩几倍,最终在80G存完,并不影响性能。

这应该是做DSP的同志们最常使用并且研究的问题了,dsp的时效性很重要,数据库又巨大,那么redis肯定是首选,内存压缩也是首要问题。

这种就需要那些只会往redis里放key-value的同学们去深入研究一下redis了。我博客里也转载了几篇redis大数据量存储的博客可以看看。

主要就是应用redis在存hash时,配置的key数量小于256时,会默认使用数组而不是hash结构来存储,从而节省大量空间。当然这个值可以配置,在1024以下,都能得到极佳的性能。这样,每个bucket只要存的少于1024个key,就能在不影响性能情况下,得到大幅的内存压缩。同时,对md5这种长字符串,进行取hash模,将key全部转为16位数字,又能得到仅1/4的key内存占用。

代码参考:

代码语言:javascript
复制
@Service
public class InsertService {
    //@Resource
    //private DbStore dbStore;
    @Value("${thread.count}")
    private Integer threadCount;
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    private ExecutorService executorService;


    @PostConstruct
    public void init() {
        executorService = Executors.newFixedThreadPool(8);
    }

    @SuppressWarnings("AlibabaThreadPoolCreation")
    public void insert(long number) {
        System.out.println("--------------------------------------当前号段" + number);

        //每次每个线程写入10万个
        int batchSize = 100000;

        //这一亿个数,分多少次写入
        int loopCount = 100000000 / batchSize;

        CountDownLatch countDownLatch = new CountDownLatch(loopCount);
        for (int i = 0; i < loopCount; i++) {
            List<String> tempList = new ArrayList<>();
            for (long j = number + batchSize * i; j < number + (batchSize * (i + 1)); j++) {
                tempList.add(j + "");
            }

            executorService.execute(new ImportToRedis(stringRedisTemplate, tempList, countDownLatch));
        }

        try {
            countDownLatch.await();
            System.out.println("-----------------------------插入完毕-------------------------");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void insertAll() {
        //18000000000L,18100000000L, 18200000000L,18300000000L,18400000000L,18500000000L,
        //13000000000L, 13100000000L, 13200000000L, 13300000000L, 13400000000L, 13500000000L,
        //                13600000000L, 13700000000L, 13800000000L, 13900000000L, 14500000000L, 14600000000L,
        //                14700000000L, 14800000000L, 14900000000L, 15000000000L, 15100000000L, 15200000000L,
        //                15300000000L, 
        long[] array = {
                 15500000000L, 15600000000L, 15700000000L, 15800000000L, 15900000000L,
                16500000000L, 16600000000L, 17000000000L, 17100000000L, 17200000000L, 17300000000L,
                17400000000L, 17500000000L, 17600000000L, 17700000000L, 17800000000L, 18600000000L,
                18000000000L, 18100000000L, 18200000000L, 18300000000L, 18400000000L, 18500000000L,
                18700000000L,  18800000000L, 18900000000L, 19800000000L, 19900000000L};
        CountDownLatch countDownLatch = new CountDownLatch(array.length);
        for (long num : array) {
            insert(num);
            countDownLatch.countDown();
        }
        try {
            countDownLatch.await();
            System.out.println("-----------------------------插入完毕-------------------------");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
代码语言:javascript
复制
public class ImportToRedis implements Runnable {
    private StringRedisTemplate stringRedisTemplate;
    private List<String> list;
    private CountDownLatch countDownLatch;

    public ImportToRedis(StringRedisTemplate redisTemplate, List<String> list, CountDownLatch countDownLatch) {
        this.stringRedisTemplate = redisTemplate;
        this.countDownLatch = countDownLatch;
        this.list = list;
    }

    @Override
    public void run() {
        Long time = System.currentTimeMillis();
        System.out.println("进入线程");


        stringRedisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public <K, V> Object execute(RedisOperations<K, V> redisOperations) throws DataAccessException {
                //for (String key : map.keySet()) {
                //    stringRedisTemplate.opsForHash().putAll(key, map.get(key));
                //}
                for (String phone : list) {
                    String md5 = CommonUtil.md5(phone);
                    String hashKey = KeyTool.hashKey(md5);
                    String realKey = KeyTool.newKey(md5);
                    stringRedisTemplate.opsForHash().put(hashKey, realKey, phone);
                }
                return null;
            }
        });
        System.out.println(Thread.currentThread().getName() + "插入耗时:" + (System.currentTimeMillis() - time));
        countDownLatch.countDown();
    }
代码语言:javascript
复制
public static int KEY_COUNT = 2<<24;

    /**
     * 计算redis的hash key
     */
    public static String hashKey(String key) {
        CRC32 crc32 = new CRC32();
        crc32.update(key.getBytes());
        return crc32.getValue() % KEY_COUNT + "";
    }

    public static String newKey(String key) {
        return BKDRHash(key) + "";
    }

    /**
     * BKDR算法
     */
    public static int BKDRHash(String str) {
        // 31 131 1313 13131 131313 etc.. 找个质数
        int seed = 131;
        int hash = 0;
        for (int i = 0; i < str.length(); i++) {
            hash = (hash * seed) + str.charAt(i);
        }
        return (hash & 0x7FFFFFFF);
    }

读取:

代码语言:javascript
复制
@Service
public class FetchService {
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    public Map<String, String> batch(List<String> list) {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(list.size());

        for (String phone : list) {
            String md5 = CommonUtil.md5(phone);
            //hash的key
            String hashKey = KeyTool.hashKey(md5);
            //在hash里面的key
            String realKey = KeyTool.newKey(md5);


        }
        return null;
    }

    public Map<String, String> fetch(List<String> keys) {
        Map<String, String> totalMap = new HashMap<>(keys.size());
        int batchSize = 50000;

        List<Object> total = new ArrayList<>(keys.size());
        for (int i = 0; i < keys.size() / batchSize + 1; i++) {
            List<String> list;
            if (batchSize * (i + 1) > keys.size()) {
                list = keys.subList(batchSize * i, keys.size());
            } else {
                list = keys.subList(batchSize * i, batchSize * (i + 1));
            }

            List<Object> phones = fromRedis(list);
            total.addAll(phones);
        }

        for (int i = 0; i < total.size(); i++) {
            if (total.get(i) != null) {
                totalMap.put(keys.get(i), total.get(i).toString());
            }
        }
        return totalMap;
    }

    private List<Object> fromRedis(List<String> list) {
        return stringRedisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public <K, V> Object execute(RedisOperations<K, V> redisOperations) throws DataAccessException {
                for (String md5 : list) {
                    //hash的key
                    String hashKey = KeyTool.hashKey(md5);
                    //在hash里面的key
                    String realKey = KeyTool.newKey(md5);

                    stringRedisTemplate.opsForHash().get(hashKey, realKey);

                }
                return null;
            }
        });
    }
}

以上就是通过pipelined来批量读写redis。读写前做好key的映射关系。

实测写入在10万/s,随机读取单线程在5K/s。性能满足日常使用。总内存占用,由不压缩时8G/亿,到1.7G/亿。共80G存完这47亿数据。空间占用和性能之间的平衡满足需求。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019年07月01日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档