导语

记得高中时,老师曾经出过一道题目,1000的999次方和999的1000次方谁大,凭直觉,很多人都说当然是999的1000次方更大,至于为什么,当时很少有同学知道,很多人都这样想的,底数相差1,次方大1的数更大。当年,老师带着大家做了深入的思考和分析,总结出来了规律和原因,结果证明直觉并不可靠。直至最近,我看到一个三进制计算机性能更好的视频,突然把这两个知识串联起来了,再此做一个分享

抽象&分析

当年老师首先要求我们进行题目抽象,1000^999,其实就是有999个1000相乘,这些数的和等于1000*999=999000,而999^1000,就是1000个999相乘,这些数的和也正好是999000,于是,题目就变成了把一个正整数拆成n个正整数的和,是它们的积最大?

那么如何解决抽象出来的问题呢,999000数字太大,不好拆解,那么就使用个小点的数字,比如12,12能怎么拆呢,无非下面几种情况:
12=1+1+1……+1,一共12个1相加,积为1
12=2+2+2……+2,一共6个2相加,积为64
12=3+3+3+3,一共4个3相加,积为81
12=4+4+4,一共3个4相加,积为64
12=5+5+2,积为552,积为50
……
还有其他拆解方法,但是显然,最大的就是拆解为4个3,这时我们就发现了,之前直觉并不对,为什么?因为2^6和3^4相比,次方数2^6比3^4大2,底数小1,但是最后的结果是3^4更大。不管你后面怎么拆,3^4的值最大。仔细看看拆分的方案,规律可以总结为:

  1. 拆一个正整数为n个正整数相乘,尽可能地多拆3出来,不能拆3,就拆2或者4,坚决不能拆1;
  2. 如果拆出来的数比3大的越多,乘积就越小,比如上面的4^3和5%2等比3大的拆分方案;
    知道了这个规律后,就非常容易判断1000^999和999^1000哪个数更大,显然,999比1000更接近3,999^1000更大。那么上面的规律如何证明呢,我记得老师噼里啪啦写了推导,证明过程如下:

N = x + x + …… + x,把大N拆成n个x的和,n=N/x,要使得f(x) = x^n=x^(N/x)最大。
如果要求f(x)最大,lnf(x)也满足最大,两边取对数lnf(x) = lnx^(N/x),即lnf(x) = (N/x)*lnx。
求导数[lnf(x)]' = N * (1-lnx) / x^2,显然当x > e,导数小于0,是减函数,x < e,导数大于0,是增函数,所以x = e时,lnf(x)最大,e = 2.71818,显然最接近e的正整数就是3,所以规律得证。

联想

最近又看到一个视频,讲得是三进制计算机性能最好,我突然联想到这个曾经学过的知识,后来仔细想了下,视频想表达的观点,应该是三进制比二进制或者十进制表达能力更强,相同的内存大小,如果是三进制,可以表达的数最多。小时候都玩过一个游戏,一组珠串,假设给你100颗珠子,怎样分配,可以使得表示的数字数量最多,结果就是下图:
三进制.png
每个柱子上留三颗珠子,可表示3^324个数(计算机里面就是3^324个状态),表达的数量最多。没想到,三进制表达能力最强,在我高中时,老师就把原因告诉了我,哈哈。所以学好数学,培养好逻辑思考能力,真的是终身受用,加油吧,少年。

导语

有了之前两篇关于背包问题文章的积累,《一文搞懂经典0-1背包问题》《背包问题的变形一:无限制背包(Unbounded Knapsack)》,今天讨论的背包问题另一个变形问题,限制背包(Bounded Knapsack)就容易理解得多了。限制背包的意思是每样物品不是只有一件,也不是无限数量,给定一个限制数量,求取装满背包获得的最大价值。那么这个问题利用之前积累的知识很容易解决。背包问题可以由浅入深,而且层层递进,经过这次的学习,我发现迷上了这个经典的NPC问题。

限制背包(Bounded Knapsack Problem)

