2021年6月

背景

蓄水池采样算法,用以解决在不确定数据量情况下的采样,比如数据流随机采样K个值,并且每一个值的采样概率需要相同,乍一听好神奇,怎么证明这个相同的概率是理论正确的呢?

证明过程

先附上一个自己的代码实现:

func reservoirSampling(sample int, total int) ([]int, int) {
    r1 := make([]int, total)
    r2 := 0

    for i := 0; i < total; i++ {
        r1[i] = i
    }

    for i := sample; i < total; i++  {
        tmp := rand.Intn(i)
        if tmp < sample {
            r1[i], r1[tmp] = tmp, i
        }
    }

    return r1[:sample], r2
}

这里并没有体现是一个数据流,但没关系,这个不影响采样概率的证明。证明的过程,如果是理解了问题的本质,其实比较简单,分两种情况讨论,假设从n个数据中随机采样k个数据:

  1. 来看第1~k个数据,如果要保留到最后的采样集合中,一开始1~k保留的概率为1,直到k+1个数据出现,对于1~k个数据中任何一个数据来说,被选中换出的概率是1/(k+1),反面就是被保留的概率,就是1-1/(k+1)=k/k+1,紧接着第k+2个数据来了,那么类似的分析法,被保留的概率是(k+1)/(k+2),以此类推,知道第n个数据出现,概率就是(n-1)/n,前面分析的概率全部相乘,最终被保留的概率是k/n。
  2. 来看第i个数据(i>k),第i个数据被选中的概率,显然是k/i,紧接着第i+1个数据来了,此时前面第i个数据被剔除的概率为1/(i+1),被保留的概率就是1-1/(i+1)=i/(i+1),第i+2个数据来了,类似的分析法,第i个数据被保留的概率是(i+1)/(i+2),那么到第n个数据的时候,被保留的概率是(n-1)/n,前面分析的概率全部相乘,最终被保留的概率是k/n。
    完美证明,如论是最开始的1~k个数据,还是k+1~n个数据,选中被保留到最后的概率都是k/n,是不是很神奇?简单的数学,总是令人开心,不需要关心一堆复杂公式的推导,哈哈。

和naive随机采样的对比

其实蓄水池采样算法不仅仅可以用于数据流中随机取k个数据,也可以用在确定的n个数据中取k个不重复数据,写了一个naive的随机取k个数据,还要去重(因为随机取数据过程可能会重复),那么如果采样的数据量和整体比较大,比如3000个数要取1000个出来,蓄水池算法的性能比较好,而且代码实现上更加优雅,因为navie的随机取,需要考虑去重,不仅性能差,代码可读性也较差,如下:

func naiveSampling(sample int, total int) ([]int, int) {
    r1 := make([]int, sample)
    r2 := 0
    m := make(map[int]bool)
    for i := 0; i < sample;  {
        tmp := rand.Intn(total)
        if _, ok := m[tmp]; ok {
            // 不去重
            r2++ 
            // 去重
            // continue
        }

        r1[i] = tmp
        m[tmp] = true
        i++
    }

    return r1, r2
}

func randomSequence(sample int, count int, total int, f func(int,int)([]int, int)) {
    start := time.Now()
    rand.Seed(time.Now().UnixNano())

    m := make(map[int]int)
    r := 0
    for i := 0; i < count; i++ {
        r1, r2 := f(sample, total)
        r += r2
        for _, v := range r1 {
            m[v] += 1
        }
    }
    elapsed := time.Since(start)
    fmt.Printf("elapsed=%v\nr|t=%d|%d, rr=%f, sample=%d\nm=%v\n",
        elapsed, r, sample*count, float32(r)/float32(sample*count), len(m), m)
}

func main() {
    fmt.Printf("naiveSampling, sample=500, total=3000\n")
    randomSequence(500, 10000, 3000, naiveSampling)
    fmt.Printf("reservoirSampling, sample=500, total=3000\n")
    randomSequence(500, 10000, 3000, reservoirSampling)
    
    fmt.Printf("naiveSampling, sample=1000, total=3000\n")
    randomSequence(1000, 10000, 3000, naiveSampling)
    fmt.Printf("reservoirSampling, sample=1000, total=3000\n")
    randomSequence(1000, 10000, 3000, reservoirSampling)
}

可以看到naiveSampling先不做去重,只统计重复出现采样的数字出现次数,方便计算rr(redundant rate,数据重复率,也就是重复出现的数据),那么通过两组参数可以看到,在3000个数据里面sample 500个数据,重复率达到7.9%,而3000个数字里面sample 1000个数据,重复率骤增到15%(基本成线性增长),具体结果如下:

