标签 算法 下的文章

导语

作为80后,小时候一定玩过一款经典游戏,俄罗斯方块,英文名称Tetris。玩的画面在脑海中已是“上古”时期的事情,但也并不陌生,因为刷视频时,偶尔还能看到顶尖世界高手对决,他们玩的俄罗斯方块难度极大,游戏模式是对决,比谁的分数高,有些比赛到最后不仅自己消砖块得分,而且还可以通过得分不停给对手制造麻烦。
俄罗斯方块世界比赛.png

看到鹅厂组织的一次极客挑战赛,主题就是俄罗斯方块游戏,当然游戏名称叫“鹅”罗斯方块,蛮鹅厂风格的,可可爱爱。

鹅罗斯方块挑战赛.png
我比较感兴趣,当时也报名参加了,最后是在2000+入场选手排名在9位,比赛结束后,我又花了点时间调试了自己的算法参数,最终比赛分数可以刷进TOP3。谨以此文,记录自己的所思所想。

如何活着

第一步想到的是,如何“活着”,就和当前经济形势下,各行各业都得考虑先“活着”,能不能得到高分其次,先得“不死”。
俄罗斯方块游戏.png

我自己写了一个俄罗斯方块的游戏界面,和传统俄罗斯方块游戏一样,如何拼搭这些小图形,让高度不超过游戏规定的高度,满行就会消除,消除的行数有4种情况1行、2行、3行和4行。针对如何活着的俄罗斯方块AI算法比较多了,全网比较有名的是Pierre Dellacherie算法。我这里搜索到一篇El-Tetris – An Improvement on Pierre Dellacherie’s Algorithm,大家可以参考下。

首先,分析下俄罗斯方块中方块的种类,一共有7种图形:
俄罗斯方块形状.png

上面图中有7种不同的形状,我在代码里面的表示为坐标法,以(0, 0)为原点,标示周围那几个点对应图形需要占用。

BrickShapes = [
    # I
    [[[-2, 0], [-1, 0], [0, 0], [1, 0]],
     [[0, -1], [0, 0], [0, 1], [0, 2]]],

    # L
    [[[-2, 0], [-1, 0], [0, 0], [0, 1]],
     [[0, 0], [0, 1], [0, 2], [1, 0]],
     [[0, -1], [0, 0], [1, 0], [2, 0]],
     [[-1, 0], [0, -2], [0, -1], [0, 0]]],

    # J
    [[[-2, 0], [-1, 0], [0, -1], [0, 0]],
     [[-1, 0], [0, 0], [0, 1], [0, 2]],
     [[0, 0], [0, 1], [1, 0], [2, 0]],
     [[0, -2], [0, -1], [0, 0], [1, 0]]],

    # T
    [[[0, -1], [0, 0], [0, 1], [1, 0]],
     [[-1, 0], [0, -1], [0, 0], [1, 0]],
     [[-1, 0], [0, -1], [0, 0], [0, 1]],
     [[-1, 0], [0, 0], [0, 1], [1, 0]]],

    # O
    [[[-1, 0], [-1, 1], [0, 0], [0, 1]]],

    # S
    [[[-1, 0], [-1, 1], [0, -1], [0, 0]],
     [[-1, -1], [0, -1], [0, 0], [1, 0]]],

    # Z
    [[[-1, -1], [-1, 0], [0, 0], [0, 1]],
     [[-1, 0], [0, -1], [0, 0], [1, -1]]]
]

比如竖棍“I”的图形,有两种rotation,所以有两组坐标的标示方式,其他图形类似,只不过rotation数量不同:

    # I, two rotations
    [[[-2, 0], [-1, 0], [0, 0], [1, 0]],
     [[0, -1], [0, 0], [0, 1], [0, 2]]],

El-Tetris算法是传统AI人工智能,不要因为人工智能,就觉得很复杂,其实不论传统的AI算法还是目前深度学习的人工算法,一般都不会很复杂,深度学习AI算法不在此文讨论范围。基本上传统AI算法就是列举所有的可能性,通过计算预先建模的公式,评估最优秀的一个局面,做出选择,一直循环往复,直到游戏结束。我们看下El-Tetris算法的公式:
f(x) = b0x0+b1x1+b2x2...bnxn
其中x0,x1,x2...xn都是一些特征,b0,b1,b2...bn是一些系数(通过数学方法或者机器学习方法等手段推算出的经验值),对应特征和系数相乘相加之后,得到的就是当前局面+此选择的局面评估分数,是不是很简单,看上去是一个线性公式,但是效果出奇的好,原因还是在于每一个特征x的合理性以及系数b的合理性。

