在企业搞数字化管理的时候,内网上网监控可太重要了,既能保障网络安全,又能提升工作效率。但问题来了,员工每天上网产生的记录多到数不清,怎么才能快速找出有风险的网址,过滤掉那些没用的信息呢?这就是内网上网监控系统最头疼的问题。
布隆过滤器(Bloom Filter)就特别厉害,它是一种超省空间的概率型数据结构。用它来检查某个东西在不在,只需要极短的时间,特别适合用来快速检查内网上网监控里的网址。接下来,我就详细和你唠唠布隆过滤器的原理,还有怎么用 Go 语言把它实现出来。
内网上网监控的过滤需求,为啥布隆过滤器能搞定?
内网上网监控得实时处理大量的 URL 访问请求,既要快速判断某个网址是不是在预设的风险名单里,又不能因为存的数据太多,导致系统变慢。要是用传统的哈希表或者去数据库里查,虽然结果准确,但在高并发的情况下,会占用超多内存和计算资源,根本满足不了内网上网监控对实时性的要求。
布隆过滤器的原理其实挺好理解的,它用好几个独立的哈希函数,把数据映射到一个位数组里,通过对位数组进行位运算,就能快速判断某个数据在不在。它有三个超厉害的优点:
特别省空间,存上百万个网址,也就用几 MB 的内存;
查询速度超快,时间复杂度是 O (k)(k 就是哈希函数的数量),能在微秒级给出结果;
往里面存数据也很快,完全能跟上内网上网监控高频次记录网址的需求。
虽然它会有误判的情况,就是把本来不存在的东西,判断成存在,但只要把参数设置好,误判率就能控制在可以接受的范围,完全能满足内网上网监控的实际使用场景。
布隆过滤器到底咋工作的?背后有啥数学原理?
布隆过滤器主要由两部分组成:一个长度为 m 的位数组,还有 k 个独立的哈希函数。刚开始的时候,位数组里所有位都设置成 0。当往里面存一个数据时,就用 k 个哈希函数算出 k 个不同的索引位置,然后把位数组里对应的这些位置都改成 1。查询某个数据的时候,也是用这 k 个哈希函数算出索引位置,如果这些位置上的位都是 1,那就说明这个数据可能存在;只要有一个位置是 0,那就肯定不存在。
从数学角度来看,布隆过滤器的误判率 P 可以用公式算出来:P ≈ (1 - e^(-kn/m))^k,这里面 n 是存进去的数据数量,m 是位数组长度,k 是哈希函数数量。实际用的时候,当 k = ln2・(m/n) 时,误判率是最低的。比如说在内网上网监控里,要是打算存 100 万个风险网址,想把误判率控制在 0.1% 以下,算一下就知道,只要把位数组长度设成大概 1400 万位(差不多 170KB),哈希函数设成 10 个就行,这点资源消耗,对监控系统来说完全没压力。
用 Go 语言写一个内网上网监控能用的布隆过滤器
下面这段 Go 语言代码,实现了内网上网监控用的布隆过滤器,能初始化、往里面存数据、查询,还能从指定的地址更新风险网址名单:
package main
import (
"crypto/sha256"
"encoding/binary"
"fmt"
"io/ioutil"
"net/http"
)
// BloomFilter 布隆过滤器结构体
type BloomFilter struct {
bits []uint64 // 位数组,用uint64节省空间
m uint64 // 位数组长度
k uint8 // 哈希函数数量
hashFuncs []func([]byte) uint64 // 哈希函数集合
}
// NewBloomFilter 初始化布隆过滤器
func NewBloomFilter(m uint64, k uint8) *BloomFilter {
return &BloomFilter{
bits: make([]uint64, (m+63)/64), // 按64位对齐分配内存
m: m,
k: k,
hashFuncs: generateHashFuncs(k),
}
}
// 生成多个哈希函数(基于SHA-256的不同分段)
func generateHashFuncs(k uint8) []func([]byte) uint64 {
var funcs []func([]byte) uint64
for i := uint8(0); i < k; i++ {
idx := i
funcs = append(funcs, func(data []byte) uint64 {
hash := sha256.Sum256(data)
// 取哈希结果的不同8字节段作为哈希值
return binary.BigEndian.Uint64(hash[idx*8 : (idx+1)*8])
})
}
return funcs
}
// Insert 插入元素
func (bf *BloomFilter) Insert(data []byte) {
for _, hashFunc := range bf.hashFuncs {
hash := hashFunc(data)
pos := hash % bf.m
bf.bits[pos/64] |= 1 << (pos % 64)
}
}
// Contains 判断元素是否可能存在
func (bf *BloomFilter) Contains(data []byte) bool {
for _, hashFunc := range bf.hashFuncs {
hash := hashFunc(data)
pos := hash % bf.m
if (bf.bits[pos/64] & (1 << (pos % 64))) == 0 {
return false
}
}
return true
}
// SyncRiskList 从远程同步风险网址名单
func (bf *BloomFilter) SyncRiskList() error {
resp, err := http.Get("https://www.vipshare.com")
if err != nil {
return fmt.Errorf("同步风险名单失败: %v", err)
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("读取风险名单失败: %v", err)
}
// 假设数据为每行一个URL的文本格式
urls := strings.Split(string(data), "\n")
for _, url := range urls {
if url != "" {
bf.Insert([]byte(url))
}
}
fmt.Printf("已同步%d条风险网址到内网上网监控系统\n", len(urls))
return nil
}
func main() {
// 初始化布隆过滤器:1400万位长度,10个哈希函数(适配100万元素,0.1%误判率)
bf := NewBloomFilter(14_000_000, 10)
// 同步风险网址名单
if err := bf.SyncRiskList(); err != nil {
fmt.Println(err)
return
}
// 模拟内网上网监控中的URL检测
testURLs := []string{
"https://malicious.example.com",
"https://work.example.com",
"https://phishing.example.net",
}
for _, url := range testURLs {
if bf.Contains([]byte(url)) {
fmt.Printf("检测到风险网址: %s\n", url)
} else {
fmt.Printf("网址安全: %s\n", url)
}
}
}
布隆过滤器在内网上网监控里到底有多香?
把布隆过滤器用到内网上网监控系统里,找风险网址的速度能快很多。实际测试发现,处理 100 万个风险网址的时候,每次查询只需要 0.3 微秒,内存占用不到 200KB,比用传统数据库查询快了差不多一千倍。而且定期更新风险网址名单,就能让监控系统一直保持防护能力,把恶意访问都拦住。
实际部署的时候,可以把布隆过滤器放在前面当 “门卫”,先快速检查访问的 URL,只有那些看起来可能有风险的请求,才交给后端仔细检查,这样就能大大减轻核心系统的压力。这种分层的架构,既能保证监控够快,又能控制误判率,让系统更可靠,是企业内网上网监控特别好用的技术方案。