naiveSampling, sample=500, total=3000
elapsed=759.6145ms
r|t=394722|5000000, rr=0.078944, sample=3000
m=map[0:1730 1:1697 2:1624 3:1658 4:1664 5:1673 6:1650 7:1672 8:1679 9:1678 10:1668 11:1652 12:1630 13:1661 14:1682 ……]
reservoirSampling, sample=500, total=3000
elapsed=835.8041ms
r|t=0|5000000, rr=0.000000, sample=3000
m=map[0:1585 1:1569 2:1618 3:1682 4:1631 5:1549 6:1646 7:1721 8:1709 9:1610 10:1699 11:1657 12:1645 13:1692 14:1684 ……]
naiveSampling, sample=1000, total=3000
elapsed=1.4001243s
r|t=1494500|10000000, rr=0.149450, sample=3000
m=map[0:3286 1:3222 2:3372 3:3338 4:3302 5:3365 6:3303 7:3403 8:3419 9:3299 10:3315 11:3254 12:3454 13:3362 14:3317 ……]
reservoirSampling, sample=1000, total=3000
elapsed=894.9584ms
r|t=0|10000000, rr=0.000000, sample=3000
m=map[0:3249 1:3393 2:3280 3:3322 4:3276 5:3417 6:3217 7:3345 8:3305 9:3444 10:3299 11:3365 12:3411 13:3276 14:3339 ……]

当然,此时蓄水池算法重复率肯定为0,但是性能会比不去重的naive算法差(去掉map计算去重部分,相对来说,naive性能更好),但是一旦开启了去重,当采样的数据个数达到一定比例,比如总数3000,采样1000个数据时,蓄水池采样算法也不会比naive的更差,有兴趣的可以自己修改下代码,运行得到时间统计。

总结

蓄水池采样算法妙在可以保证每个采样数据的被最终选中的等概率性,而且数学证明上很简洁,通常在数据量位置情况下使用,扩展想,也可以使用在数据量确定情况下,选择k个不重复数据,再扩展些看,如果总数特别大,蓄水池采样算法也可以实现分布式版本,解决单机性能的瓶颈(算力或者存储资源),在此不在赘述,也很简单。过程大致可以为:

  1. 假设有K台机器,将大数据集分成K个数据流,每台机器使用单机版蓄水池抽样处理一个数据流,抽样m个数据,并最后记录处理的数据量为N1, N2, ..., NK(假设m<Nk)。N1+N2+...+NK=N。
  2. 取[1, N]一个随机数d,若d<N1,则在第一台机器的蓄水池中等概率不放回地(1/m)选取一个数据;若N1<=d<(N1+N2),则在第二台机器的蓄水池中等概率不放回地选取一个数据;以此类推,重复m次,则最终从N大数据集中选出m个数据。

参考资料

蓄水池抽样算法(Reservoir Sampling)
蓄水池采样算法

节前看了一篇关于Timing Attach的技术博文,还挺有感触的,原文在《这10行比较字符串相等的代码给我整懵了,不信你也来看看》,写得挺好的,自己也想借此整理下,记录下来。这种技术,特别像是灵感,就是你看完之后,会醍醐灌顶的快乐感受。

另类的字符串比较

在 Java 的 Play Framework 里有一段代码用来验证cookie(session)中的数据是否合法(包含签名的验证)的代码,如下所示:

boolean safeEqual(String a, String b) {
   if (a.length() != b.length()) {
       return false;
   }
   int equal = 0;
   for (int i = 0; i < a.length(); i++) {
       equal |= a.charAt(i) ^ b.charAt(i);
   }
   return equal == 0;
}

相信刚看到这段源码的人会感觉挺奇怪的,这个函数的功能是比较两个字符串是否相等,如果要判断两个字符串是否相等,正常人的写法应该是下面这个样子的(golang版本):

func equals(a, b string) bool {
    if len(a) != len(b) {
        return false
    }

    for i := 0; i < len(a); i++ {
        if a[i] != b[i] {
            return false
        }
    }

    return true
}

我们可以看到,在比较两个字符串是否相等的正常写法是:

先看一下两个字符串长度是否相等,如果不等直接返回 false。
如果长度相等,则依次判断每个字符是否相等,如果不等则返回 false。
如果全部相等,则返回 true。一旦遇到不一样的字符时,直接返回false。
然而,Play Framework里的代码却不是这样的,尤其是上述的第2点,用到了异或,熟悉位操作的你很容易就能看懂,通过异或操作 1^1=0 , 1^0=1, 0^0=0,来比较每一位,如果每一位都相等的话,两个字符串肯定相等,最后存储累计异或值的变量 equal必定为 0(因为相同的字符必然为偶数),否则为 1。

但是,这种异或的方式不是遇到第一个不一样的字符就返回 false 了,而是要做全量比较,这种比较完全没有效率,这是为什么呢?原因是为了安全。

计时攻击(Timing Attack)

计时攻击(Wikipedia)是[旁道攻击]3 的一种,旁通道攻击是指基于从计算机系统的实现中获得的信息的任何攻击 ,而不是基于实现的算法本身的弱点(例如,密码分析和软件错误)。时间信息,功耗,电磁泄漏甚至声音可以提供额外的信息来源,可以加以利用。在很多物理隔绝的环境中(黑盒),往往也能出奇制胜,这类新型攻击的有效性远高于传统的密码分析的数学方法。(注:企图通过社会工程学欺骗或强迫具有合法访问权限的人来破坏密码系统通常不被视为旁道攻击)

计时攻击是最常用的攻击方法。那么,正常的字符串比较是怎么被黑客进行时间攻击的呢?

我们知道,正常的字符串比较,一旦遇到每一个不同的字符就返回失败了,所以,理论上来说,前面只有2个字符相同字符串比较的耗时,要比前面有10个字符相同的比较要短。你会说,这能相差多少呢?可能几微秒吧。但是,我们可以放大这个事。比如,在Web应用时,记录每个请求的返回所需请求时间(一般是毫秒级),如果我们重复50次,我们可以查看平均时间或是p50的时间,以了解哪个字符返回的时间比较长,如果某个我们要尝试的字符串的时间比较长,我们就可以确定地得出这个这字符串的前面一段必然是正确的。(当然,你会说网络请求的燥音太多了,在毫秒级的请求上完全没办判断,这个需要用到统计学来降噪,后面会给出方法)

这个事情,可以用来做HMAC的攻击,所谓HMAC,你可以参看本站的《HTTP API 认证授权术》文章了解更多的细节。简单来说,HMAC,就是客户端向服务端发来一个字符串和其签名字符串(HMAC),然后,服务端的程序用一个私钥来对客户端发来的字符串进行签名得到签名字符串,然后再比较这个签名字符串(所谓签名,也就是使用MD5或SHA这样的哈希算法进行编码,是不可逆的)

写成伪代码大概是这个样子:

bool verify(message, digest) {
    my_digest = HMAC(key, message);
    return my_digest.equals(digest) ;
}

举个例子,如果有一个签名有40个长度,如:f5acdffbf0bb39b2cdf59ccc19625015b33f55fe,攻击者从0000000000000000000000000000000000000000开始穷举,下面是穷举第一个字符(从0到f因为这是HMAC算法的取值范围)的时间统计。

0 0.005450913
1 0.005829198
2 0.004905407
3 0.005286876
4 0.005597611
5 0.004814430
6 0.004969118
7 0.005335884
8 0.004433182
9 0.004440246
a 0.004860263
b 0.004561121
c 0.004463188
d 0.004406799
e 0.004978907
f 0.004887240

可以看到,第一次测试通过的计时结果(以秒为单位),而值“ f”与样品的其余部分之间没有较大的变化量,所有结果看起来都非常接近。换句话说,有很多噪声掩盖了信号。因此,有必要进行多个采样(对测试进行缩放)并使用统计工具从噪声中滤除信号。为了将信号与噪声分开,我们必须按任意常数对测试进行缩放。通过实验,作者发现500是一个很好的数字。换句话说:运行测试500次,并记录500个试验中每个试验的结果。然后,通过人的肉眼观察可以可能看到 f 的调用明显比别的要长,但是这种方法很难自动化。

所以,作者给了另一个统计算法,这个算法向服务器分别从 0 到 f 发出16个请求,并记录每个请求的响应时间,并将它们排序为1-16,其中1是最长(最慢)的请求,而16是最短(最快的请求),分别记录 0 – f 的名次,然后重复上述的过程 500 次。如下所示(仅显示25个样本,字符“ 0”首先被排名7、1、3,然后再次排名3……):