最容易想到的还是把每件物品按照数量限制(naive),一件件取出,然后更新之前定义的dp数组(见《一文搞懂经典0-1背包问题》),OK,源代码修改下之前无限制背包问题的,也很简单:

func knapsackBounded1(w []int, v []int, m []int, W int) int {
    n := len(w)
    dp := make([]int, W+1)
    for i := 0; i < n; i++ {
        for j := 0; j < m[i]; j++ {
            knapsack01(dp, w[i], v[i])
        }
    }

    r := 0
    for i := 0; i <= W; i++ {
        if dp[i] > r {
            r = dp[i]
        }
    }

    return r
}

数组m给出的是物品的数量限制。算法时间复杂度O(NW*Sigma(m[i])),空间复杂度O(W)(已经通过滚动数组优化)。

如果要进行优化,自然想到了之前smarter的方式,按2的幂次方去组合出虚拟物品来,这样考虑的物品数量大大降低了,但是恶心的是,不像无限制背包,我们每件物品都可以随便取,直到超过重量上限。这里,我们需要按两种方式考虑,如果m[i]w[i] >= W,OK,那直接使用knapsackComplete更新dp数组即可,如果m[i]w[i] < W,那么我们尝试尽可能拆分虚拟物品,比如m[i] = 19,那么可以拆出的虚拟物品组合为(1, 2, 4, 8)倍原来的物品,剩下19-(1+2+4+8)=4倍物品,我们单独考虑。代码如下,稍微复杂些,仔细看,应该不难理解:

func knapsackBounded2(w []int, v []int, m []int, W int) int {
    n := len(w)
    dp := make([]int, W+1)
    for i := 0; i < n; i++ {
        if m[i] * w[i] >= W {
            knapsackComplete(dp, w[i], v[i])
            continue
        }

        left := m[i]
        for k := 0; k < math.Ilogb(float64(m[i])); k++ {
            knapsack01(dp, w[i] * 1<<k, v[i] * 1<<k)
            left -= 1<<k
        }

        for j := 0; j < left; j++ {
            knapsack01(dp, w[i], v[i])
        }
    }

    r := 0
    for i := 0; i <= W; i++ {
        if dp[i] > r {
            r = dp[i]
        }
    }

    return r
}

算法时间复杂度O(NW*Sigma(m[i])),空间复杂度O(W)(已经通过滚动数组优化)。

性能测评

参数设置,W=100000,w取值[11, 220],长度10001,v取值[7, 33],m取值[5, 100],测试性能如下:

knapsackUnbounded3=281800, time_cost=778.899197ms
knapsackBounded2=195626, time_cost=16.464975377s
knapsackBounded1=195626, time_cost=1m27.138384055s

看上去这个参数下,基本上算法耗时都很高了,相当于有10w件不同的商品,虚拟一下,knapsackBounded2的商品数量达到了200w左右,然后通过动态规划计算(重复子问题其实没那么高),如果可以大部分进入到m[i] * w[i] >= W,性能会比较好些,试了下面这组参数,W=100000,w取值[1000, 2000],长度10001,v取值[7, 33],m取值[1000, 2000],性能结果如下:

knapsackUnbounded3=3168, time_cost=769.803021ms
knapsackBounded2=3168, time_cost=1.718676383s
knapsackBounded1废了,算不出结果

到此为止,背包问题的讨论蛮多了,我感觉自己又重新上了一遍大学的课程,温故而知新,慢慢来,学习是长跑,不追求一蹴而就,追求的是细水长流。

导语

之前写过一篇《一文搞懂经典0-1背包问题》,n个物品放入包中,每件只能选择放一次,放过之后就不能再放了。当时分析了三种算法,一个是递归搜索(在n<20,W特别大时适用),一种是记忆化递归,最后是动态规划算法,记忆化递归和动态规划都依赖于子问题有重复,否则,最糟糕的情况下,每件物品都是不同的2的幂次方重量,就会退化到递归搜索。今天要再深入讨论两个变形题目,一个是无限制的背包问题,一个是有限制的背包问题,今天先讨论下无限制的背包问题。