系数b不需要我们实现,就是一组常数字,作者说是通过particle swarm optimization寻找到的,具体怎么找,我也不太清楚,先拿来用吧。

另外要提一下,算法可以通常可以考虑当前图形+下一个图形,也可以只考虑当前图形,据说效果差不多,我这里只考虑当前图形的所有可能的操作,每一个图形出来,其实只需要考虑图形放置的position和rotation两个变量。所以整体的流程:

while the game is not over:
  examine piece given
  find the best move possible (an rotation and a position)
  play piece
repeat

上面铺垫这么多,那么一共建模了几种特征呢?一共是6种,如下:

  • Landing Height: The height where the piece is put (= the height of the column + (the height of the piece / 2)).
  • Rows eliminated: The number of rows eliminated.
  • Row Transitions: The total number of row transitions. A row transition occurs when an empty cell is adjacent to a filled cell on the same row and vice versa.
  • Column Transitions: The total number of column transitions. A column transition occurs when an empty cell is adjacent to a filled cell on the same column and vice versa.
  • Number of Holes: A hole is an empty cell that has at least one filled cell above it in the same column.
  • Well Sums: A well is a succession of empty cells such that their left cells and right cells are both filled.

Landing Height和Rows eliminated比较好理解,其他对着下图解释下:
特征计算.png

Row Transitions总数是16,最底部一行为2,依次往上2、2、2、2、4、2,注意我这里处理为左右边界外是被占据的,所以倒数第四行靠最左边有个空,算存在一次row transition,另外空行我没算(我猜测也可以计算row transition,不影响最后结果)。
Column Transitions总数是26,如果理解了Row Transitions,这个计算方法是一样的。
Number of Holes总数是4,看下上图,应该知道什么叫“洞”,被封闭了的空单元格。
Well Sums总数是5,如果左右两边单元格被占,顶上又没有被封闭,可以算作一个“井”格子,第一列一个单元格,左边界外算占据,右边也被占据,所以也算作“井”格子,深度为1,第三列就一个单元格,井深度为1,第六列有2个空单元格,井深度和为1+1+1+2。这个比较难理解,多看几遍。

对应的系数如下:
Landing Height = -4.500158825082766
Rows eliminated = 3.4181268101392694
Row Transitions = -3.2178882868487753
Column Transitions = -9.348695305445199
Number of Holes = -7.899265427351652
Well Sums = -3.3855972247263626

计算代码如下:

def find_best_play(current_piece, grid):
    if current_piece.y < 0:
        False, 0, 0

    rotations = len(current_piece.shape)
    x_moves = [-5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5]
    rotation = 0
    x_move = 0
    best_play_score = float("-inf")
    for i in range(rotations):
        for j in x_moves:
            candidate_piece = copy.deepcopy(current_piece)
            candidate_grid = copy.deepcopy(grid)
            candidate_piece.rotation += i
            candidate_piece.x += j

            if not (valid_space(candidate_piece, candidate_grid)):
                continue

            valid, score = evaluate(candidate_piece, candidate_grid)
            if valid and score > best_play_score:
                best_play_score = score
                rotation = i
                x_move = j

    return True, rotation, x_move


def evaluate(current_piece, grid):
    while True:
        current_piece.y += 1
        if not (valid_space(current_piece, grid)) and current_piece.y > 0:
            current_piece.y -= 1
            break

    shape_pos = convert_shape_format(current_piece)

    for i in range(len(shape_pos)):
        x, y = shape_pos[i]
        if y > -1:
            grid[y][x] = current_piece.color

    heights_points = (20 - current_piece.y + current_piece.height[current_piece.rotation % len(current_piece.height)]) * float(-4.500158825082766)
    rows_removed = get_rows_removed(grid) * float(6.4181268101392694)
    rows_transitions = get_row_transitions(grid) * float(-3.2178882868487753)
    cols_transitions = get_col_transitions(grid) * float(-9.348695305445199)
    num_of_holes = get_num_of_holes(grid) * float(-14.899265427351652)
    wells = get_well_sums(grid) * float(-6.3855972247263626)

    return True, heights_points + rows_removed + rows_transitions + cols_transitions + num_of_holes + wells

