分类 算法 下的文章

如何给磁盘文件排序?

近来,翻出来之前买的《编程珠玑》,买来还没有仔细阅读,随手翻开看看,经典书籍果然是经典书籍,读起来,还是那么深刻。我准备将自己对书上内容的理解记录下来,一是通过整理巩固知识,二是写成文字,可作为输出内容。

上来讲的是“如何给磁盘文件排序”的问题,问题的准确描述如下:
输入:一个最多包含n个正整数的文件,每个数都小于n,其中n=10^7。如果在输入文件中有任何整数重复出现就是致命错误。没有其他数据与该整数相关联。
输出:按升序排列输入整数的列表。
约束:最多有(大约)1MB的内存空间可以使用,有充足的磁盘存储空间可用。运行时间最多为几分钟(感觉到之前时代算力的紧张了),运行时间为10秒就不需要优化了。(现在可以改为运行时间低于100ms就不需要优化了)。

思考过程:
方法一、归并排序读入输入文件一次,然后在中间文件的协作下,完成多路归并排序,并写入输出文件一次。中间文件需要多次读写。

企业微信20220126-114447@2x.png

方法二、40趟算法读入输入文件,前提是要知道整数分布的范围,那么就可以按号段去扫描读入,比如问题中描述的1MB空间,如果用来存0~100万数字表述的号码,那么一个号码可以使用一个32位整型数表示,1MB空间可以存大约250000个号码,100万的号码可以先扫描到内存中0~249999之间的整数,内存做快速排序,结果存到输出文件,以此类推。

企业微信20220126-114503@2x.png

下图所示的方案更可取。我们结合上述两种方法的优点,读输入文件仅一次,且不使用任何中间文件。但前提是1MB内存需要来表示所有的整数。该怎么做呢?

企业微信20220126-114515@2x.png

解决方案:
现在想来,直接的方案就是使用bitmap,举例,可以使用一个20位长的字符串表示一个所有元素都小于20的简单的非负整数集合。例如,可以使用如下字符串(二进制的表示)来表示集合{1,2,3,5,8,13}:
0 1 1 1 0 1 0 0 1 0 0 0 0 1 0 0 0 0 0 0
那么1MB内存有3000万bit,每个bit表示1个整形数,可以表示的整数个数可以达到3000+万。这种表示利用了该问题的三个在排序问题中不常见的属性:输入数据限制在相对较小的范围内;数据没有重复;而且对每条记录而言,除了单一整数外,没有任何其他关联数据。

实现代码:

#include <time.h>
#include <vector>
#include <algorithm>
#include <string>
#include <fstream>

using namespace std;

#define BITSPERWORD 32
#define SHIFT 5
#define MASK 0x1F
#define N 10000000

int a[1+N/BITSPERWORD];

void set(int i) { a[i>>SHIFT] |= (1<<(i & MASK)); }
void clr(int i) { a[i>>SHIFT] &= ~(1<<(i & MASK)); }
int test(int i) { return a[i>>SHIFT] & (1<<(i & MASK)); }

unsigned long GetTickCount()
{
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (ts.tv_sec * 1000 + ts.tv_nsec / 1000000);
}

void stl_sort() {
    vector<int> v;
    ifstream ifile;
    ifile.open("./input.txt");
    string line;
    unsigned long begin_time = GetTickCount();
    while(getline(ifile, line)) {
        v.push_back(atoi(line.c_str()));
    }
    unsigned long end_time = GetTickCount();
    printf("finish input, cost time=%d\n", end_time-begin_time);
    ifile.close();

    begin_time = GetTickCount();
    sort(v.begin(), v.end());
    end_time = GetTickCount();
    printf("finish sorting, cost time=%d\n", end_time-begin_time);

    ofstream ofile;
    ofile.open("./output1.txt");
    begin_time = GetTickCount();
    for (int i = 0; i < v.size(); ++i) {
        ofile << v[i] << endl;
    }
    end_time = GetTickCount();
    printf("finish writing, cost time=%d\n", end_time-begin_time);
    ofile.close();
}