无限制背包(Unbounded Knapsack Problem)

简单说每件物品没有数量限制,随便放入包中。关于这个算法,有三种解法。基本思路都是虚拟出所有的物品,因为有总重量限制,每件物品放入背包中的数量总是有上限的。
先看下0-1背包问题动态规划实现方法的源码:

func knapsack01D(w []int, v []int, W int) int {
    n := len(w)
    dp := make([]int, W+1)
    for i := 0; i < n; i++ {
        for j := W; j >= w[i]; j-- {
            dp[j] = max(dp[j-w[i]]+v[i], dp[j])
        }
    }

    r := 0
    for i := 0; i <= W; i++ {
        if dp[i] > r {
            r = dp[i]
        }
    }

    return r
}

这里把更新dp数组的部分,包装成一个函数knapsack01,如下:

func knapsack01(dp []int, w int, v int) {
    W := len(dp)
    for j := W-1; j >= w; j-- {
        dp[j] = max(dp[j-w]+v, dp[j])
    }
}

第一种方法,是把每件东西可以选择数量都虚拟出来(naive),比如第i件重量w[i],那么第i件东西最多有W/w[i]件,然后就变成每件虚拟的物品扔给knapsack01更新dp数组的部分。

func knapsackUnbounded1(w []int, v []int, W int) int {
    n := len(w)
    dp := make([]int, W+1)
    for i := 0; i < n; i++ {
        m := W/w[i]
        for j := 0; j < m; j++ {
            knapsack01(dp, w[i], v[i])
        }
    }

    r := 0
    for i := 0; i <= W; i++ {
        if dp[i] > r {
            r = dp[i]
        }
    }

    return r
}

算法时间复杂度为O(NW^2),空间复杂度O(W)(已经通过滚动数组优化)。

第二种方法,控制虚拟出的物品数量更少(smart),以不同的2的幂次方虚拟出物品,对于第i件物品,最多能放W/w[i]件,第i件物品的虚拟物品为,w[i]2^0、w[i]2^1、w[i]2^2……w[i]2^logW/w[i],代码如下:

func knapsackUnbounded2(w []int, v []int, W int) int {
    n := len(w)
    dp := make([]int, W+1)
    for i := 0; i < n; i++ {
        m := math.Ilogb(float64(W)/float64(w[i]))
        for k := 0; k <= m; k++ {
            knapsack01(dp, w[i] << k, v[i] << k)
        }
    }

    r := 0
    for i := 0; i <= W; i++ {
        if dp[i] > r {
            r = dp[i]
        }
    }

    return r
}

算法时间复杂度为O(NWlogW),空间复杂度O(W)(已经通过滚动数组优化)。

第三种方法,实现上是最简单的,但也是最难想到的,原先我们做dp更新时,故意将j的迭代顺序从W最大值逆向迭代到当前物品重量w,其实我们只需要改为顺向迭代即可,更新dp函数knapsack01改为knapsackComplete,代码如下:

func knapsackComplete(dp []int, w int, v int) {
    W := len(dp)
    for j := w; j < W; j++ {
        dp[j] = max(dp[j-w]+v, dp[j])
    }
}

整体的背包问题代码变为:

func knapsackUnbounded3(w []int, v []int, W int) int {
    n := len(w)
    dp := make([]int, W+1)
    for i := 0; i < n; i++ {
        knapsackComplete(dp, w[i], v[i])
    }

    r := 0
    for i := 0; i <= W; i++ {
        if dp[i] > r {
            r = dp[i]
        }
    }

    return r
}

算法时间复杂度为O(NW),空间复杂度O(W)(已经通过滚动数组优化)。

可以这么优化的原因,我通过下面这个简单例子说明:
无限制背包问题.png