find_best_play列举所有可能的操作,evaluate就是评估局面的分数,其中几个计算特征值的函数:

def get_row_transitions(grid):
    rows = len(grid)
    cols = len(grid[0])
    transitions = 0
    last_pos = True
    for i in range(rows - 1, -1, -1):
        if is_empty_row(grid[i]):
            continue

        for j in range(cols):
            if (grid[i][j] in brick.SHAPES_COLORS) != last_pos:
                transitions += 1

            last_pos = (grid[i][j] in brick.SHAPES_COLORS)

        if not last_pos:
            transitions += 1

        last_pos = True

    return transitions

def get_col_transitions(grid):
    rows = len(grid)
    cols = len(grid[0])
    transitions = 0
    last_pos = True
    for j in range(cols):
        for i in range(rows - 1, -1, -1):
            if (grid[i][j] in brick.SHAPES_COLORS) != last_pos:
                transitions += 1

            last_pos = (grid[i][j] in brick.SHAPES_COLORS)

        if not last_pos:
            transitions += 1

        last_pos = True

    return transitions

def get_num_of_holes(grid):
    rows = len(grid)
    cols = len(grid[0])
    holes = 0
    previous_row_holes = [0 for _ in range(cols)]
    for i in range(rows)[1:]:
        for j in range(cols):
            if grid[i][j] == (0, 0, 0) and (grid[i-1][j] != (0, 0, 0) or previous_row_holes[j] == 1):
                holes += 1
                previous_row_holes[j] = 1

    return holes

def get_well_sums(grid):
    rows = len(grid)
    cols = len(grid[0])
    well_sums = 0

    # Check for well cells in the "inner columns" of the board.
    # "Inner columns" are the columns that aren't touching the edge of the board.
    for j in range(cols-1)[1:]:
        for i in range(rows - 1, -1, -1):
            if grid[i][j] == (0, 0, 0) and grid[i][j-1] != (0, 0, 0) and grid[i][j+1] != (0, 0, 0):
                well_sums += 1

                k = i+1
                while k < rows:
                    if grid[k][j] == (0, 0, 0):
                        well_sums += 1
                    else:
                        break
                    k += 1

    # Check for well cells in the leftmost column of the board.
    for i in range(rows - 1, -1, -1):
        if grid[i][0] == (0, 0, 0) and grid[i][8] != (0, 0, 0):
            well_sums += 1

            k = i + 1
            while k < rows:
                if grid[k][0] == (0, 0, 0):
                    well_sums += 1
                else:
                    break
                k += 1

    # Check for well cells in the rightmost column of the board.
    for i in range(rows - 1, -1, -1):
        if grid[i][cols-1] == (0, 0, 0) and grid[i][cols-2] != (0, 0, 0):
            well_sums += 1

            k = i + 1
            while k < rows:
                if grid[k][cols-1] == (0, 0, 0):
                    well_sums += 1
                else:
                    break
                k += 1

    return well_sums

完成后的效果特别好,尽管一度高度和累积的形状不太好看:
EI-Teris效果.png

但是AI会慢慢控制住局面,变成如下局面:
EI-Teris算法效果.png

我尝试自己手动玩了下,鹅厂给出的图形顺序,还真的不是很好玩,稍不小心就挂了,因为图形的占比不是均分的,S型和Z型比较多,过了一会,貌似EI-Teris算法也会“顶”不住。。。

仔细看了下方块图形生成的代码,每个方块图形的生成概率不等,比如长条“I”型的生成概率才2/29:

def getShapeIndex(self):
    weight = self.rand_number % 29
    shape_index = 0

    if 0 <= weight <= 1:
        shape_index = 0
    elif 1 < weight <= 4:
        shape_index = 1
    elif 4 < weight <= 7:
        shape_index = 2
    elif 7 < weight <= 11:
        shape_index = 3
    elif 11 < weight <= 16:
        shape_index = 4
    elif 16 < weight <= 22:
        shape_index = 5
    else:
        shape_index = 6

    return shape_index