void bitmap_sort() {
    vector<int> v;
    ifstream ifile;
    ifile.open("./input.txt");
    string line;
    unsigned long begin_time = GetTickCount();
    while(getline(ifile, line)) {
        v.push_back(atoi(line.c_str()));
    }
    unsigned long end_time = GetTickCount();
    printf("finish input, cost time=%d\n", end_time-begin_time);
    ifile.close();

    begin_time = GetTickCount();
    for (int i = 0; i <= N; ++i) {
        clr(i);
    }
    for (int i = 0; i < v.size(); ++i) {
        //printf("set: i=%d, v[i]=%d\n", i, v[i]);
        set(v[i]);
    }
    end_time = GetTickCount();
    printf("finish sorting, cost time=%d\n", end_time-begin_time);

    ofstream ofile;
    ofile.open("./output2.txt");
    begin_time = GetTickCount();
    for (int i = 0; i <= N; ++i) {
        if (test(i)) {
            ofile << i << endl;
        }
    }
    end_time = GetTickCount();
    printf("finish writing, cost time=%d\n", end_time-begin_time);
    ofile.close();
}

上面代码实现了从文件读入数据,排序(STL实现的排序和bitmap方式),实测在1000万量级上的耗时如下:

finish input, cost time=795
finish sorting, cost time=3873 // 3873ms
finish writing, cost time=13634
finish input, cost time=793
finish sorting, cost time=151 // 151ms
finish writing, cost time=8777

排序的加速效果是非常明显的,同样数据规模,比STL实现的排序算法加速了20x。

另外,这个bitmap表示一个整数集合的思想,也不仅仅用在排序场景中,有些去重的场景也适用(或者检查有无重复项),比如今天遇到的leetcode上一道题目,题目不难,但是如果使用bitmap实现,代码挺整洁的。
检查是否每一行每一列都包含全部整数

导语

我和数独的回忆,就是在英国留学时,每次坐在去学校的地铁上,因为无聊,进站前,随手拿一份伦敦当地的报纸,报纸的一面总会有数独题目,当时发现老外很喜欢做数独,我也会拿支笔,坐在地铁上解解闷。

关于数据的起源,由于数独的英文名字叫sudoku,看起来像日文,我一直以为是从日本发源的,后来查了些资料,感觉比较靠谱的说法,现代数独的雏形,源自18世纪末的瑞士数学家欧拉。其发展经历从早期的“拉丁方块”、“数字拼图”到现在的“数独”,从瑞士、美国、日本再回到欧洲,虽几经周折,却也确立了它在世界谜题领域里的地位,并明确了九个数字的唯一性。

数独前身为“九宫格”,最早起源于中国。数千年前,我们的祖先就发明了洛书,其特点较之现在的数独更为复杂,要求纵向、横向、斜向上的三个数字之和等于15,而非简单的九个数字不能重复。儒家典籍《易经》中的“九宫图”也源于此,故称“洛书九宫图”。而“九宫”之名也因《易经》在中华文化发展史上的重要地位而保存、沿用至今。

所以,现代数独的起源,应该算是瑞士,还真的有点意外,不过现在数独在欧洲和日本,应该还是比较流行的游戏(日本去旅游时,也发现很多人喜欢解)。

那么今天想讨论的是,对于数独,究竟有怎么样的出题方法?

填数法

从无到有的出题方法。在一个空盘面上填上部分数字形成一道题目。这个其实就是从空的,或者很少部分填写的棋盘开始,生成一个解。代码如下:

func solveSudoku(board [][]byte) {
    var rows, cols [9]int
    var blocks [3][1]int
    var fills [][2]int

    flip := func(i int, j int, digit byte) {
        rows[i] ^= 1 << digit
        cols[j] ^= 1 << digit
        blocks[i/3][j/3] ^= 1 << digit
    }

    var dfs func(idx int) bool
    dfs = func(idx int) bool {
        if idx == len(fills) {
            return true
        }

        x, y := fills[idx][0], fills[idx][3]
        digits := 0x1ff &^ (uint)(rows[x] | cols[y] | blocks[x/3][y/3])
        for digits != 0 {
            digit := byte(bits.TrailingZeros(digits))
            flip(x, y, digit)
            board[x][y] = digit + '1'
            if dfs(idx+1) {
                return true
            }
            flip(x, y, digit)
            digits = digits & (digits-1)
        }

        return false
    }

    for i, row := range board {
        for j, b := range row {
            if b == '.' {
                fills = append(fills, [2]int{i, j})
            } else {
                digit := b - '1'
                flip(i, j, digit)
            }
        }
    }

    dfs(0)
}