红色的箭头就是表示,我同一件物品可以反复取,直到即将超过W最大重量为止,比如在i=2时,计算dp[5],从dp[3]变化过来的,dp[3]此时已经使用过w[2]了,但没关系。因为此时第2件物品可以反复使用。(自己去想想,理解这个点,是无限制背包问题里面最难的)。

性能测试

上面进行了理论时间复杂度的分析,我跑了下性能,设置W=10000,w物品数量1001,w取值[11,22],v取值[7,33],测试下来的性能符合理论分析,knapsackUnbounded3 优于 knapsackUnbounded2 优于 knapsackUnbounded1,并且knapsackUnbounded1基本上无法work了,在W=10000、w取值[11,22]时,耗时大概已经达到11s。

knapsackUnbounded3=29088, time_cost=9.429946ms
knapsackUnbounded2=29088, time_cost=151.371282ms
knapsackUnbounded1=29088, time_cost=11.718217767s

下面有时间再分析下有限制的背包问题,这个问题是背包变形问题中较为复杂的。

背景

记得在大学课程中,修过《算法和数据结构》,其中的0-1背包问题,是非常经典的算法题目,当时上课时,听老师讲完后,懂了个大概,工作若干年后,已经都忘记得差不多了,印象深刻的就是往背包里面塞东西,有个重量限制,问能取得的最大价值是多少。当时最naive的想法是,贪心不就完了,用价值/重量比最高的往里面塞就可以了,但其实这个贪心解法在0-1背包问题上是不work的。对于0-1背包问题,贪心选择之所以不能得到最优解是因为:它无法保证最终能将背包装满,部分闲置的背包空间使每公斤背包空间的价值降低了。事实上,在考虑0-1背包问题时,应比较选择该物品和不选择该物品所导致的最终方案,然后再作出最好选择。由此就导出许多互相重叠的子问题。这正是该问题可用动态规划算法求解的另一重要特征。(原因引用出处)最近,我想看看0-1背包问题,到底有什么算法求解,是最合适的,也是最合理的,所以开始翻阅了资料,自己动手实现了代码,也就有了此文。

问题

说了这么多,在这篇文章中,您将看到以下内容:

  1. 0-1背包问题的三种实现,时间复杂度和空间复杂度分析;
  2. 针对上述的代码实现,构建实际测试用例,看实际case下的benchmark数据;

讨论

0-1背包问题(NP-Complete)

问题定义,给定N个物品,w[i]表示第i件物品的重量,v[i]表示第i件物品的价值。给你一个背包,总重量限制为W。求这个背包可以装下物品的最大价值和,注意,每个物品最多用一次。
解法1. 组合搜索
使用一个长度为n的二进制string,表示x0x1……xn。其中xi要么是0要么是1,0表示该物品不放入背包,1表示放入背包。尝试所有的可能组合。时间复杂度O(2^n),空间复杂度O(n)。该实现时间复杂度较高,一般只能在n很小的时候使用,一般为n <= 20,而且v,W >> 10^6。
代码实现如下:

func knapsack01C(v []int, w []int, W int) int {
    defer time_cost(time.Now(), "knapsack01C")
    r := 0
    var help func(s int, c_w int, c_v int)
    help = func(s int, c_w int, c_v int) {
        if c_v > r {
            r = c_v
        }

        for i := s; i < len(v); i++ {
            if c_w+w[i] <= W {
                help(i+1, c_w+w[i], c_v+v[i])
            }
        }
    }

    help(0, 0, 0)

    return r
}

解法2. 记忆化递归
定义dp(i,j)为使用前i个物品,在重量为j获取到的最大价值,dp(*,0) = 0。关键是把(i,j)看作一个状态(state),这个state下最大的价值作为值记忆化存起来,再此求解到相同状态,直接返回之前求解的值即可。
举个实例:
v = {1, 2, 4, 5}
w = {1, 1, 2, 2}