{
"0"=>[7, 1, 3, 3, 15, 5, 4, 9, 15, 10, 13, 2, 14, 9, 4, 14, 7, 9, 15, 2, 14, 9, 14, 6, 11...],
"1"=>[13, 4, 7, 11, 0, 4, 0, 2, 14, 11, 6, 7, 2, 2, 14, 11, 8, 10, 5, 13, 11, 7, 4, 9, 3...],
"2"=>[14, 5, 15, 5, 1, 0, 3, 1, 9, 12, 4, 4, 1, 1, 8, 6, 9, 4, 9, 5, 8, 3, 12, 8, 5...],
"3"=>[15, 2, 9, 7, 2, 1, 14, 11, 7, 8, 8, 1, 4, 7, 12, 15, 13, 0, 4, 1, 7, 0, 3, 0, 0...],
"4"=>[12, 10, 14, 15, 8, 9, 10, 12, 10, 4, 1, 13, 15, 15, 3, 1, 6, 8, 2, 6, 15, 4, 0, 3, 2...],
"5"=>[5, 13, 13, 12, 7, 8, 13, 14, 3, 13, 2, 12, 7, 14, 2, 10, 12, 5, 8, 0, 4, 10, 5, 10, 12...]
"6"=>[0, 15, 11, 13, 5, 15, 8, 8, 4, 7, 12, 9, 10, 11, 11, 7, 0, 6, 0, 9, 2, 6, 15, 13, 14...]
"7"=>[1, 9, 0, 10, 6, 6, 2, 4, 12, 9, 5, 10, 5, 10, 7, 2, 4, 14, 6, 7, 13, 11, 6, 12, 4...],
"8"=>[4, 0, 2, 1, 9, 11, 12, 13, 11, 14, 0, 15, 9, 0, 0, 13, 11, 13, 1, 8, 6, 5, 11, 15, 7...],
"9"=>[11, 11, 10, 4, 13, 7, 6, 3, 2, 2, 14, 5, 3, 3, 15, 9, 14, 7, 10, 3, 0, 14, 1, 5, 15...],
"a"=>[8, 3, 6, 14, 10, 2, 7, 5, 1, 3, 3, 0, 0, 6, 10, 12, 15, 12, 12, 15, 9, 13, 13, 11, 9...],
"b"=>[9, 12, 5, 8, 3, 3, 5, 15, 0, 6, 11, 11, 12, 8, 1, 3, 1, 11, 11, 14, 5, 1, 2, 1, 6...],
"c"=>[6, 7, 8, 2, 12, 10, 9, 10, 6, 1, 10, 8, 6, 4, 6, 4, 3, 2, 7, 11, 1, 8, 7, 2, 13...],
"d"=>[2, 14, 4, 0, 14, 12, 11, 0, 8, 0, 15, 3, 8, 12, 5, 0, 10, 1, 3, 4, 12, 12, 8, 14, 8...],
"e"=>[10, 8, 12, 6, 11, 13, 1, 6, 13, 5, 7, 14, 11, 5, 9, 5, 2, 15, 14, 10, 10, 2, 10, 4, 1...],
"f"=>[3, 6, 1, 9, 4, 14, 15, 7, 5, 15, 9, 6, 13, 13, 13, 8, 5, 3, 13, 12, 3, 15, 9, 7, 10...]
}

然后将每个字符的500个排名进行平均,得出以下示例输出:

"f", 5.302
"0", 7.17
"6", 7.396
"3", 7.472
"5", 7.562
"a", 7.602
"2", 7.608
"8", 7.626
"9", 7.688
"b", 7.698
"1", 7.704
"e", 7.812
"4", 7.82
"d", 7.826
"7", 7.854
"c", 7.86

于是,f 就这样脱颖而出了。然后,再对剩余的39个字符重复此算法。

这是一种统计技术,可让我们从噪声中滤出真实的信号。因此,总共需要调用:16 500 40 = 320,000个请求,而蛮力穷举需要花费16 ^ 40个请求。

另外,学术界的这篇论文就宣称用这种计时攻击的方法破解了 OpenSSL 0.9.7 的RSA加密算法了。这篇 Remote Timing Attacks are Practical (PDF)论文中指出(我大致翻译下摘要,感兴趣的同学可以通过链接去看原文):

计时攻击往往用于攻击一些性能较弱的计算设备,例如一些智能卡。我们通过实验发现,也能用于攻击普通的软件系统。本文通过实验证明,通过这种计时攻击方式能够攻破一个基于 OpenSSL 的 web 服务器的私钥。结果证明计时攻击用于进行网络攻击在实践中可行的,因此各大安全系统需要抵御这种风险。