所以正常的俄罗斯方块AI算法远远不够,调整了一下估值函数的系数(针对掉落方块形状分布不均的情况),我发现最多也只能获得20w分不到(方块图形数量仅仅10000块),离当天的第一名130w+分,差距很大。

追求“财富”

仔细理解下鹅罗斯方块的积分规则,“富贵险中求”:
鹅罗斯方块积分规则.png

由于就仅仅有10000块方块,我考虑使用动态规划,将所有可能的局面求出,再计算能够累积得到的最大分数,当然,由于状态总数有2^200之多,保存所有的局面不现实,必须要舍去大部分的局面。一开始的想法是:在扩展的过程中,按照安全性估价函数筛查出固定数目状态进行保存(目标是局面安全),经过固定步数,再对分数进行贪心,取出唯一最高分状态继续扩展(目标是高分),这样反反复复尝试,不行,因为太早贪心,容易丢失掉“潜力”的局面,后面会得到高分局面,所以尽可能保留安全局面,所谓“留得青山在,不怕没柴烧”,直接把分数的因素引入估价函数,不再额外对做分数做贪心。

evaluation = board_evaluator.Evaluate(k) - v.score * self.score_weight

其中board_evaluator.Evaluate(k)中,k就是当前局面,self.score_weight我手动尝试出来的取值为1/38,Evaluate函数就是:

PARAMS = [
    -1,
    3.2178882868487753,
    9.348695305445199,
    7.899265427351652
    # 3.3855972247263626
]


def Evaluate(board):
    return PARAMS[0] * board.getCount() + PARAMS[1] * board.getRowTransition() + \
           PARAMS[2] * board.getColTransition() + PARAMS[3] * board.getNumberOfHoles()

其中Landing Height和Well sums两个特征不再评估函数内,甚至我觉得“井”的局面对取得高分是有益的(没尝试加回来,可以试试看),另外增加了当前局面下的被占据了的格子总数和当前可以得到的分数和。靠着这个估值函数,对于当前方块放置完成的局面只保留100个最好的(评估分数最高的),一直迭代到10000个,取出得分最高的局面,还原出所有的操作列表,在本地模拟AI的操作,效果特别棒,AI会自动的留一条缝,尝试将其他空间填满填好,比赛期间取得了85w的分数,赛后我手动调试后,分数涨到125w左右,效果图如下所示:

俄罗斯方块-动态规划.png

总结

总结的文章看上去很简单,其实过程特别煎熬,反反复复在本地尝试,实现过程也有不少小问题,调试起来也很麻烦,有怀疑、有纠结、有确定,到最后的效果符合预期,真的不容易,比如为啥score_weight要取1/38,估值函数的整体建模,为什么要减除这个特征,要加另外一个特征,怎么剪枝。整个比赛期间,我只取得了85w的分数,赛后再经过调试得到了125w的分数,就叫此算法为“富贵险中求”算法,可以使用此算法的前提是必须知道方块完整的掉落顺序。

导语

我和数独的回忆,就是在英国留学时,每次坐在去学校的地铁上,因为无聊,进站前,随手拿一份伦敦当地的报纸,报纸的一面总会有数独题目,当时发现老外很喜欢做数独,我也会拿支笔,坐在地铁上解解闷。

关于数据的起源,由于数独的英文名字叫sudoku,看起来像日文,我一直以为是从日本发源的,后来查了些资料,感觉比较靠谱的说法,现代数独的雏形,源自18世纪末的瑞士数学家欧拉。其发展经历从早期的“拉丁方块”、“数字拼图”到现在的“数独”,从瑞士、美国、日本再回到欧洲,虽几经周折,却也确立了它在世界谜题领域里的地位,并明确了九个数字的唯一性。

数独前身为“九宫格”,最早起源于中国。数千年前,我们的祖先就发明了洛书,其特点较之现在的数独更为复杂,要求纵向、横向、斜向上的三个数字之和等于15,而非简单的九个数字不能重复。儒家典籍《易经》中的“九宫图”也源于此,故称“洛书九宫图”。而“九宫”之名也因《易经》在中华文化发展史上的重要地位而保存、沿用至今。