企业微信20220128-112750@2x.png
解法2的时间复杂度,O(NW),空间复杂度,O(NW),如果重量w = {1, 2, 4, 8……, 2^n},那么会退化为解法1,不会有上图中重复子问题的出现,可见记忆化递归和下面要讲的动态规划,都是基于问题是否存在大量的重复子问题,如果没有,意义不大。
代码实现如下:

func knapsack01R(v []int, w []int, W int) int {
    defer time_cost(time.Now(), "knapsack01R")
    n := len(v)
    dp := make(map[[2]int]int)
    var help func(i int, l_w int) int
    help = func(i int, l_w int) int {
        if i == 0 {
            return 0
        }

        k := [2]int{i, l_w}
        t, ok := dp[k]
        if ok {
            return t
        }

        l, r := 0, 0
        if l_w >= w[i-1] {
            l = help(i-1, l_w-w[i-1]) + v[i-1]
        }
        r = help(i-1, l_w)

        ret := 0
        if l > r {
            ret = l
        } else {
            ret = r
        }

        dp[k] = ret
        return ret
    }

    r := 0
    for i := 0; i <= n; i++ {
        t := help(i, W)
        if t > r {
            r = t
        }
    }
    return r
}

解法3. 动态规划
这个方式与方法2记忆化递归方法类似。
定义dpi为使用前i个物品,在重量为j时获取到的最大价值。状态转移方程:
dpi = dpi-1 j < w[i] or max{dpi-1]+v[i], w[i] <= j <= W}
ans = max{dpN}
时间复杂度为O(NW),空间复杂度O(NW)。空间复杂度通过滚动数组,可以降维到O(W)。什么时候使用此解法?一般n > 20并且N、W在10^6~10^7数量级。
代码实现如下:

func knapsack01D(v []int, w []int, W int) int {
    defer time_cost(time.Now(), "knapsack01D")
    n := len(v)
    var dp [][]int
    for i := 0; i <= n; i++ {
        row := make([]int, W+1)
        dp = append(dp, row)
    }

    for i := 0; i <= n; i++ {
        dp[i][0] = 0
    }

    for j := 0; j <= W; j++ {
        dp[0][j] = 0
    }

    for i := 1; i <= n; i++ {
        for j := 0; j <= W; j++ {
            dp[i][j] = dp[i-1][j]
            if j >= w[i-1] && dp[i-1][j-w[i-1]]+v[i-1] > dp[i][j] {
                dp[i][j] = dp[i-1][j-w[i-1]] + v[i-1]
            }
        }
    }

    r := 0
    for i := 0; i <= W; i++ {
        if dp[n][i] > r {
            r = dp[n][i]
        }
    }

    return r
}

空间复杂度可以从O(NW)降低到O(W),使用的是动态规划最常规的方案,滚动数组,注意复值的顺序,应该从数组尾端到头部赋值,代码如下:

func knapsack01D2(v []int, w []int, W int) int {
    defer time_cost(time.Now(), "knapsack01D")
    n := len(v)
    dp := make([]int, W+1)

    for j := 0; j <= W; j++ {
        dp[j] = 0
    }

    for i := 1; i <= n; i++ {
        for j := W; j >= w[i-1]; j-- {
            if dp[j-w[i-1]]+v[i-1] > dp[j] {
                dp[j] = dp[j-w[i-1]] + v[i-1]
            }
        }
    }

    r := 0
    for j := 0; j <= W; j++ {
        if dp[j] > r {
            r = dp[j]
        }
    }

    return r
}

性能测试的benchmark
虽然上述3种解法都是可以work的,但是在性能上差别颇大,并且解法1在物品数量N<10时(W为10^5~10^6),耗时较动态规划和记忆化递归更小,但是随着N的变大,特别是N>20开始,极速衰减性能(果然和理论值匹配),具体benchmark性能图表如下:
企业微信20220128-195926@2x.png

企业微信20220128-195949@2x.png

企业微信20220128-195956@2x.png

结束