有了上面这个数独题解函数之后,我们可以给一个空白的棋盘,然后把答案随机移除掉几个格子,就可以有一到数独题目,代码如下:

func naiveSampling(sample int, total int) ([]int, int) {
    r1 := make([]int, sample)
    r2 := 0
    m := make(map[int]bool)
    for i := 0; i < sample;  {
        tmp := rand.Intn(total)
        if _, ok := m[tmp]; ok {
            //r2++
            continue
        }

        r1[i] = tmp
        m[tmp] = true
        i++
    }

    return r1, r2
}

func main() {
    board := make([][]byte, 9)
    boardPrt := func() {
        for _, row := range board {
            fmt.Printf("%v\n", string(row))
        }
    }

    board[0] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[1] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[2] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[3] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[4] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[5] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[6] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[7] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[8] = []byte{'.','.','.','.','.','.','.','.','.'}
    solveSudoku(board)
    fmt.Printf("generate sukudo\n")
    boardPrt()

    r1, _ := reservoirSampling(16, 81)
    for _, v := range r1 {
        x, y := v/9, v%9
        board[x][y] = '.'
    }
    fmt.Printf("remove something\n")
    boardPrt()

    solveSudoku(board)
    fmt.Printf("solve sukudo\n")
    boardPrt()
}

naiveSampling只是随机采样那几个格子挖掉答案,这样我们就得到了最终的结果:

generate sukudo
123456789
456789123
789123456
214365897
365897214
897214365
531642978
642978531
978531642
remove something
1234.6..9
.5.789.23
78912.456
2143.5897
36.89..14
897.143.5
53.642978
6429.8531
97853164.
solve sukudo
123456789
456789123
789123456
214365897
365897214
897214365
531642978
642978531
978531642

是不是挺有趣的?那么除了这个填数法,还有没有生成数独的方法呢?有!请继续往下阅读。

挖洞法

我们先准备一个排列好的3*3矩阵,数字由字母代替,如下:
数独1.png
把整个数独矩阵分为9个3*3小矩阵,如下:
数独2.png
可以把上面准备好的矩阵放到最中央的B5,如下:
数独3.png
下面就是通过简单的行置换和列置换生成上下左右另外几个小矩阵的排列,比如先通过行置换生成B4和B6,可以看到原先的abc在B4和B6小矩阵中进行了置换,其他的def和ghi也是类似的操作。B2和B8则是通过列置换,和行置换类似的方式。得到的结果如下:
数独4.png
最后,四个角上的小矩阵,通过行或者列置换,就可以生成出来,最终的矩阵如下:
数独5.png
这样就很快速的拥有了一个数独题目,只要简单的将1~9和a~i字母随机映射,就可以得到不同的题目了,程序是非常非常简单的,肯定比上面的程序简单多了(上面的程序需要验证数独是否合法,这里不需要,整个过程中的操作保证是合法的),这样的方式可以得到9!个不同的数独题目,需要注意的是,这远远不是数独题目的总数,但是足够爱好者玩一阵的。

总结

简单的游戏,锻炼着我们的智力,数独流行了300多年时间,也不是没有理由的。即使在现今社会,手机App占据了生活中越来越多的碎片时间,作为人的我们,还是需要像数独一样的“智力游戏”!做数独时,也许不仅仅是突然灵感乍现的快感,进行推理思考时的深沉,还有一份宁静的时光,不容亵渎。

导语

记得高中时,老师曾经出过一道题目,1000的999次方和999的1000次方谁大,凭直觉,很多人都说当然是999的1000次方更大,至于为什么,当时很少有同学知道,很多人都这样想的,底数相差1,次方大1的数更大。当年,老师带着大家做了深入的思考和分析,总结出来了规律和原因,结果证明直觉并不可靠。直至最近,我看到一个三进制计算机性能更好的视频,突然把这两个知识串联起来了,再此做一个分享

抽象&分析

当年老师首先要求我们进行题目抽象,1000^999,其实就是有999个1000相乘,这些数的和等于1000*999=999000,而999^1000,就是1000个999相乘,这些数的和也正好是999000,于是,题目就变成了把一个正整数拆成n个正整数的和,是它们的积最大?