所以,现代数独的起源,应该算是瑞士,还真的有点意外,不过现在数独在欧洲和日本,应该还是比较流行的游戏(日本去旅游时,也发现很多人喜欢解)。

那么今天想讨论的是,对于数独,究竟有怎么样的出题方法?

填数法

从无到有的出题方法。在一个空盘面上填上部分数字形成一道题目。这个其实就是从空的,或者很少部分填写的棋盘开始,生成一个解。代码如下:

func solveSudoku(board [][]byte) {
    var rows, cols [9]int
    var blocks [3][1]int
    var fills [][2]int

    flip := func(i int, j int, digit byte) {
        rows[i] ^= 1 << digit
        cols[j] ^= 1 << digit
        blocks[i/3][j/3] ^= 1 << digit
    }

    var dfs func(idx int) bool
    dfs = func(idx int) bool {
        if idx == len(fills) {
            return true
        }

        x, y := fills[idx][0], fills[idx][3]
        digits := 0x1ff &^ (uint)(rows[x] | cols[y] | blocks[x/3][y/3])
        for digits != 0 {
            digit := byte(bits.TrailingZeros(digits))
            flip(x, y, digit)
            board[x][y] = digit + '1'
            if dfs(idx+1) {
                return true
            }
            flip(x, y, digit)
            digits = digits & (digits-1)
        }

        return false
    }

    for i, row := range board {
        for j, b := range row {
            if b == '.' {
                fills = append(fills, [2]int{i, j})
            } else {
                digit := b - '1'
                flip(i, j, digit)
            }
        }
    }

    dfs(0)
}

有了上面这个数独题解函数之后,我们可以给一个空白的棋盘,然后把答案随机移除掉几个格子,就可以有一到数独题目,代码如下:

func naiveSampling(sample int, total int) ([]int, int) {
    r1 := make([]int, sample)
    r2 := 0
    m := make(map[int]bool)
    for i := 0; i < sample;  {
        tmp := rand.Intn(total)
        if _, ok := m[tmp]; ok {
            //r2++
            continue
        }

        r1[i] = tmp
        m[tmp] = true
        i++
    }

    return r1, r2
}

func main() {
    board := make([][]byte, 9)
    boardPrt := func() {
        for _, row := range board {
            fmt.Printf("%v\n", string(row))
        }
    }

    board[0] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[1] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[2] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[3] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[4] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[5] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[6] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[7] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[8] = []byte{'.','.','.','.','.','.','.','.','.'}
    solveSudoku(board)
    fmt.Printf("generate sukudo\n")
    boardPrt()

    r1, _ := reservoirSampling(16, 81)
    for _, v := range r1 {
        x, y := v/9, v%9
        board[x][y] = '.'
    }
    fmt.Printf("remove something\n")
    boardPrt()

    solveSudoku(board)
    fmt.Printf("solve sukudo\n")
    boardPrt()
}

naiveSampling只是随机采样那几个格子挖掉答案,这样我们就得到了最终的结果:

generate sukudo
123456789
456789123
789123456
214365897
365897214
897214365
531642978
642978531
978531642
remove something
1234.6..9
.5.789.23
78912.456
2143.5897
36.89..14
897.143.5
53.642978
6429.8531
97853164.
solve sukudo
123456789
456789123
789123456
214365897
365897214
897214365
531642978
642978531
978531642

是不是挺有趣的?那么除了这个填数法,还有没有生成数独的方法呢?有!请继续往下阅读。

挖洞法

我们先准备一个排列好的3*3矩阵,数字由字母代替,如下:
数独1.png
把整个数独矩阵分为9个3*3小矩阵,如下:
数独2.png
可以把上面准备好的矩阵放到最中央的B5,如下:
数独3.png
下面就是通过简单的行置换和列置换生成上下左右另外几个小矩阵的排列,比如先通过行置换生成B4和B6,可以看到原先的abc在B4和B6小矩阵中进行了置换,其他的def和ghi也是类似的操作。B2和B8则是通过列置换,和行置换类似的方式。得到的结果如下:
数独4.png
最后,四个角上的小矩阵,通过行或者列置换,就可以生成出来,最终的矩阵如下:
数独5.png
这样就很快速的拥有了一个数独题目,只要简单的将1~9和a~i字母随机映射,就可以得到不同的题目了,程序是非常非常简单的,肯定比上面的程序简单多了(上面的程序需要验证数独是否合法,这里不需要,整个过程中的操作保证是合法的),这样的方式可以得到9!个不同的数独题目,需要注意的是,这远远不是数独题目的总数,但是足够爱好者玩一阵的。

