背景
蓄水池采样算法,用以解决在不确定数据量情况下的采样,比如数据流随机采样K个值,并且每一个值的采样概率需要相同,乍一听好神奇,怎么证明这个相同的概率是理论正确的呢?
证明过程
先附上一个自己的代码实现:
func reservoirSampling(sample int, total int) ([]int, int) {
r1 := make([]int, total)
r2 := 0
for i := 0; i < total; i++ {
r1[i] = i
}
for i := sample; i < total; i++ {
tmp := rand.Intn(i)
if tmp < sample {
r1[i], r1[tmp] = tmp, i
}
}
return r1[:sample], r2
}
这里并没有体现是一个数据流,但没关系,这个不影响采样概率的证明。证明的过程,如果是理解了问题的本质,其实比较简单,分两种情况讨论,假设从n个数据中随机采样k个数据:
- 来看第1~k个数据,如果要保留到最后的采样集合中,一开始1~k保留的概率为1,直到k+1个数据出现,对于1~k个数据中任何一个数据来说,被选中换出的概率是1/(k+1),反面就是被保留的概率,就是1-1/(k+1)=k/k+1,紧接着第k+2个数据来了,那么类似的分析法,被保留的概率是(k+1)/(k+2),以此类推,知道第n个数据出现,概率就是(n-1)/n,前面分析的概率全部相乘,最终被保留的概率是k/n。
- 来看第i个数据(i>k),第i个数据被选中的概率,显然是k/i,紧接着第i+1个数据来了,此时前面第i个数据被剔除的概率为1/(i+1),被保留的概率就是1-1/(i+1)=i/(i+1),第i+2个数据来了,类似的分析法,第i个数据被保留的概率是(i+1)/(i+2),那么到第n个数据的时候,被保留的概率是(n-1)/n,前面分析的概率全部相乘,最终被保留的概率是k/n。
完美证明,如论是最开始的1~k个数据,还是k+1~n个数据,选中被保留到最后的概率都是k/n,是不是很神奇?简单的数学,总是令人开心,不需要关心一堆复杂公式的推导,哈哈。
和naive随机采样的对比
其实蓄水池采样算法不仅仅可以用于数据流中随机取k个数据,也可以用在确定的n个数据中取k个不重复数据,写了一个naive的随机取k个数据,还要去重(因为随机取数据过程可能会重复),那么如果采样的数据量和整体比较大,比如3000个数要取1000个出来,蓄水池算法的性能比较好,而且代码实现上更加优雅,因为navie的随机取,需要考虑去重,不仅性能差,代码可读性也较差,如下:
func naiveSampling(sample int, total int) ([]int, int) {
r1 := make([]int, sample)
r2 := 0
m := make(map[int]bool)
for i := 0; i < sample; {
tmp := rand.Intn(total)
if _, ok := m[tmp]; ok {
// 不去重
r2++
// 去重
// continue
}
r1[i] = tmp
m[tmp] = true
i++
}
return r1, r2
}
func randomSequence(sample int, count int, total int, f func(int,int)([]int, int)) {
start := time.Now()
rand.Seed(time.Now().UnixNano())
m := make(map[int]int)
r := 0
for i := 0; i < count; i++ {
r1, r2 := f(sample, total)
r += r2
for _, v := range r1 {
m[v] += 1
}
}
elapsed := time.Since(start)
fmt.Printf("elapsed=%v\nr|t=%d|%d, rr=%f, sample=%d\nm=%v\n",
elapsed, r, sample*count, float32(r)/float32(sample*count), len(m), m)
}
func main() {
fmt.Printf("naiveSampling, sample=500, total=3000\n")
randomSequence(500, 10000, 3000, naiveSampling)
fmt.Printf("reservoirSampling, sample=500, total=3000\n")
randomSequence(500, 10000, 3000, reservoirSampling)
fmt.Printf("naiveSampling, sample=1000, total=3000\n")
randomSequence(1000, 10000, 3000, naiveSampling)
fmt.Printf("reservoirSampling, sample=1000, total=3000\n")
randomSequence(1000, 10000, 3000, reservoirSampling)
}
可以看到naiveSampling先不做去重,只统计重复出现采样的数字出现次数,方便计算rr(redundant rate,数据重复率,也就是重复出现的数据),那么通过两组参数可以看到,在3000个数据里面sample 500个数据,重复率达到7.9%,而3000个数字里面sample 1000个数据,重复率骤增到15%(基本成线性增长),具体结果如下:
naiveSampling, sample=500, total=3000
elapsed=759.6145ms
r|t=394722|5000000, rr=0.078944, sample=3000
m=map[0:1730 1:1697 2:1624 3:1658 4:1664 5:1673 6:1650 7:1672 8:1679 9:1678 10:1668 11:1652 12:1630 13:1661 14:1682 ……]
reservoirSampling, sample=500, total=3000
elapsed=835.8041ms
r|t=0|5000000, rr=0.000000, sample=3000
m=map[0:1585 1:1569 2:1618 3:1682 4:1631 5:1549 6:1646 7:1721 8:1709 9:1610 10:1699 11:1657 12:1645 13:1692 14:1684 ……]
naiveSampling, sample=1000, total=3000
elapsed=1.4001243s
r|t=1494500|10000000, rr=0.149450, sample=3000
m=map[0:3286 1:3222 2:3372 3:3338 4:3302 5:3365 6:3303 7:3403 8:3419 9:3299 10:3315 11:3254 12:3454 13:3362 14:3317 ……]
reservoirSampling, sample=1000, total=3000
elapsed=894.9584ms
r|t=0|10000000, rr=0.000000, sample=3000
m=map[0:3249 1:3393 2:3280 3:3322 4:3276 5:3417 6:3217 7:3345 8:3305 9:3444 10:3299 11:3365 12:3411 13:3276 14:3339 ……]
当然,此时蓄水池算法重复率肯定为0,但是性能会比不去重的naive算法差(去掉map计算去重部分,相对来说,naive性能更好),但是一旦开启了去重,当采样的数据个数达到一定比例,比如总数3000,采样1000个数据时,蓄水池采样算法也不会比naive的更差,有兴趣的可以自己修改下代码,运行得到时间统计。
总结
蓄水池采样算法妙在可以保证每个采样数据的被最终选中的等概率性,而且数学证明上很简洁,通常在数据量位置情况下使用,扩展想,也可以使用在数据量确定情况下,选择k个不重复数据,再扩展些看,如果总数特别大,蓄水池采样算法也可以实现分布式版本,解决单机性能的瓶颈(算力或者存储资源),在此不在赘述,也很简单。过程大致可以为:
- 假设有K台机器,将大数据集分成K个数据流,每台机器使用单机版蓄水池抽样处理一个数据流,抽样m个数据,并最后记录处理的数据量为N1, N2, ..., NK(假设m<Nk)。N1+N2+...+NK=N。
- 取[1, N]一个随机数d,若d<N1,则在第一台机器的蓄水池中等概率不放回地(1/m)选取一个数据;若N1<=d<(N1+N2),则在第二台机器的蓄水池中等概率不放回地选取一个数据;以此类推,重复m次,则最终从N大数据集中选出m个数据。
参考资料
蓄水池抽样算法(Reservoir Sampling)
蓄水池采样算法