布隆过滤器在之前的从 hashtable 到 bloomfilter 讲过部分关于他的计算以及一些参数,今天就简单实现一个 bloomfilter ,当然实现过程也参照了别人的代码和结构设计,让我自己从头凭空创造一个也不可能,有些类同的情况敬请谅解。
在真正实现之前,我们先来看看我们需要布隆过滤器实现的一些功能,首先我们使用的时候就是初始化一个 bloomfilter 这样的数据结构体,然后向其中插入数据来判断我们做的到底插入数据之前是否插入过。最后结束的时候将其删除内存释放,还有一个可能就是我们想要对当前布隆过滤器状态进行保存,那么就需要持久化,持久化必然包括两个方面那就是文件加载和文件保存。
以上就是我们的一些使用的场景,基于这样的场景,我们要去设计数据结构体,那么现在上代码。
#define BYTE_BITS (8)
typedef struct
{
uint8_t cInitFlag; // 初始化标志,为0时的第一次Add()会对stFilter[]做初始化
uint8_t cResv[3];
uint32_t dwMaxItems; // n - BloomFilter中最大元素个数 (输入量)
double dProbFalse; // p - 假阳概率(误判率) (输入量,比如万分之一:0.00001)
uint32_t dwFilterBits; // m = ; - BloomFilter的比特数
uint32_t dwHashFuncs; // k = round(log(2.0) * m / n); - 哈希函数个数
uint32_t dwSeed; // MurmurHash的种子偏移量
uint32_t dwCount; // Add()的计数,超过MAX_BLOOMFILTER_N则返回失败
uint32_t dwFilterSize; // dwFilterBits / BYTE_BITS
unsigned char *pstFilter; // BloomFilter存储指针,使用malloc分配
uint32_t *pdwHashPos; // 存储上次hash得到的K个bit位置数组(由bloom_hash填充)
} BaseBloomFilter;
上述是一个较为基础的布隆过滤器数据结构,上边链接文章中算了 布隆过滤器中的一些参数情况,在数据结构中进行定义,不明白其中意思的可以看点击上边的超链接看一下最后布隆过滤器的介绍。
因为其中有些参数需要计算,所以在初始化之前,我们先定义一个参数计算的函数。
static inline void _CalcBloomFilterParam(uint32_t n, double p, uint32_t *pm, uint32_t *pk)
{
/**
* n - 计划向布隆过滤器插入的元素个数
* p - 假阳率,大概意思就是插入两个不同元素产生同样结果的概率
* m - 布隆过滤器要使用多少位
* k - 哈希函数的个数
*
//定义四个参数之间关系
* f = ln(2) × ln(1/2) × m / n = (0.6185) ^ (m/n) 具体推导方法可以看上边链接文章
* k = ln(2) × m / n = 0.6931 * m / n
* m = -1*n*ln(p)/0.4804530139182079271955440025
* k = ln(2)*m/n
**/
uint32_t m, k, m2;
// 计算指定假阳(误差)概率下需要的比特数
m =(uint32_t) ceil(-1.0 * n * log(p) / 0.480453);
m = (m - m % 64) + 64; // 8字节对齐,布隆过滤器使用的字节数需要是 8 字节整数倍
// 计算哈希函数个数
double double_k = (0.69314 * m / n); // ln(2)*m/n // 这里只是为了debug出来看看具体的浮点数值
k = round(double_k); // 返回x的四舍五入整数值。
printf("orig_k:%lf, k:%u\n", double_k, k);
*pm = m;
*pk = k;
return;
}
这个函数是布隆过滤器的一些指标的计算,他通过使用者传入的大概需要插入的元素个数和能接受的冲突概率计算出来布隆过滤器需要的字节数和应该使用的哈希函数个数。
因为后续函数都希望是内联函数,所以加了 inline, 内联函数就是编译器在编译的时候将代码直接拷贝到使用的地方,虽然占用内存但是性能相对较好。
然后就是数据结构的初始化了。
inline int InitBloomFilter(BaseBloomFilter *pstBloomfilter,
uint32_t dwSeed,
uint32_t dwMaxItems, double dProbFalse)
{
//参数检查
if (pstBloomfilter == NULL)
return -1;
if ((dProbFalse <= 0) || (dProbFalse >= 1))
return -2;
// 检查是否重复初始化了,释放内存
if (pstBloomfilter->pstFilter != NULL)
free(pstBloomfilter->pstFilter);
if (pstBloomfilter->pdwHashPos != NULL)
free(pstBloomfilter->pdwHashPos);
//分配内存然后初始化为 0
memset(pstBloomfilter, 0, sizeof(BaseBloomFilter));
// 初始化内存结构,并计算BloomFilter需要的空间
pstBloomfilter->dwMaxItems = dwMaxItems; // 最大存储 元素
pstBloomfilter->dProbFalse = dProbFalse; // 误差概率
pstBloomfilter->dwSeed = dwSeed; // hash种子
// 计算 m, k 然后存入数据结构中
_CalcBloomFilterParam(pstBloomfilter->dwMaxItems, pstBloomfilter->dProbFalse,
&pstBloomfilter->dwFilterBits, &pstBloomfilter->dwHashFuncs);
// 分配BloomFilter的存储空间 这个是布隆过滤器的大小
pstBloomfilter->dwFilterSize = pstBloomfilter->dwFilterBits / BYTE_BITS;
//分配这样位的 bits
pstBloomfilter->pstFilter = (unsigned char *) malloc(pstBloomfilter->dwFilterSize);
if (NULL == pstBloomfilter->pstFilter)
return -100;
// 哈希函数个数 * 4 字节 每四个字节保存一个数字,表示位数
pstBloomfilter->pdwHashPos = (uint32_t*) malloc(pstBloomfilter->dwHashFuncs * sizeof(uint32_t));
if (NULL == pstBloomfilter->pdwHashPos)
return -200;
printf(">>> Init BloomFilter(n=%u, p=%e, m=%u, k=%d), malloc() size=%.6fMB, items:bits=1:%0.1lf\n",
pstBloomfilter->dwMaxItems, pstBloomfilter->dProbFalse, pstBloomfilter->dwFilterBits,
pstBloomfilter->dwHashFuncs, (double)pstBloomfilter->dwFilterSize/1024/1024,
pstBloomfilter->dwFilterBits*1.0/pstBloomfilter->dwMaxItems);
// 初始化BloomFilter的内存
memset(pstBloomfilter->pstFilter, 0, pstBloomfilter->dwFilterSize);
//表示已经初始化
pstBloomfilter->cInitFlag = 1;
return 0;
}
上述初始化过程还比较简单,然后就是添加,不过添加元素之前,我们需要定义一下内容,首先就是 hash 函数,我们需要通过哈希函数来插入,另外我们需要的是 k 个函数,这么多函数我们不能一个定义,毕竟 k 是个变量,我们需要批量生产这样的 hash 函数。
我们这里用的是 64 位的 MurmurHash2 (想要查看可能需要科学上网) ,这里为什么要选用这个简单讲一下,我们选用哈希函数几个标准,首先就是可以批量,这个通过多次 hash 基本都可以实现,另外就是快速,而这一点就很少了,而这个 MurmurHash2 就是一个快速的非加密的哈希函数。
算法如下:
uint64_t MurmurHash2_x64 ( const void * key, int len, uint32_t seed )
{
const uint64_t m = 0xc6a4a7935bd1e995;
const int r = 47;
uint64_t h = seed ^ (len * m); //seed 的用处
const uint64_t * data = (const uint64_t *)key;
const uint64_t * end = data + (len/8);
while(data != end)
{
uint64_t k = *data++;
k *= m;
k ^= k >> r;
k *= m;
h ^= k;
h *= m;
}
const uint8_t * data2 = (const uint8_t*)data;
switch(len & 7)
{
case 7: h ^= ((uint64_t)data2[6]) << 48;
case 6: h ^= ((uint64_t)data2[5]) << 40;
case 5: h ^= ((uint64_t)data2[4]) << 32;
case 4: h ^= ((uint64_t)data2[3]) << 24;
case 3: h ^= ((uint64_t)data2[2]) << 16;
case 2: h ^= ((uint64_t)data2[1]) << 8;
case 1: h ^= ((uint64_t)data2[0]);
h *= m;
};
h ^= h >> r;
h *= m;
h ^= h >> r;
return h;
}
具体算法可以不那么细究,只要会用就行了。
然后就是使用 k 个哈希函数生成不同 hash 函数,这里使用了双重散列封装。
//将一个 64 位数据数据前 32 位和后 32 位进行与操作然后得到 32 位数据.
#define MIX_UINT64(v) ((uint32_t)((v>>32)^(v)))
void bloom_hash(BaseBloomFilter *pstBloomfilter, const void * key, int len)
{
//检查
if (pstBloomfilter == NULL) return;
int i;
//总共的比特数
uint32_t dwFilterBits = pstBloomfilter->dwFilterBits;
// 第一次 hash
uint64_t hash1 = MurmurHash2_x64(key, len, pstBloomfilter->dwSeed);
//将第一次hash结构转换后进行得到第二次哈希
uint64_t hash2 = MurmurHash2_x64(key, len, MIX_UINT64(hash1));
for (i = 0; i < (int)pstBloomfilter->dwHashFuncs; i++)
{
pstBloomfilter->pdwHashPos[i] = (hash1 + i*hash2) % dwFilterBits; //通过两个hash 计算k 个函数的位数
}
return;
}
定义好这两个函数后就可以定义添加一个元素的内容了。
#define SETBIT(filter, n) (filter->pstFilter[n/BYTE_BITS] |= (1 << (n%BYTE_BITS)))
int BloomFilter_Add(BaseBloomFilter *pstBloomfilter, const void * key, int len)
{
//参数判断
if ((pstBloomfilter == NULL) || (key == NULL) || (len <= 0))
return -1;
int i;
//先判断有没有进行初始化
if (pstBloomfilter->cInitFlag != 1)
{
// Reset后没有初始化,使用前需要memset
memset(pstBloomfilter->pstFilter, 0, pstBloomfilter->dwFilterSize);
pstBloomfilter->cInitFlag = 1;
}
// hash key到bloomfilter中, 为了计算不同hash命中的位置,保存pdwHashPos数组
bloom_hash(pstBloomfilter, key, len);
for (i = 0; i < (int)pstBloomfilter->dwHashFuncs; i++)
{
//宏定义中见,将得到的余数的位置为 1
SETBIT(pstBloomfilter, pstBloomfilter->pdwHashPos[i]);
}
// 增加count数
pstBloomfilter->dwCount++;
if (pstBloomfilter->dwCount <= pstBloomfilter->dwMaxItems)
return 0;
else
return 1; // 超过N最大值,可能出现准确率下降等情况
}
然后我们需要一个检查元素是否存在的函数。
#define GETBIT(filter, n) (filter->pstFilter[n/BYTE_BITS] & (1 << (n%BYTE_BITS)))
int BloomFilter_Check(BaseBloomFilter *pstBloomfilter, const void * key, int len)
{
//参数检查
if ((pstBloomfilter == NULL) || (key == NULL) || (len <= 0))
return -1;
int i;
bloom_hash(pstBloomfilter, key, len);
for (i = 0; i < (int)pstBloomfilter->dwHashFuncs; i++)
{
// 如果有任意bit不为1,说明key不在bloomfilter中
// 注意: GETBIT()返回不是0|1,高位可能出现128之类的情况
if (GETBIT(pstBloomfilter, pstBloomfilter->pdwHashPos[i]) == 0)
return 1;
}
return 0;
}
上述的操作基本完成,然后就是数据结构的释放和重置了。
int FreeBloomFilter(BaseBloomFilter *pstBloomfilter)
{
if (pstBloomfilter == NULL)
return -1;
pstBloomfilter->cInitFlag = 0;
pstBloomfilter->dwCount = 0;
free(pstBloomfilter->pstFilter);
pstBloomfilter->pstFilter = NULL;
free(pstBloomfilter->pdwHashPos);
pstBloomfilter->pdwHashPos = NULL;
return 0;
}
释放较为简单,理解也简单就不做过多介绍。
然后就是数据重置,有两种实现,一种就是简单的将 flag 标记置为 0 ,当重新使用的时候会自动初始化。
int ResetBloomFilter(BaseBloomFilter *pstBloomfilter)
{
if (pstBloomfilter == NULL)
return -1;
pstBloomfilter->cInitFlag = 0;
pstBloomfilter->dwCount = 0; //元素计数
return 0;
}
另外就是真正重置,将每个位都置为 0
int RealResetBloomFilter(BaseBloomFilter *pstBloomfilter)
{
if (pstBloomfilter == NULL)
return -1;
memset(pstBloomfilter->pstFilter, 0, pstBloomfilter->dwFilterSize); //将记录内存置为 0;
pstBloomfilter->cInitFlag = 1; //此时已经初始化了
pstBloomfilter->dwCount = 0;
return 0;
}
持久化问题本质就是我们需要,将一个数据结构的保存成某种统一格式,我们在网络传输中会将对象序列化然后传输,跟这方面意思雷同,但是 bloomfilter 涉及位数组的保存,所以讲起保存成某种特定结构的二进制最好。所以我们可以自己定义一个文件的数据结构保存。
文件头的数据结构定义。
// BloomFilter文件头部定义
typedef struct
{
uint32_t dwMagicCode; // 文件头部标识,这个随便自己定义
uint32_t dwSeed;
uint32_t dwCount;
uint32_t dwMaxItems; // n - BloomFilter中最大元素个数 (输入量)
double dProbFalse; // p - 假阳概率 (输入量,比如万分之一:0.00001)
uint32_t dwFilterBits; // m = ceil((n * log(p)) / log(1.0 / (pow(2.0, log(2.0))))); - BloomFilter的比特数
uint32_t dwHashFuncs; // k = round(log(2.0) * m / n); - 哈希函数个数
uint32_t dwResv[6];
uint32_t dwFileCrc; // (未使用)整个文件的校验和
uint32_t dwFilterSize; // 后面Filter的Buffer长度
} BloomFileHead;
这个结构很多内容跟 bloomfilter 内容一致。
持久化函数如下:
#define __BLOOMFILTER_VERSION__ "1.1"
#define __MGAIC_CODE__ (0x01464C42)
int SaveBloomFilterToFile(BaseBloomFilter *pstBloomfilter, char *szFileName)
{
//检查
if ((pstBloomfilter == NULL) || (szFileName == NULL))
return -1;
int iRet;
FILE *pFile;
static BloomFileHead stFileHeader = {0};
//二进制格式打开文件
pFile = fopen(szFileName, "wb");
if (pFile == NULL)
{
perror("fopen");
return -11;
}
// 先写入文件头
stFileHeader.dwMagicCode = __MGAIC_CODE__;
stFileHeader.dwSeed = pstBloomfilter->dwSeed;
stFileHeader.dwCount = pstBloomfilter->dwCount;
stFileHeader.dwMaxItems = pstBloomfilter->dwMaxItems;
stFileHeader.dProbFalse = pstBloomfilter->dProbFalse;
stFileHeader.dwFilterBits = pstBloomfilter->dwFilterBits;
stFileHeader.dwHashFuncs = pstBloomfilter->dwHashFuncs;
stFileHeader.dwFilterSize = pstBloomfilter->dwFilterSize;
//写入文件头
iRet = fwrite((const void*)&stFileHeader, sizeof(stFileHeader), 1, pFile);
if (iRet != 1)
{
perror("fwrite(head)");
return -21;
}
// 写入BloomFilter的内容
iRet = fwrite(pstBloomfilter->pstFilter, 1, pstBloomfilter->dwFilterSize, pFile);
if ((uint32_t)iRet != pstBloomfilter->dwFilterSize)
{
perror("fwrite(data)");
return -31;
}
fclose(pFile);
return 0;
}
上述内容较为简单,然后就是加载文件。
int LoadBloomFilterFromFile(BaseBloomFilter *pstBloomfilter, char *szFileName)
{
//判断参数
if ((pstBloomfilter == NULL) || (szFileName == NULL))
return -1;
int iRet;
FILE *pFile;
static BloomFileHead stFileHeader = {0};
if (pstBloomfilter->pstFilter != NULL)
free(pstBloomfilter->pstFilter);
if (pstBloomfilter->pdwHashPos != NULL)
free(pstBloomfilter->pdwHashPos);
//打开文件
pFile = fopen(szFileName, "rb");
if (pFile == NULL)
{
perror("fopen");
return -11;
}
// 读取并检查文件头
iRet = fread((void*)&stFileHeader, sizeof(stFileHeader), 1, pFile);
if (iRet != 1)
{
perror("fread(head)");
return -21;
}
if ((stFileHeader.dwMagicCode != __MGAIC_CODE__)
|| (stFileHeader.dwFilterBits != stFileHeader.dwFilterSize*BYTE_BITS))
return -50;
// 初始化传入的 BaseBloomFilter 结构
pstBloomfilter->dwMaxItems = stFileHeader.dwMaxItems;
pstBloomfilter->dProbFalse = stFileHeader.dProbFalse;
pstBloomfilter->dwFilterBits = stFileHeader.dwFilterBits;
pstBloomfilter->dwHashFuncs = stFileHeader.dwHashFuncs;
pstBloomfilter->dwSeed = stFileHeader.dwSeed;
pstBloomfilter->dwCount = stFileHeader.dwCount;
pstBloomfilter->dwFilterSize = stFileHeader.dwFilterSize;
pstBloomfilter->pstFilter = (unsigned char *) malloc(pstBloomfilter->dwFilterSize);
if (NULL == pstBloomfilter->pstFilter)
return -100;
pstBloomfilter->pdwHashPos = (uint32_t*) malloc(pstBloomfilter->dwHashFuncs * sizeof(uint32_t));
if (NULL == pstBloomfilter->pdwHashPos)
return -200;
// 将后面的Data部分读入 pstFilter
iRet = fread((void*)(pstBloomfilter->pstFilter), 1, pstBloomfilter->dwFilterSize, pFile);
if ((uint32_t)iRet != pstBloomfilter->dwFilterSize)
{
perror("fread(data)");
return -31;
}
pstBloomfilter->cInitFlag = 1;
printf(">>> Load BloomFilter(n=%u, p=%f, m=%u, k=%d), malloc() size=%.2fMB\n",
pstBloomfilter->dwMaxItems, pstBloomfilter->dProbFalse, pstBloomfilter->dwFilterBits,
pstBloomfilter->dwHashFuncs, (double)pstBloomfilter->dwFilterSize/1024/1024);
fclose(pFile);
return 0;
}
读取内容也较为简单,就是先将文件读入文件头数据结构,然后将文件内容进行读取赋值。
到此,bloomfilter 的所有实现都基本已经实现。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。