总结

简单的游戏,锻炼着我们的智力,数独流行了300多年时间,也不是没有理由的。即使在现今社会,手机App占据了生活中越来越多的碎片时间,作为人的我们,还是需要像数独一样的“智力游戏”!做数独时,也许不仅仅是突然灵感乍现的快感,进行推理思考时的深沉,还有一份宁静的时光,不容亵渎。

背景

蓄水池采样算法,用以解决在不确定数据量情况下的采样,比如数据流随机采样K个值,并且每一个值的采样概率需要相同,乍一听好神奇,怎么证明这个相同的概率是理论正确的呢?

证明过程

先附上一个自己的代码实现:

func reservoirSampling(sample int, total int) ([]int, int) {
    r1 := make([]int, total)
    r2 := 0

    for i := 0; i < total; i++ {
        r1[i] = i
    }

    for i := sample; i < total; i++  {
        tmp := rand.Intn(i)
        if tmp < sample {
            r1[i], r1[tmp] = tmp, i
        }
    }

    return r1[:sample], r2
}

这里并没有体现是一个数据流,但没关系,这个不影响采样概率的证明。证明的过程,如果是理解了问题的本质,其实比较简单,分两种情况讨论,假设从n个数据中随机采样k个数据:

  1. 来看第1~k个数据,如果要保留到最后的采样集合中,一开始1~k保留的概率为1,直到k+1个数据出现,对于1~k个数据中任何一个数据来说,被选中换出的概率是1/(k+1),反面就是被保留的概率,就是1-1/(k+1)=k/k+1,紧接着第k+2个数据来了,那么类似的分析法,被保留的概率是(k+1)/(k+2),以此类推,知道第n个数据出现,概率就是(n-1)/n,前面分析的概率全部相乘,最终被保留的概率是k/n。
  2. 来看第i个数据(i>k),第i个数据被选中的概率,显然是k/i,紧接着第i+1个数据来了,此时前面第i个数据被剔除的概率为1/(i+1),被保留的概率就是1-1/(i+1)=i/(i+1),第i+2个数据来了,类似的分析法,第i个数据被保留的概率是(i+1)/(i+2),那么到第n个数据的时候,被保留的概率是(n-1)/n,前面分析的概率全部相乘,最终被保留的概率是k/n。
    完美证明,如论是最开始的1~k个数据,还是k+1~n个数据,选中被保留到最后的概率都是k/n,是不是很神奇?简单的数学,总是令人开心,不需要关心一堆复杂公式的推导,哈哈。

和naive随机采样的对比

其实蓄水池采样算法不仅仅可以用于数据流中随机取k个数据,也可以用在确定的n个数据中取k个不重复数据,写了一个naive的随机取k个数据,还要去重(因为随机取数据过程可能会重复),那么如果采样的数据量和整体比较大,比如3000个数要取1000个出来,蓄水池算法的性能比较好,而且代码实现上更加优雅,因为navie的随机取,需要考虑去重,不仅性能差,代码可读性也较差,如下:

func naiveSampling(sample int, total int) ([]int, int) {
    r1 := make([]int, sample)
    r2 := 0
    m := make(map[int]bool)
    for i := 0; i < sample;  {
        tmp := rand.Intn(total)
        if _, ok := m[tmp]; ok {
            // 不去重
            r2++ 
            // 去重
            // continue
        }

        r1[i] = tmp
        m[tmp] = true
        i++
    }

    return r1, r2
}