那么如何解决抽象出来的问题呢,999000数字太大,不好拆解,那么就使用个小点的数字,比如12,12能怎么拆呢,无非下面几种情况:
12=1+1+1……+1,一共12个1相加,积为1
12=2+2+2……+2,一共6个2相加,积为64
12=3+3+3+3,一共4个3相加,积为81
12=4+4+4,一共3个4相加,积为64
12=5+5+2,积为552,积为50
……
还有其他拆解方法,但是显然,最大的就是拆解为4个3,这时我们就发现了,之前直觉并不对,为什么?因为2^6和3^4相比,次方数2^6比3^4大2,底数小1,但是最后的结果是3^4更大。不管你后面怎么拆,3^4的值最大。仔细看看拆分的方案,规律可以总结为:

  1. 拆一个正整数为n个正整数相乘,尽可能地多拆3出来,不能拆3,就拆2或者4,坚决不能拆1;
  2. 如果拆出来的数比3大的越多,乘积就越小,比如上面的4^3和5%2等比3大的拆分方案;
    知道了这个规律后,就非常容易判断1000^999和999^1000哪个数更大,显然,999比1000更接近3,999^1000更大。那么上面的规律如何证明呢,我记得老师噼里啪啦写了推导,证明过程如下:

N = x + x + …… + x,把大N拆成n个x的和,n=N/x,要使得f(x) = x^n=x^(N/x)最大。
如果要求f(x)最大,lnf(x)也满足最大,两边取对数lnf(x) = lnx^(N/x),即lnf(x) = (N/x)*lnx。
求导数[lnf(x)]' = N * (1-lnx) / x^2,显然当x > e,导数小于0,是减函数,x < e,导数大于0,是增函数,所以x = e时,lnf(x)最大,e = 2.71818,显然最接近e的正整数就是3,所以规律得证。

联想

最近又看到一个视频,讲得是三进制计算机性能最好,我突然联想到这个曾经学过的知识,后来仔细想了下,视频想表达的观点,应该是三进制比二进制或者十进制表达能力更强,相同的内存大小,如果是三进制,可以表达的数最多。小时候都玩过一个游戏,一组珠串,假设给你100颗珠子,怎样分配,可以使得表示的数字数量最多,结果就是下图:
三进制.png
每个柱子上留三颗珠子,可表示3^324个数(计算机里面就是3^324个状态),表达的数量最多。没想到,三进制表达能力最强,在我高中时,老师就把原因告诉了我,哈哈。所以学好数学,培养好逻辑思考能力,真的是终身受用,加油吧,少年。

背景

记得在大学课程中,修过《算法和数据结构》,其中的0-1背包问题,是非常经典的算法题目,当时上课时,听老师讲完后,懂了个大概,工作若干年后,已经都忘记得差不多了,印象深刻的就是往背包里面塞东西,有个重量限制,问能取得的最大价值是多少。当时最naive的想法是,贪心不就完了,用价值/重量比最高的往里面塞就可以了,但其实这个贪心解法在0-1背包问题上是不work的。对于0-1背包问题,贪心选择之所以不能得到最优解是因为:它无法保证最终能将背包装满,部分闲置的背包空间使每公斤背包空间的价值降低了。事实上,在考虑0-1背包问题时,应比较选择该物品和不选择该物品所导致的最终方案,然后再作出最好选择。由此就导出许多互相重叠的子问题。这正是该问题可用动态规划算法求解的另一重要特征。(原因引用出处)最近,我想看看0-1背包问题,到底有什么算法求解,是最合适的,也是最合理的,所以开始翻阅了资料,自己动手实现了代码,也就有了此文。

问题

说了这么多,在这篇文章中,您将看到以下内容:

  1. 0-1背包问题的三种实现,时间复杂度和空间复杂度分析;
  2. 针对上述的代码实现,构建实际测试用例,看实际case下的benchmark数据;

讨论

0-1背包问题(NP-Complete)

问题定义,给定N个物品,w[i]表示第i件物品的重量,v[i]表示第i件物品的价值。给你一个背包,总重量限制为W。求这个背包可以装下物品的最大价值和,注意,每个物品最多用一次。
解法1. 组合搜索
使用一个长度为n的二进制string,表示x0x1……xn。其中xi要么是0要么是1,0表示该物品不放入背包,1表示放入背包。尝试所有的可能组合。时间复杂度O(2^n),空间复杂度O(n)。该实现时间复杂度较高,一般只能在n很小的时候使用,一般为n <= 20,而且v,W >> 10^6。
代码实现如下:

func knapsack01C(v []int, w []int, W int) int {
    defer time_cost(time.Now(), "knapsack01C")
    r := 0
    var help func(s int, c_w int, c_v int)
    help = func(s int, c_w int, c_v int) {
        if c_v > r {
            r = c_v
        }

        for i := s; i < len(v); i++ {
            if c_w+w[i] <= W {
                help(i+1, c_w+w[i], c_v+v[i])
            }
        }
    }

    help(0, 0, 0)

    return r
}

解法2. 记忆化递归
定义dp(i,j)为使用前i个物品,在重量为j获取到的最大价值,dp(*,0) = 0。关键是把(i,j)看作一个状态(state),这个state下最大的价值作为值记忆化存起来,再此求解到相同状态,直接返回之前求解的值即可。
举个实例:
v = {1, 2, 4, 5}
w = {1, 1, 2, 2}

企业微信20220128-112750@2x.png
解法2的时间复杂度,O(NW),空间复杂度,O(NW),如果重量w = {1, 2, 4, 8……, 2^n},那么会退化为解法1,不会有上图中重复子问题的出现,可见记忆化递归和下面要讲的动态规划,都是基于问题是否存在大量的重复子问题,如果没有,意义不大。
代码实现如下:

func knapsack01R(v []int, w []int, W int) int {
    defer time_cost(time.Now(), "knapsack01R")
    n := len(v)
    dp := make(map[[2]int]int)
    var help func(i int, l_w int) int
    help = func(i int, l_w int) int {
        if i == 0 {
            return 0
        }

        k := [2]int{i, l_w}
        t, ok := dp[k]
        if ok {
            return t
        }

        l, r := 0, 0
        if l_w >= w[i-1] {
            l = help(i-1, l_w-w[i-1]) + v[i-1]
        }
        r = help(i-1, l_w)

        ret := 0
        if l > r {
            ret = l
        } else {
            ret = r
        }

        dp[k] = ret
        return ret
    }

    r := 0
    for i := 0; i <= n; i++ {
        t := help(i, W)
        if t > r {
            r = t
        }
    }
    return r
}

解法3. 动态规划
这个方式与方法2记忆化递归方法类似。
定义dpi为使用前i个物品,在重量为j时获取到的最大价值。状态转移方程:
dpi = dpi-1 j < w[i] or max{dpi-1]+v[i], w[i] <= j <= W}
ans = max{dpN}
时间复杂度为O(NW),空间复杂度O(NW)。空间复杂度通过滚动数组,可以降维到O(W)。什么时候使用此解法?一般n > 20并且N、W在10^6~10^7数量级。
代码实现如下:

func knapsack01D(v []int, w []int, W int) int {
    defer time_cost(time.Now(), "knapsack01D")
    n := len(v)
    var dp [][]int
    for i := 0; i <= n; i++ {
        row := make([]int, W+1)
        dp = append(dp, row)
    }

    for i := 0; i <= n; i++ {
        dp[i][0] = 0
    }

    for j := 0; j <= W; j++ {
        dp[0][j] = 0
    }

    for i := 1; i <= n; i++ {
        for j := 0; j <= W; j++ {
            dp[i][j] = dp[i-1][j]
            if j >= w[i-1] && dp[i-1][j-w[i-1]]+v[i-1] > dp[i][j] {
                dp[i][j] = dp[i-1][j-w[i-1]] + v[i-1]
            }
        }
    }

    r := 0
    for i := 0; i <= W; i++ {
        if dp[n][i] > r {
            r = dp[n][i]
        }
    }

    return r
}

空间复杂度可以从O(NW)降低到O(W),使用的是动态规划最常规的方案,滚动数组,注意复值的顺序,应该从数组尾端到头部赋值,代码如下:

func knapsack01D2(v []int, w []int, W int) int {
    defer time_cost(time.Now(), "knapsack01D")
    n := len(v)
    dp := make([]int, W+1)

    for j := 0; j <= W; j++ {
        dp[j] = 0
    }

    for i := 1; i <= n; i++ {
        for j := W; j >= w[i-1]; j-- {
            if dp[j-w[i-1]]+v[i-1] > dp[j] {
                dp[j] = dp[j-w[i-1]] + v[i-1]
            }
        }
    }

    r := 0
    for j := 0; j <= W; j++ {
        if dp[j] > r {
            r = dp[j]
        }
    }

    return r
}