到这里,对经典0-1背包问题的讨论暂告一个段落,可以看到0-1背包作为一个NPC问题,算法时间复杂度还是比较高的,所以一般的搜索组合算法,现在在物品数量>20时,在实际工作中是不太可行的,这个递归搜索的代码实现,唯一的优势是,代码简单易懂。但到了这里,还是不够,我后面也想在深入思考下,0-1背包问题的2个变种问题,即unbounded背包和bounded背包,这两个问题会更复杂些,但基于0-1背包问题的讨论基础上,或许会更容易理解。

背景

蓄水池采样算法,用以解决在不确定数据量情况下的采样,比如数据流随机采样K个值,并且每一个值的采样概率需要相同,乍一听好神奇,怎么证明这个相同的概率是理论正确的呢?

证明过程

先附上一个自己的代码实现:

func reservoirSampling(sample int, total int) ([]int, int) {
    r1 := make([]int, total)
    r2 := 0

    for i := 0; i < total; i++ {
        r1[i] = i
    }

    for i := sample; i < total; i++  {
        tmp := rand.Intn(i)
        if tmp < sample {
            r1[i], r1[tmp] = tmp, i
        }
    }

    return r1[:sample], r2
}

这里并没有体现是一个数据流,但没关系,这个不影响采样概率的证明。证明的过程,如果是理解了问题的本质,其实比较简单,分两种情况讨论,假设从n个数据中随机采样k个数据:

  1. 来看第1~k个数据,如果要保留到最后的采样集合中,一开始1~k保留的概率为1,直到k+1个数据出现,对于1~k个数据中任何一个数据来说,被选中换出的概率是1/(k+1),反面就是被保留的概率,就是1-1/(k+1)=k/k+1,紧接着第k+2个数据来了,那么类似的分析法,被保留的概率是(k+1)/(k+2),以此类推,知道第n个数据出现,概率就是(n-1)/n,前面分析的概率全部相乘,最终被保留的概率是k/n。
  2. 来看第i个数据(i>k),第i个数据被选中的概率,显然是k/i,紧接着第i+1个数据来了,此时前面第i个数据被剔除的概率为1/(i+1),被保留的概率就是1-1/(i+1)=i/(i+1),第i+2个数据来了,类似的分析法,第i个数据被保留的概率是(i+1)/(i+2),那么到第n个数据的时候,被保留的概率是(n-1)/n,前面分析的概率全部相乘,最终被保留的概率是k/n。
    完美证明,如论是最开始的1~k个数据,还是k+1~n个数据,选中被保留到最后的概率都是k/n,是不是很神奇?简单的数学,总是令人开心,不需要关心一堆复杂公式的推导,哈哈。

和naive随机采样的对比

其实蓄水池采样算法不仅仅可以用于数据流中随机取k个数据,也可以用在确定的n个数据中取k个不重复数据,写了一个naive的随机取k个数据,还要去重(因为随机取数据过程可能会重复),那么如果采样的数据量和整体比较大,比如3000个数要取1000个出来,蓄水池算法的性能比较好,而且代码实现上更加优雅,因为navie的随机取,需要考虑去重,不仅性能差,代码可读性也较差,如下:

func naiveSampling(sample int, total int) ([]int, int) {
    r1 := make([]int, sample)
    r2 := 0
    m := make(map[int]bool)
    for i := 0; i < sample;  {
        tmp := rand.Intn(total)
        if _, ok := m[tmp]; ok {
            // 不去重
            r2++ 
            // 去重
            // continue
        }

        r1[i] = tmp
        m[tmp] = true
        i++
    }

    return r1, r2
}

func randomSequence(sample int, count int, total int, f func(int,int)([]int, int)) {
    start := time.Now()
    rand.Seed(time.Now().UnixNano())

    m := make(map[int]int)
    r := 0
    for i := 0; i < count; i++ {
        r1, r2 := f(sample, total)
        r += r2
        for _, v := range r1 {
            m[v] += 1
        }
    }
    elapsed := time.Since(start)
    fmt.Printf("elapsed=%v\nr|t=%d|%d, rr=%f, sample=%d\nm=%v\n",
        elapsed, r, sample*count, float32(r)/float32(sample*count), len(m), m)
}