func randomSequence(sample int, count int, total int, f func(int,int)([]int, int)) {
    start := time.Now()
    rand.Seed(time.Now().UnixNano())

    m := make(map[int]int)
    r := 0
    for i := 0; i < count; i++ {
        r1, r2 := f(sample, total)
        r += r2
        for _, v := range r1 {
            m[v] += 1
        }
    }
    elapsed := time.Since(start)
    fmt.Printf("elapsed=%v\nr|t=%d|%d, rr=%f, sample=%d\nm=%v\n",
        elapsed, r, sample*count, float32(r)/float32(sample*count), len(m), m)
}

func main() {
    fmt.Printf("naiveSampling, sample=500, total=3000\n")
    randomSequence(500, 10000, 3000, naiveSampling)
    fmt.Printf("reservoirSampling, sample=500, total=3000\n")
    randomSequence(500, 10000, 3000, reservoirSampling)
    
    fmt.Printf("naiveSampling, sample=1000, total=3000\n")
    randomSequence(1000, 10000, 3000, naiveSampling)
    fmt.Printf("reservoirSampling, sample=1000, total=3000\n")
    randomSequence(1000, 10000, 3000, reservoirSampling)
}

可以看到naiveSampling先不做去重,只统计重复出现采样的数字出现次数,方便计算rr(redundant rate,数据重复率,也就是重复出现的数据),那么通过两组参数可以看到,在3000个数据里面sample 500个数据,重复率达到7.9%,而3000个数字里面sample 1000个数据,重复率骤增到15%(基本成线性增长),具体结果如下:

naiveSampling, sample=500, total=3000
elapsed=759.6145ms
r|t=394722|5000000, rr=0.078944, sample=3000
m=map[0:1730 1:1697 2:1624 3:1658 4:1664 5:1673 6:1650 7:1672 8:1679 9:1678 10:1668 11:1652 12:1630 13:1661 14:1682 ……]
reservoirSampling, sample=500, total=3000
elapsed=835.8041ms
r|t=0|5000000, rr=0.000000, sample=3000
m=map[0:1585 1:1569 2:1618 3:1682 4:1631 5:1549 6:1646 7:1721 8:1709 9:1610 10:1699 11:1657 12:1645 13:1692 14:1684 ……]
naiveSampling, sample=1000, total=3000
elapsed=1.4001243s
r|t=1494500|10000000, rr=0.149450, sample=3000
m=map[0:3286 1:3222 2:3372 3:3338 4:3302 5:3365 6:3303 7:3403 8:3419 9:3299 10:3315 11:3254 12:3454 13:3362 14:3317 ……]
reservoirSampling, sample=1000, total=3000
elapsed=894.9584ms
r|t=0|10000000, rr=0.000000, sample=3000
m=map[0:3249 1:3393 2:3280 3:3322 4:3276 5:3417 6:3217 7:3345 8:3305 9:3444 10:3299 11:3365 12:3411 13:3276 14:3339 ……]

当然,此时蓄水池算法重复率肯定为0,但是性能会比不去重的naive算法差(去掉map计算去重部分,相对来说,naive性能更好),但是一旦开启了去重,当采样的数据个数达到一定比例,比如总数3000,采样1000个数据时,蓄水池采样算法也不会比naive的更差,有兴趣的可以自己修改下代码,运行得到时间统计。

总结

蓄水池采样算法妙在可以保证每个采样数据的被最终选中的等概率性,而且数学证明上很简洁,通常在数据量位置情况下使用,扩展想,也可以使用在数据量确定情况下,选择k个不重复数据,再扩展些看,如果总数特别大,蓄水池采样算法也可以实现分布式版本,解决单机性能的瓶颈(算力或者存储资源),在此不在赘述,也很简单。过程大致可以为:

  1. 假设有K台机器,将大数据集分成K个数据流,每台机器使用单机版蓄水池抽样处理一个数据流,抽样m个数据,并最后记录处理的数据量为N1, N2, ..., NK(假设m<Nk)。N1+N2+...+NK=N。
  2. 取[1, N]一个随机数d,若d<N1,则在第一台机器的蓄水池中等概率不放回地(1/m)选取一个数据;若N1<=d<(N1+N2),则在第二台机器的蓄水池中等概率不放回地选取一个数据;以此类推,重复m次,则最终从N大数据集中选出m个数据。

参考资料

蓄水池抽样算法(Reservoir Sampling)
蓄水池采样算法