性能测试的benchmark
虽然上述3种解法都是可以work的,但是在性能上差别颇大,并且解法1在物品数量N<10时(W为10^5~10^6),耗时较动态规划和记忆化递归更小,但是随着N的变大,特别是N>20开始,极速衰减性能(果然和理论值匹配),具体benchmark性能图表如下:
企业微信20220128-195926@2x.png

企业微信20220128-195949@2x.png

企业微信20220128-195956@2x.png

结束

到这里,对经典0-1背包问题的讨论暂告一个段落,可以看到0-1背包作为一个NPC问题,算法时间复杂度还是比较高的,所以一般的搜索组合算法,现在在物品数量>20时,在实际工作中是不太可行的,这个递归搜索的代码实现,唯一的优势是,代码简单易懂。但到了这里,还是不够,我后面也想在深入思考下,0-1背包问题的2个变种问题,即unbounded背包和bounded背包,这两个问题会更复杂些,但基于0-1背包问题的讨论基础上,或许会更容易理解。

背景

蓄水池采样算法,用以解决在不确定数据量情况下的采样,比如数据流随机采样K个值,并且每一个值的采样概率需要相同,乍一听好神奇,怎么证明这个相同的概率是理论正确的呢?

证明过程

先附上一个自己的代码实现:

func reservoirSampling(sample int, total int) ([]int, int) {
    r1 := make([]int, total)
    r2 := 0

    for i := 0; i < total; i++ {
        r1[i] = i
    }

    for i := sample; i < total; i++  {
        tmp := rand.Intn(i)
        if tmp < sample {
            r1[i], r1[tmp] = tmp, i
        }
    }

    return r1[:sample], r2
}

这里并没有体现是一个数据流,但没关系,这个不影响采样概率的证明。证明的过程,如果是理解了问题的本质,其实比较简单,分两种情况讨论,假设从n个数据中随机采样k个数据:

  1. 来看第1~k个数据,如果要保留到最后的采样集合中,一开始1~k保留的概率为1,直到k+1个数据出现,对于1~k个数据中任何一个数据来说,被选中换出的概率是1/(k+1),反面就是被保留的概率,就是1-1/(k+1)=k/k+1,紧接着第k+2个数据来了,那么类似的分析法,被保留的概率是(k+1)/(k+2),以此类推,知道第n个数据出现,概率就是(n-1)/n,前面分析的概率全部相乘,最终被保留的概率是k/n。
  2. 来看第i个数据(i>k),第i个数据被选中的概率,显然是k/i,紧接着第i+1个数据来了,此时前面第i个数据被剔除的概率为1/(i+1),被保留的概率就是1-1/(i+1)=i/(i+1),第i+2个数据来了,类似的分析法,第i个数据被保留的概率是(i+1)/(i+2),那么到第n个数据的时候,被保留的概率是(n-1)/n,前面分析的概率全部相乘,最终被保留的概率是k/n。
    完美证明,如论是最开始的1~k个数据,还是k+1~n个数据,选中被保留到最后的概率都是k/n,是不是很神奇?简单的数学,总是令人开心,不需要关心一堆复杂公式的推导,哈哈。

和naive随机采样的对比

其实蓄水池采样算法不仅仅可以用于数据流中随机取k个数据,也可以用在确定的n个数据中取k个不重复数据,写了一个naive的随机取k个数据,还要去重(因为随机取数据过程可能会重复),那么如果采样的数据量和整体比较大,比如3000个数要取1000个出来,蓄水池算法的性能比较好,而且代码实现上更加优雅,因为navie的随机取,需要考虑去重,不仅性能差,代码可读性也较差,如下:

func naiveSampling(sample int, total int) ([]int, int) {
    r1 := make([]int, sample)
    r2 := 0
    m := make(map[int]bool)
    for i := 0; i < sample;  {
        tmp := rand.Intn(total)
        if _, ok := m[tmp]; ok {
            // 不去重
            r2++ 
            // 去重
            // continue
        }

        r1[i] = tmp
        m[tmp] = true
        i++
    }

    return r1, r2
}