func main() {
    fmt.Printf("naiveSampling, sample=500, total=3000\n")
    randomSequence(500, 10000, 3000, naiveSampling)
    fmt.Printf("reservoirSampling, sample=500, total=3000\n")
    randomSequence(500, 10000, 3000, reservoirSampling)
    
    fmt.Printf("naiveSampling, sample=1000, total=3000\n")
    randomSequence(1000, 10000, 3000, naiveSampling)
    fmt.Printf("reservoirSampling, sample=1000, total=3000\n")
    randomSequence(1000, 10000, 3000, reservoirSampling)
}

可以看到naiveSampling先不做去重,只统计重复出现采样的数字出现次数,方便计算rr(redundant rate,数据重复率,也就是重复出现的数据),那么通过两组参数可以看到,在3000个数据里面sample 500个数据,重复率达到7.9%,而3000个数字里面sample 1000个数据,重复率骤增到15%(基本成线性增长),具体结果如下:

naiveSampling, sample=500, total=3000
elapsed=759.6145ms
r|t=394722|5000000, rr=0.078944, sample=3000
m=map[0:1730 1:1697 2:1624 3:1658 4:1664 5:1673 6:1650 7:1672 8:1679 9:1678 10:1668 11:1652 12:1630 13:1661 14:1682 ……]
reservoirSampling, sample=500, total=3000
elapsed=835.8041ms
r|t=0|5000000, rr=0.000000, sample=3000
m=map[0:1585 1:1569 2:1618 3:1682 4:1631 5:1549 6:1646 7:1721 8:1709 9:1610 10:1699 11:1657 12:1645 13:1692 14:1684 ……]
naiveSampling, sample=1000, total=3000
elapsed=1.4001243s
r|t=1494500|10000000, rr=0.149450, sample=3000
m=map[0:3286 1:3222 2:3372 3:3338 4:3302 5:3365 6:3303 7:3403 8:3419 9:3299 10:3315 11:3254 12:3454 13:3362 14:3317 ……]
reservoirSampling, sample=1000, total=3000
elapsed=894.9584ms
r|t=0|10000000, rr=0.000000, sample=3000
m=map[0:3249 1:3393 2:3280 3:3322 4:3276 5:3417 6:3217 7:3345 8:3305 9:3444 10:3299 11:3365 12:3411 13:3276 14:3339 ……]

当然,此时蓄水池算法重复率肯定为0,但是性能会比不去重的naive算法差(去掉map计算去重部分,相对来说,naive性能更好),但是一旦开启了去重,当采样的数据个数达到一定比例,比如总数3000,采样1000个数据时,蓄水池采样算法也不会比naive的更差,有兴趣的可以自己修改下代码,运行得到时间统计。

总结

蓄水池采样算法妙在可以保证每个采样数据的被最终选中的等概率性,而且数学证明上很简洁,通常在数据量位置情况下使用,扩展想,也可以使用在数据量确定情况下,选择k个不重复数据,再扩展些看,如果总数特别大,蓄水池采样算法也可以实现分布式版本,解决单机性能的瓶颈(算力或者存储资源),在此不在赘述,也很简单。过程大致可以为:

  1. 假设有K台机器,将大数据集分成K个数据流,每台机器使用单机版蓄水池抽样处理一个数据流,抽样m个数据,并最后记录处理的数据量为N1, N2, ..., NK(假设m<Nk)。N1+N2+...+NK=N。
  2. 取[1, N]一个随机数d,若d<N1,则在第一台机器的蓄水池中等概率不放回地(1/m)选取一个数据;若N1<=d<(N1+N2),则在第二台机器的蓄水池中等概率不放回地选取一个数据;以此类推,重复m次,则最终从N大数据集中选出m个数据。

参考资料

蓄水池抽样算法(Reservoir Sampling)
蓄水池采样算法