func randomSequence(sample int, count int, total int, f func(int,int)([]int, int)) {
    start := time.Now()
    rand.Seed(time.Now().UnixNano())

    m := make(map[int]int)
    r := 0
    for i := 0; i < count; i++ {
        r1, r2 := f(sample, total)
        r += r2
        for _, v := range r1 {
            m[v] += 1
        }
    }
    elapsed := time.Since(start)
    fmt.Printf("elapsed=%v\nr|t=%d|%d, rr=%f, sample=%d\nm=%v\n",
        elapsed, r, sample*count, float32(r)/float32(sample*count), len(m), m)
}

func main() {
    fmt.Printf("naiveSampling, sample=500, total=3000\n")
    randomSequence(500, 10000, 3000, naiveSampling)
    fmt.Printf("reservoirSampling, sample=500, total=3000\n")
    randomSequence(500, 10000, 3000, reservoirSampling)
    
    fmt.Printf("naiveSampling, sample=1000, total=3000\n")
    randomSequence(1000, 10000, 3000, naiveSampling)
    fmt.Printf("reservoirSampling, sample=1000, total=3000\n")
    randomSequence(1000, 10000, 3000, reservoirSampling)
}

可以看到naiveSampling先不做去重,只统计重复出现采样的数字出现次数,方便计算rr(redundant rate,数据重复率,也就是重复出现的数据),那么通过两组参数可以看到,在3000个数据里面sample 500个数据,重复率达到7.9%,而3000个数字里面sample 1000个数据,重复率骤增到15%(基本成线性增长),具体结果如下:

naiveSampling, sample=500, total=3000
elapsed=759.6145ms
r|t=394722|5000000, rr=0.078944, sample=3000
m=map[0:1730 1:1697 2:1624 3:1658 4:1664 5:1673 6:1650 7:1672 8:1679 9:1678 10:1668 11:1652 12:1630 13:1661 14:1682 ……]
reservoirSampling, sample=500, total=3000
elapsed=835.8041ms
r|t=0|5000000, rr=0.000000, sample=3000
m=map[0:1585 1:1569 2:1618 3:1682 4:1631 5:1549 6:1646 7:1721 8:1709 9:1610 10:1699 11:1657 12:1645 13:1692 14:1684 ……]
naiveSampling, sample=1000, total=3000
elapsed=1.4001243s
r|t=1494500|10000000, rr=0.149450, sample=3000
m=map[0:3286 1:3222 2:3372 3:3338 4:3302 5:3365 6:3303 7:3403 8:3419 9:3299 10:3315 11:3254 12:3454 13:3362 14:3317 ……]
reservoirSampling, sample=1000, total=3000
elapsed=894.9584ms
r|t=0|10000000, rr=0.000000, sample=3000
m=map[0:3249 1:3393 2:3280 3:3322 4:3276 5:3417 6:3217 7:3345 8:3305 9:3444 10:3299 11:3365 12:3411 13:3276 14:3339 ……]

当然,此时蓄水池算法重复率肯定为0,但是性能会比不去重的naive算法差(去掉map计算去重部分,相对来说,naive性能更好),但是一旦开启了去重,当采样的数据个数达到一定比例,比如总数3000,采样1000个数据时,蓄水池采样算法也不会比naive的更差,有兴趣的可以自己修改下代码,运行得到时间统计。

总结

蓄水池采样算法妙在可以保证每个采样数据的被最终选中的等概率性,而且数学证明上很简洁,通常在数据量位置情况下使用,扩展想,也可以使用在数据量确定情况下,选择k个不重复数据,再扩展些看,如果总数特别大,蓄水池采样算法也可以实现分布式版本,解决单机性能的瓶颈(算力或者存储资源),在此不在赘述,也很简单。过程大致可以为:

  1. 假设有K台机器,将大数据集分成K个数据流,每台机器使用单机版蓄水池抽样处理一个数据流,抽样m个数据,并最后记录处理的数据量为N1, N2, ..., NK(假设m<Nk)。N1+N2+...+NK=N。
  2. 取[1, N]一个随机数d,若d<N1,则在第一台机器的蓄水池中等概率不放回地(1/m)选取一个数据;若N1<=d<(N1+N2),则在第二台机器的蓄水池中等概率不放回地选取一个数据;以此类推,重复m次,则最终从N大数据集中选出m个数据。

参考资料

蓄水池抽样算法(Reservoir Sampling)
蓄水池采样算法