分类 默认分类 下的文章

导语

作为80后,小时候一定玩过一款经典游戏,俄罗斯方块,英文名称Tetris。玩的画面在脑海中已是“上古”时期的事情,但也并不陌生,因为刷视频时,偶尔还能看到顶尖世界高手对决,他们玩的俄罗斯方块难度极大,游戏模式是对决,比谁的分数高,有些比赛到最后不仅自己消砖块得分,而且还可以通过得分不停给对手制造麻烦。
俄罗斯方块世界比赛.png

看到鹅厂组织的一次极客挑战赛,主题就是俄罗斯方块游戏,当然游戏名称叫“鹅”罗斯方块,蛮鹅厂风格的,可可爱爱。

鹅罗斯方块挑战赛.png
我比较感兴趣,当时也报名参加了,最后是在2000+入场选手排名在9位,比赛结束后,我又花了点时间调试了自己的算法参数,最终比赛分数可以刷进TOP3。谨以此文,记录自己的所思所想。

如何活着

第一步想到的是,如何“活着”,就和当前经济形势下,各行各业都得考虑先“活着”,能不能得到高分其次,先得“不死”。
俄罗斯方块游戏.png

我自己写了一个俄罗斯方块的游戏界面,和传统俄罗斯方块游戏一样,如何拼搭这些小图形,让高度不超过游戏规定的高度,满行就会消除,消除的行数有4种情况1行、2行、3行和4行。针对如何活着的俄罗斯方块AI算法比较多了,全网比较有名的是Pierre Dellacherie算法。我这里搜索到一篇El-Tetris – An Improvement on Pierre Dellacherie’s Algorithm,大家可以参考下。

首先,分析下俄罗斯方块中方块的种类,一共有7种图形:
俄罗斯方块形状.png

上面图中有7种不同的形状,我在代码里面的表示为坐标法,以(0, 0)为原点,标示周围那几个点对应图形需要占用。

BrickShapes = [
    # I
    [[[-2, 0], [-1, 0], [0, 0], [1, 0]],
     [[0, -1], [0, 0], [0, 1], [0, 2]]],

    # L
    [[[-2, 0], [-1, 0], [0, 0], [0, 1]],
     [[0, 0], [0, 1], [0, 2], [1, 0]],
     [[0, -1], [0, 0], [1, 0], [2, 0]],
     [[-1, 0], [0, -2], [0, -1], [0, 0]]],

    # J
    [[[-2, 0], [-1, 0], [0, -1], [0, 0]],
     [[-1, 0], [0, 0], [0, 1], [0, 2]],
     [[0, 0], [0, 1], [1, 0], [2, 0]],
     [[0, -2], [0, -1], [0, 0], [1, 0]]],

    # T
    [[[0, -1], [0, 0], [0, 1], [1, 0]],
     [[-1, 0], [0, -1], [0, 0], [1, 0]],
     [[-1, 0], [0, -1], [0, 0], [0, 1]],
     [[-1, 0], [0, 0], [0, 1], [1, 0]]],

    # O
    [[[-1, 0], [-1, 1], [0, 0], [0, 1]]],

    # S
    [[[-1, 0], [-1, 1], [0, -1], [0, 0]],
     [[-1, -1], [0, -1], [0, 0], [1, 0]]],

    # Z
    [[[-1, -1], [-1, 0], [0, 0], [0, 1]],
     [[-1, 0], [0, -1], [0, 0], [1, -1]]]
]

比如竖棍“I”的图形,有两种rotation,所以有两组坐标的标示方式,其他图形类似,只不过rotation数量不同:

    # I, two rotations
    [[[-2, 0], [-1, 0], [0, 0], [1, 0]],
     [[0, -1], [0, 0], [0, 1], [0, 2]]],

El-Tetris算法是传统AI人工智能,不要因为人工智能,就觉得很复杂,其实不论传统的AI算法还是目前深度学习的人工算法,一般都不会很复杂,深度学习AI算法不在此文讨论范围。基本上传统AI算法就是列举所有的可能性,通过计算预先建模的公式,评估最优秀的一个局面,做出选择,一直循环往复,直到游戏结束。我们看下El-Tetris算法的公式:
f(x) = b0x0+b1x1+b2x2...bnxn
其中x0,x1,x2...xn都是一些特征,b0,b1,b2...bn是一些系数(通过数学方法或者机器学习方法等手段推算出的经验值),对应特征和系数相乘相加之后,得到的就是当前局面+此选择的局面评估分数,是不是很简单,看上去是一个线性公式,但是效果出奇的好,原因还是在于每一个特征x的合理性以及系数b的合理性。

系数b不需要我们实现,就是一组常数字,作者说是通过particle swarm optimization寻找到的,具体怎么找,我也不太清楚,先拿来用吧。

另外要提一下,算法可以通常可以考虑当前图形+下一个图形,也可以只考虑当前图形,据说效果差不多,我这里只考虑当前图形的所有可能的操作,每一个图形出来,其实只需要考虑图形放置的position和rotation两个变量。所以整体的流程:

while the game is not over:
  examine piece given
  find the best move possible (an rotation and a position)
  play piece
repeat

上面铺垫这么多,那么一共建模了几种特征呢?一共是6种,如下:

  • Landing Height: The height where the piece is put (= the height of the column + (the height of the piece / 2)).
  • Rows eliminated: The number of rows eliminated.
  • Row Transitions: The total number of row transitions. A row transition occurs when an empty cell is adjacent to a filled cell on the same row and vice versa.
  • Column Transitions: The total number of column transitions. A column transition occurs when an empty cell is adjacent to a filled cell on the same column and vice versa.
  • Number of Holes: A hole is an empty cell that has at least one filled cell above it in the same column.
  • Well Sums: A well is a succession of empty cells such that their left cells and right cells are both filled.

Landing Height和Rows eliminated比较好理解,其他对着下图解释下:
特征计算.png

Row Transitions总数是16,最底部一行为2,依次往上2、2、2、2、4、2,注意我这里处理为左右边界外是被占据的,所以倒数第四行靠最左边有个空,算存在一次row transition,另外空行我没算(我猜测也可以计算row transition,不影响最后结果)。
Column Transitions总数是26,如果理解了Row Transitions,这个计算方法是一样的。
Number of Holes总数是4,看下上图,应该知道什么叫“洞”,被封闭了的空单元格。
Well Sums总数是5,如果左右两边单元格被占,顶上又没有被封闭,可以算作一个“井”格子,第一列一个单元格,左边界外算占据,右边也被占据,所以也算作“井”格子,深度为1,第三列就一个单元格,井深度为1,第六列有2个空单元格,井深度和为1+1+1+2。这个比较难理解,多看几遍。

对应的系数如下:
Landing Height = -4.500158825082766
Rows eliminated = 3.4181268101392694
Row Transitions = -3.2178882868487753
Column Transitions = -9.348695305445199
Number of Holes = -7.899265427351652
Well Sums = -3.3855972247263626

计算代码如下:

def find_best_play(current_piece, grid):
    if current_piece.y < 0:
        False, 0, 0

    rotations = len(current_piece.shape)
    x_moves = [-5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5]
    rotation = 0
    x_move = 0
    best_play_score = float("-inf")
    for i in range(rotations):
        for j in x_moves:
            candidate_piece = copy.deepcopy(current_piece)
            candidate_grid = copy.deepcopy(grid)
            candidate_piece.rotation += i
            candidate_piece.x += j

            if not (valid_space(candidate_piece, candidate_grid)):
                continue

            valid, score = evaluate(candidate_piece, candidate_grid)
            if valid and score > best_play_score:
                best_play_score = score
                rotation = i
                x_move = j

    return True, rotation, x_move


def evaluate(current_piece, grid):
    while True:
        current_piece.y += 1
        if not (valid_space(current_piece, grid)) and current_piece.y > 0:
            current_piece.y -= 1
            break

    shape_pos = convert_shape_format(current_piece)

    for i in range(len(shape_pos)):
        x, y = shape_pos[i]
        if y > -1:
            grid[y][x] = current_piece.color

    heights_points = (20 - current_piece.y + current_piece.height[current_piece.rotation % len(current_piece.height)]) * float(-4.500158825082766)
    rows_removed = get_rows_removed(grid) * float(6.4181268101392694)
    rows_transitions = get_row_transitions(grid) * float(-3.2178882868487753)
    cols_transitions = get_col_transitions(grid) * float(-9.348695305445199)
    num_of_holes = get_num_of_holes(grid) * float(-14.899265427351652)
    wells = get_well_sums(grid) * float(-6.3855972247263626)

    return True, heights_points + rows_removed + rows_transitions + cols_transitions + num_of_holes + wells

find_best_play列举所有可能的操作,evaluate就是评估局面的分数,其中几个计算特征值的函数:

def get_row_transitions(grid):
    rows = len(grid)
    cols = len(grid[0])
    transitions = 0
    last_pos = True
    for i in range(rows - 1, -1, -1):
        if is_empty_row(grid[i]):
            continue

        for j in range(cols):
            if (grid[i][j] in brick.SHAPES_COLORS) != last_pos:
                transitions += 1

            last_pos = (grid[i][j] in brick.SHAPES_COLORS)

        if not last_pos:
            transitions += 1

        last_pos = True

    return transitions

def get_col_transitions(grid):
    rows = len(grid)
    cols = len(grid[0])
    transitions = 0
    last_pos = True
    for j in range(cols):
        for i in range(rows - 1, -1, -1):
            if (grid[i][j] in brick.SHAPES_COLORS) != last_pos:
                transitions += 1

            last_pos = (grid[i][j] in brick.SHAPES_COLORS)

        if not last_pos:
            transitions += 1

        last_pos = True

    return transitions

def get_num_of_holes(grid):
    rows = len(grid)
    cols = len(grid[0])
    holes = 0
    previous_row_holes = [0 for _ in range(cols)]
    for i in range(rows)[1:]:
        for j in range(cols):
            if grid[i][j] == (0, 0, 0) and (grid[i-1][j] != (0, 0, 0) or previous_row_holes[j] == 1):
                holes += 1
                previous_row_holes[j] = 1

    return holes

def get_well_sums(grid):
    rows = len(grid)
    cols = len(grid[0])
    well_sums = 0

    # Check for well cells in the "inner columns" of the board.
    # "Inner columns" are the columns that aren't touching the edge of the board.
    for j in range(cols-1)[1:]:
        for i in range(rows - 1, -1, -1):
            if grid[i][j] == (0, 0, 0) and grid[i][j-1] != (0, 0, 0) and grid[i][j+1] != (0, 0, 0):
                well_sums += 1

                k = i+1
                while k < rows:
                    if grid[k][j] == (0, 0, 0):
                        well_sums += 1
                    else:
                        break
                    k += 1

    # Check for well cells in the leftmost column of the board.
    for i in range(rows - 1, -1, -1):
        if grid[i][0] == (0, 0, 0) and grid[i][8] != (0, 0, 0):
            well_sums += 1

            k = i + 1
            while k < rows:
                if grid[k][0] == (0, 0, 0):
                    well_sums += 1
                else:
                    break
                k += 1

    # Check for well cells in the rightmost column of the board.
    for i in range(rows - 1, -1, -1):
        if grid[i][cols-1] == (0, 0, 0) and grid[i][cols-2] != (0, 0, 0):
            well_sums += 1

            k = i + 1
            while k < rows:
                if grid[k][cols-1] == (0, 0, 0):
                    well_sums += 1
                else:
                    break
                k += 1

    return well_sums

完成后的效果特别好,尽管一度高度和累积的形状不太好看:
EI-Teris效果.png

但是AI会慢慢控制住局面,变成如下局面:
EI-Teris算法效果.png

我尝试自己手动玩了下,鹅厂给出的图形顺序,还真的不是很好玩,稍不小心就挂了,因为图形的占比不是均分的,S型和Z型比较多,过了一会,貌似EI-Teris算法也会“顶”不住。。。

仔细看了下方块图形生成的代码,每个方块图形的生成概率不等,比如长条“I”型的生成概率才2/29:

def getShapeIndex(self):
    weight = self.rand_number % 29
    shape_index = 0

    if 0 <= weight <= 1:
        shape_index = 0
    elif 1 < weight <= 4:
        shape_index = 1
    elif 4 < weight <= 7:
        shape_index = 2
    elif 7 < weight <= 11:
        shape_index = 3
    elif 11 < weight <= 16:
        shape_index = 4
    elif 16 < weight <= 22:
        shape_index = 5
    else:
        shape_index = 6

    return shape_index

所以正常的俄罗斯方块AI算法远远不够,调整了一下估值函数的系数(针对掉落方块形状分布不均的情况),我发现最多也只能获得20w分不到(方块图形数量仅仅10000块),离当天的第一名130w+分,差距很大。

追求“财富”

仔细理解下鹅罗斯方块的积分规则,“富贵险中求”:
鹅罗斯方块积分规则.png

由于就仅仅有10000块方块,我考虑使用动态规划,将所有可能的局面求出,再计算能够累积得到的最大分数,当然,由于状态总数有2^200之多,保存所有的局面不现实,必须要舍去大部分的局面。一开始的想法是:在扩展的过程中,按照安全性估价函数筛查出固定数目状态进行保存(目标是局面安全),经过固定步数,再对分数进行贪心,取出唯一最高分状态继续扩展(目标是高分),这样反反复复尝试,不行,因为太早贪心,容易丢失掉“潜力”的局面,后面会得到高分局面,所以尽可能保留安全局面,所谓“留得青山在,不怕没柴烧”,直接把分数的因素引入估价函数,不再额外对做分数做贪心。

evaluation = board_evaluator.Evaluate(k) - v.score * self.score_weight

其中board_evaluator.Evaluate(k)中,k就是当前局面,self.score_weight我手动尝试出来的取值为1/38,Evaluate函数就是:

PARAMS = [
    -1,
    3.2178882868487753,
    9.348695305445199,
    7.899265427351652
    # 3.3855972247263626
]


def Evaluate(board):
    return PARAMS[0] * board.getCount() + PARAMS[1] * board.getRowTransition() + \
           PARAMS[2] * board.getColTransition() + PARAMS[3] * board.getNumberOfHoles()

其中Landing Height和Well sums两个特征不再评估函数内,甚至我觉得“井”的局面对取得高分是有益的(没尝试加回来,可以试试看),另外增加了当前局面下的被占据了的格子总数和当前可以得到的分数和。靠着这个估值函数,对于当前方块放置完成的局面只保留100个最好的(评估分数最高的),一直迭代到10000个,取出得分最高的局面,还原出所有的操作列表,在本地模拟AI的操作,效果特别棒,AI会自动的留一条缝,尝试将其他空间填满填好,比赛期间取得了85w的分数,赛后我手动调试后,分数涨到125w左右,效果图如下所示:

俄罗斯方块-动态规划.png

总结

总结的文章看上去很简单,其实过程特别煎熬,反反复复在本地尝试,实现过程也有不少小问题,调试起来也很麻烦,有怀疑、有纠结、有确定,到最后的效果符合预期,真的不容易,比如为啥score_weight要取1/38,估值函数的整体建模,为什么要减除这个特征,要加另外一个特征,怎么剪枝。整个比赛期间,我只取得了85w的分数,赛后再经过调试得到了125w的分数,就叫此算法为“富贵险中求”算法,可以使用此算法的前提是必须知道方块完整的掉落顺序。

上海疫情封在家中有三个月,辅导了小孩子功课,每每小孩遇到不会做的题目,比她更着急。有时也会想起自己学生时代的样子,遇到难题,心中升起必须克服难题的决心,现在自己娃,怎么没有我当年的风采了。哈哈,扯远了,这两天网上看到一道小学二年级的题目,其实我之前做过类似的题目,但是当下还是震惊了,怎么现在小学二年级题目有这么难?

题目是这样的:
赛车比赛题目.jpeg

配合网友的回复,我马上想到了之前类似的问题,说是25匹马,5个赛道,至少比几场保证找出最快的2匹马。赛马这题答案我记得很清楚7场比赛,先分组进行小组赛:

赛马比赛分组.png

首先一个虚线框就是一场比赛,上来25匹马,分5组,进行5场预赛。5组比赛会得到组内的名次,比如A组,A1->A2->A3->A4->A5,每组第一名就是A1,B1,C1,D1和E1,然后第一名之间安排一场决赛:
赛马比赛决赛.png

假设结果就是A1->B1->C1->D1->E1(真正结果不重要,不管怎么样,总能推理出最后的结论),这时候我们就通过4场比赛,知道了第一名是A1,那第二名是谁呢,是不是B1,不一定哦,有可能是在小组赛中被A1淘汰的A2,所以安排场附加赛吧,A2 vs B1:
赛马附加赛.png

附加赛之后,就能知道前两名了。话说回小学二年级赛车这道题目,其实思路一模一样,就是通过小组赛->决赛->附加赛,完成前两名的确认,也就是说赛车的话,通过至少5场比赛,保证找出前两名的赛车。

作为小学二年级的题目很变态了吧,但是出题的人还多问了一句,怎么证明这件事情呢,为什么通过4场比赛一定不能保证找出前两名呢?这个证明难倒我了,抓耳挠腮半天,没想出来。然后我就看了下出题老师的分析,茅塞顿开,原来还可以使用图论+反正法来证明。大致的过程如下:
首先把每一辆赛车想象成点,那么我们一共有9辆赛车,一共9个点,每场比赛,只能进行3辆赛车的比拼,每次比拼完,我们会有一个名次,紧挨着的前后两名之间,使用一条有向边连接,这个我文字讲得有点绕口,一图解千愁:
赛车比赛顺序图.png

圆圈表示赛车,1号比2号快,2号比3号快,所以每次比赛,我们仅仅能得到2条有向边,如果进行4场比赛,那么总共的有向边数量是4*2 = 8条,而赛车的数量是9,也就是节点数量是9,8条边,9个点,如果要保证所有边都连接到一张图中,不能出现环,否则边的数量不够。
环.png

出现环就意味着9个点形成了若干个分开的子图,上面图中9个节点,分为了两个子图,那么就会有一个问题,第一名都不知道是谁,为什么?因为在1~8节点所在的图中,1号赛车最快,9号赛车单独成子图,9号赛车和谁都没有比过,那么到底9号和1号赛车谁快呢?不知道。所以8条边,安排的比赛结果,划分为多个子图,这样是无法确认前两名的。

那我们4场比赛的结果,如何是可以前两名的图形呢,如下:
树形图.png

左边就是分3场比赛(1、4、7,2、5、8和3、6、9分别比赛),得到结果后,4、2、3比一次,结果是4->2->3,转化这张单图,得到右侧的结果,其实就是一个树形的图,那么前两名就是1、4号赛车。

这样我们很容易举出反例,你不是就8条边么,我最后4、2、3的结果是2->4->3,图就变成非树形图,如下:
多个根结点图.png

这样又回到之前多个子图的问题,我们是没法确认1号赛车和2号赛车谁快的,也就没法真正确认前两名是哪两辆赛车。

OK,至少5场比赛,而非4场比赛保证找出最快的2辆赛车得证。是不是挺难的,至少我是没有想出来,通过图论+反证法证明这个数据结论,由衷发出感慨,活学活用好难啊,能做到的大佬,真的太强了。
373d2c7114b84ac57e8b9e3ad3eb78fe.jpeg

导语

最近因为上海的疫情,被关在家里,有点闷坏了,小区里的快递或者外卖,也不允许下楼取,由小区保安统一配送,保安还需要定时去巡查,看看小区有没有违规下楼,小区有没有人在随意走动,我们小区有8栋楼,虽然不大,但楼宇间有绿化带阻隔,加上小区保安人手有限,怎样合理规划路径(保证所有路径都经过,又不重复走),倒是一个有趣的图论问题。我突然想到了哥尼斯堡七桥经典问题,相传18世纪初普鲁士的哥尼斯堡,有一条河穿过,河上有两个小岛,有七座桥把两个岛与河岸联系起来(如下图)。有个人提出一个问题:一个步行者怎样才能不重复、不遗漏地一次走完七座桥,最后回到出发点。
哥德堡七桥问题.png

自己画的图有点丑,但应该比较好理解,A、D是两个小岛,B、C是两边的陆地。

哥尼斯堡的七座桥

1736年29岁的欧拉向圣彼得堡科学院递交了《哥尼斯堡的七座桥》的论文,在解答问题的同时,开创了数学的一个新的分支——图论与几何拓扑,也由此展开了数学史上的新历程。欧拉的名声如雷贯耳,往往成功的大师,都是有心人,对生活中遇到的问题,愿意进行深入的分析和思考。在论文中,欧拉对哥尼斯堡的七座桥问题进行了抽象与转换,他认为岛和两边的河岸都是点,桥就是连接点的边,那么能否不重复地走过所有的桥,又不重复,就转换为能否一笔画出对应的图(点和边组成的图),并且没有重复的笔画。

首先,我们来了解下关键的一个概念:度数,即每个点连接的边数量。那么可以把点分为:

  • 奇点:点的度数为奇数的点,即连接此点的边数量为奇数;
  • 偶点:点的度数为偶数的点,即连接此点的边数量为偶数;

我们使用一笔画出一个图来,如下图所示,红色的点就是偶点,蓝色的点就是奇点,落笔的点就是奇点之一,另外一个奇点就是收笔的点。中间的所有点都是偶点,一进一出,必然度数是2、4、6、8等情况。而奇点有2个,分别是落笔的点和收笔的点,不可能出现第三个奇点。

奇点偶点.png

如果把落笔点和收笔点连接起来,理解为两个奇点连接起来,那么图中所有的点都是偶点。如下图所示:
奇点偶点2.png

在1736年,大数学家欧拉提出了一个图如果一笔不重复地画出来,需要的充要条件是,该图只有0个或者2个奇点,如果存在0个奇点,图中存在欧拉回路,如果存在2个奇点,图中存在欧拉路径。回到哥尼斯堡七桥问题,抽象出来的图如下:
哥尼斯堡七桥问题2.png

可以看到4个点都是奇点,所以欧拉说,不可能做到不重复、不遗漏地一次走完七座桥。既然这个图不能一笔画出来,那么究竟几笔能画出来,欧拉也给了答案。图的奇点总是成对出现的,因为有一个落笔点,必然会有一个收笔点,如果一张图有2k(k >= 1)个奇点,那么欧拉认为有以下3个推论:

  • 此图可以通过k笔画出
  • 添加k-1条边,保证存在欧拉路径
  • 添加k条边,保证存在欧拉回路

回到我们小区送东西的场景,简化为下图:
小区楼宇图.png

一共8栋楼,如果小区保安走一遍,保证每条路径都要走到,又不能重复,存在这样的路径,比如:
小区楼宇图2.png

很幸运,我们小区还是有这样一条路径,可以保证保安巡查时不走重复路的,但又走遍了所有的角落。有些实际生活中的场景,也适合使用欧拉回路来规划,比如在旅游景区的线路规划(游客都不希望走回头路,但又希望每一条路都走一遍),也比较适合。
旅游景点地图.png
如上图,怎么添加线路,使得可以保证旅游景区存在欧拉回路,是不是特别有意思和实用?

结束语

图论中有很多有趣的问题,显然300多年前,欧拉打开了一扇图论的大门,还有一些问题,比如需要走遍图中所有点一次(但不能重复经过点),边不需要都走,有没有合适的判定方法或者算法,显然欧拉回路或者路径是不适用的,这个也有相应的图论算法,叫哈密尔顿回路/路径,单独写文章介绍,在此不详述了。

导语

作为程序员,时常在一些技术书中看到这些UML类图表示,大部分能看懂意思,但那些箭头和标记很容易混淆。其实UML类图是一门精准的语言,每个符号都是有着精确的程序语义的,在这篇文章中,通过自己整理的一个例子,举例说明,帮助自己加深理解。

UML类图例子

其实多看多用就熟悉了,举一个例子,来看这样一幅图,其中就包括了常见UML类图的基本图示法。
UML类图.png
首先看“交通工具”矩形框,它就代表一个类(Class)。类图分三层,第一层显示类的名称,如果是抽象类,则就用斜体显示。第二层是类的特性,通常就是字段和属性。第三层是类的操作,通常是方法或行为。注意前面的符号,‘+’表示public,‘-’表示private,‘#’表示protected。
然后注意左下角的“翻越”,它表示一个接口图,与类图的区别主要是顶端有<<interface>>显示。第一行是接口名称,第二行是接口方法。接口还有另一种表示方法,俗称棒棒糖表示法,比如图中的警车类就是实现了“鸣笛”的接口。

interface ICross {
    // 翻越
    void Cross();
}

interface IWhistle {
    // 鸣笛
    void Whistle();
}

接下来就可以看下类与类,类与接口之间的关系了。可以观察交通工具、车、轿车、卡车、越野车和警车之间的关系,他们都是继承关系,用空心三角形加上实线来表示。例子中的几种车中,越野车是可以翻越小山丘的,我让它实现了翻越接口,实现接口就用空心三角形加上虚线来表示。、

再看卡车和法规(这个例子可能不是特别恰当,但是我想不当更合适的),卡车一般都是运送货物,需要遵守政府的货运相关的法规,限重多少,和卡车能否上路行驶息息相关,卡车司机需要“知道”政府法规,当一个类‘知道’另一个类时,可以用关联(association)。关联关系用实线箭头来表示。

越野车队和越野车这两个类,生活中,开越野车的车友喜欢加入俱乐部,参加越野车活动,一个车队可以有多辆车,所以它们之间就满足聚合(Aggregation)关系。聚合表示一种弱的‘拥有’关系,体现的是A对象可以包含B对象,但B对象不是A对象的一部分。聚合关系用空心的菱形+实线箭头来表示。

合成(Composition,也有翻译成‘组合’的)是一种强的‘拥有’关系,体现了严格的部分和整体的关系,部分和整体的生命周期一样,在这里车和轮胎就是合成(组合)关系,因为它们是部分和整体的关系,并且轮胎和车的生命周期是相同的。合成关系用实心的菱形+实线箭头来表示。另外,你会注意到合成关系的连线两端还有一个数字‘1’和数字‘2’,这被称为基数。表明这一端的类可以有几个实例,很显然,一辆车应该有5个轮胎(算上备胎)。如果一个类可能有无数个实例,则就用‘n’来表示。关联关系、聚合关系也可以有基数的。

交通工具的几大特征,比如类型,路上、水上还是空中的交通工具,交通工具都需要燃料驱动,需要机油保养,他们之间是依赖关系(Dependency),用虚线箭头来表示。

abstract class Vehicle {
   // 增加能源
   void Fill(Energy energy);
   // 保养
   void Maintain(EngineOil oil);
}

结论

编程是一门技术,更加是一门艺术,不能只满足于写完代码运行结果正确就完事,时常考虑如何让代码更加简练,更加容易维护,容易扩展和复用,只有这样才可以真正得到提高。写出优雅的代码真的是一种很爽的事情。UML类图也不是一学就会的,需要有一个慢慢熟练的过程。所谓学无止境,其实这才是理解面向对象的开始呢。

导语

函数式编程的中非常重要的Map、Reduce、Filter的三种操作,这三种操作可以让我们非常方便灵活地进行一些数据处理——我们的程序中大多数情况下都是在到倒腾数据,尤其对于一些需要统计的业务场景,Map/Reduce/Filter是非常通用的玩法。

例子

Map示例
下面的程序代码中,我们写了两个Map函数,这两个函数需要两个参数:

  • 一个是字符串数组 []string,说明需要处理的数据一个字符串
  • 另一个是一个函数func(s string) string 或 func(s string) int
func MapStrToStr(arr []string, fn func(s string) string) []string {
    var newArray = []string{}
    for _, it := range arr {
        newArray = append(newArray, fn(it))
    }
    return newArray
}

func MapStrToInt(arr []string, fn func(s string) int) []int {
    var newArray = []int{}
    for _, it := range arr {
        newArray = append(newArray, fn(it))
    }
    return newArray
}

整个Map函数运行逻辑都很相似,函数体都是在遍历第一个参数的数组,然后,调用第二个参数的函数,然后把其值组合成另一个数组返回。
于是我们就可以这样使用这两个函数:

var list = []string{"Hao", "Chen", "MegaEase"}

x := MapStrToStr(list, func(s string) string {
    return strings.ToUpper(s)
})
fmt.Printf("%v\n", x)
//["HAO", "CHEN", "MEGAEASE"]

y := MapStrToInt(list, func(s string) int {
    return len(s)
})
fmt.Printf("%v\n", y)
//[3, 4, 8]

我们可以看到,我们给第一个 MapStrToStr() 传了函数做的是 转大写,于是出来的数组就成了全大写的,给MapStrToInt() 传的是算其长度,所以出来的数组是每个字符串的长度。
我们再来看一下Reduce和Filter的函数是什么样的。

Reduce示例

func Filter(arr []int, fn func(n int) bool) []int {
    var newArray = []int{}
    for _, it := range arr {
        if fn(it) {
            newArray = append(newArray, it)
        }
    }
    return newArray
}

var intset = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
out := Filter(intset, func(n int) bool {
   return n%2 == 1
})
fmt.Printf("%v\n", out)

out = Filter(intset, func(n int) bool {
    return n > 5
})
fmt.Printf("%v\n", out)

Filter示例

func Filter(arr []int, fn func(n int) bool) []int {
    var newArray = []int{}
    for _, it := range arr {
        if fn(it) {
            newArray = append(newArray, it)
        }
    }
    return newArray
}

var intset = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
out := Filter(intset, func(n int) bool {
   return n%2 == 1
})
fmt.Printf("%v\n", out)

out = Filter(intset, func(n int) bool {
    return n > 5
})
fmt.Printf("%v\n", out)

下图是一个比喻,其非常形象地说明了Map-Reduce是的业务语义,其在数据处理中非常有用。
map-reduce例子.png

业务示例
通过上面的一些示例,你可能有一些明白,Map/Reduce/Filter只是一种控制逻辑,真正的业务逻辑是在传给他们的数据和那个函数来定义的。是的,这是一个很经典的“业务逻辑”和“控制逻辑”分离解耦的编程模式。下面我们来看一个有业务意义的代码,来让大家强化理解一下什么叫“控制逻辑”与业务逻辑分离。

首先,我们一个员工对象,以及一些数据。

type Employee struct {
    Name     string
    Age      int
    Vacation int
    Salary   int
}

var list = []Employee{
    {"Hao", 44, 0, 8000},
    {"Bob", 34, 10, 5000},
    {"Alice", 23, 5, 9000},
    {"Jack", 26, 0, 4000},
    {"Tom", 48, 9, 7500},
    {"Marry", 29, 0, 6000},
    {"Mike", 32, 8, 4000},
}

相关的Reduce/Fitler函数:

func EmployeeCountIf(list []Employee, fn func(e *Employee) bool) int {
    count := 0
    for i, _ := range list {
        if fn(&list[i]) {
            count += 1
        }
    }
    return count
}

func EmployeeFilterIn(list []Employee, fn func(e *Employee) bool) []Employee {
    var newList []Employee
    for i, _ := range list {
        if fn(&list[i]) {
            newList = append(newList, list[i])
        }
    }
    return newList
}

func EmployeeSumIf(list []Employee, fn func(e *Employee) int) int {
    var sum = 0
    for i, _ := range list {
        sum += fn(&list[i])
    }
    return sum
}

简单说明一下:

  • EmployeeConutIf 和 EmployeeSumIf 分别用于统满足某个条件的个数或总数。它们都是Filter +
    Reduce的语义。
  • EmployeeFilterIn 就是按某种条件过虑。就是Fitler的语义。

于是我们就可以有如下的代码。

统计有多少员工大于40岁

old := EmployeeCountIf(list, func(e *Employee) bool {
    return e.Age > 40
})
fmt.Printf("old people: %d\n", old)

统计有多少员工薪水大于6000

high_pay := EmployeeCountIf(list, func(e *Employee) bool {
    return e.Salary >= 6000
})
fmt.Printf("High Salary people: %d\n", high_pay)
//High Salary people: 4

列出有没有休假的员工

no_vacation := EmployeeFilterIn(list, func(e *Employee) bool {
    return e.Vacation == 0
})
fmt.Printf("People no vacation: %v\n", no_vacation)
//People no vacation: [{Hao 44 0 8000} {Jack 26 0 4000} {Marry 29 0 6000}]

统计所有员工的薪资总和

total_pay := EmployeeSumIf(list, func(e *Employee) int {
    return e.Salary
})

fmt.Printf("Total Salary: %d\n", total_pay)
//Total Salary: 43500

统计30岁以下员工的薪资总和

younger_pay := EmployeeSumIf(list, func(e *Employee) int {
if e.Age < 30 {
    return e.Salary
} 
return 0 })

泛型Map-Reduce

我们可以看到,上面的Map-Reduce都因为要处理数据的类型不同而需要写出不同版本的Map-Reduce,虽然他们的代码看上去是很类似的。所以,这里就要带出来泛型编程了,Go语言在本文写作的时候还不支持泛型(注:Go开发团队技术负责人Russ Cox在2012年11月21golang-dev上的mail确认了Go泛型(type parameter)将在Go 1.18版本落地,即2022.2月份)。

简单版 Generic Map
所以,目前的Go语言的泛型只能用 interface{} + reflect来完成,interface{} 可以理解为C中的 void*,Java中的 Object ,reflect是Go的反射机制包,用于在运行时检查类型。
下面我们来看一下一个非常简单不作任何类型检查的泛型的Map函数怎么写。

func Map(data interface{}, fn interface{}) []interface{} {
    vfn := reflect.ValueOf(fn)
    vdata := reflect.ValueOf(data)
    result := make([]interface{}, vdata.Len())

    for i := 0; i < vdata.Len(); i++ {
        result[i] = vfn.Call([]reflect.Value{vdata.Index(i)})[0].Interface()
    }
    return result
}

上面的代码中

  • 通过 reflect.ValueOf() 来获得 interface{} 的值,其中一个是数据 vdata,另一个是函数 vfn;
  • 然后通过 vfn.Call() 方法来调用函数,通过 []refelct.Value{vdata.Index(i)}来获得数据。

Go语言中的反射的语法还是有点令人费解的,但是简单看一下手册还是能够读懂的。我这篇文章不讲反射,所以相关的基础知识还请大家自行Google相关的教程。
于是,我们就可以有下面的代码——不同类型的数据可以使用相同逻辑的Map()代码。

square := func(x int) int {
  return x * x
}
nums := []int{1, 2, 3, 4}

squared_arr := Map(nums,square)
fmt.Println(squared_arr)
//[1 4 9 16]

upcase := func(s string) string {
  return strings.ToUpper(s)
}
strs := []string{"Hao", "Chen", "MegaEase"}
upstrs := Map(strs, upcase);
fmt.Println(upstrs)
//[HAO CHEN MEGAEASE]

但是因为反射是运行时的事,所以,如果类型什么出问题的话,就会有运行时的错误。比如:

x := Map(5, 5)
fmt.Println(x)

上面的代码可以很轻松的编译通过,但是在运行时就出问题了,还是panic错误……

panic: reflect: call of reflect.Value.Len on int Value

goroutine 1 [running]:
reflect.Value.Len(0x10b5240, 0x10eeb58, 0x82, 0x10716bc)
        /usr/local/Cellar/go/1.15.3/libexec/src/reflect/value.go:1162 +0x185
main.Map(0x10b5240, 0x10eeb58, 0x10b5240, 0x10eeb60, 0x1, 0x14, 0x0)
        /Users/chenhao/.../map.go:12 +0x16b
main.main()
        /Users/chenhao/.../map.go:42 +0x465
exit status 2

健壮版的Generic Map

所以,如果要写一个健壮的程序,对于这种用interface{} 的“过度泛型”,就需要我们自己来做类型检查。下面是一个有类型检查的Map代码:

func Transform(slice, function interface{}) interface{} {
  return transform(slice, function, false)
}

func TransformInPlace(slice, function interface{}) interface{} {
  return transform(slice, function, true)
}

func transform(slice, function interface{}, inPlace bool) interface{} {
 
  //check the <code data-enlighter-language="raw" class="EnlighterJSRAW">slice</code> type is Slice
  sliceInType := reflect.ValueOf(slice)
  if sliceInType.Kind() != reflect.Slice {
    panic("transform: not slice")
  }

  //check the function signature
  fn := reflect.ValueOf(function)
  elemType := sliceInType.Type().Elem()
  if !verifyFuncSignature(fn, elemType, nil) {
    panic("trasform: function must be of type func(" + sliceInType.Type().Elem().String() + ") outputElemType")
  }

  sliceOutType := sliceInType
  if !inPlace {
    sliceOutType = reflect.MakeSlice(reflect.SliceOf(fn.Type().Out(0)), sliceInType.Len(), sliceInType.Len())
  }
  for i := 0; i < sliceInType.Len(); i++ {
    sliceOutType.Index(i).Set(fn.Call([]reflect.Value{sliceInType.Index(i)})[0])
  }
  return sliceOutType.Interface()

}

func verifyFuncSignature(fn reflect.Value, types ...reflect.Type) bool {

  //Check it is a funciton
  if fn.Kind() != reflect.Func {
    return false
  }
  // NumIn() - returns a function type's input parameter count.
  // NumOut() - returns a function type's output parameter count.
  if (fn.Type().NumIn() != len(types)-1) || (fn.Type().NumOut() != 1) {
    return false
  }
  // In() - returns the type of a function type's i'th input parameter.
  for i := 0; i < len(types)-1; i++ {
    if fn.Type().In(i) != types[i] {
      return false
    }
  }
  // Out() - returns the type of a function type's i'th output parameter.
  outType := types[len(types)-1]
  if outType != nil && fn.Type().Out(0) != outType {
    return false
  }
  return true
}

上面的代码一下子就复杂起来了,可见,复杂的代码都是在处理异常的地方。我不打算Walk through 所有的代码,别看代码多,但是还是可以读懂的,下面列几个代码中的要点:

  • 代码中没有使用Map函数,因为和数据结构和关键有含义冲突的问题,所以使用Transform,这个来源于 C++ STL库中的命名。
  • 有两个版本的函数,一个是返回一个全新的数组 – Transform(),一个是“就地完成” – TransformInPlace();
  • 在主函数中,用 Kind() 方法检查了数据类型是不是 Slice,函数类型是不是Func;
  • 检查函数的参数和返回类型是通过 verifyFuncSignature() 来完成的,其中:NumIn() – 用来检查函数的“入参”,NumOut() 用来检查函数的“返回值”;
  • 如果需要新生成一个Slice,会使用 reflect.MakeSlice() 来完成。
    好了,有了上面的这段代码,我们的代码就很可以很开心的使用了:

可以用于字符串数组:

list := []string{"1", "2", "3", "4", "5", "6"}
result := Transform(list, func(a string) string{
    return a +a +a
})
//{"111","222","333","444","555","666"}

可以用于整形数组:

list := []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
TransformInPlace(list, func (a int) int {
  return a*3
})
//{3, 6, 9, 12, 15, 18, 21, 24, 27}

可以用于结构体:

var list = []Employee{
    {"Hao", 44, 0, 8000},
    {"Bob", 34, 10, 5000},
    {"Alice", 23, 5, 9000},
    {"Jack", 26, 0, 4000},
    {"Tom", 48, 9, 7500},
}

result := TransformInPlace(list, func(e Employee) Employee {
    e.Salary += 1000
    e.Age += 1
    return e
})

健壮版的 Generic Reduce
同样,泛型版的 Reduce 代码如下:

func Reduce(slice, pairFunc, zero interface{}) interface{} {
  sliceInType := reflect.ValueOf(slice)
  if sliceInType.Kind() != reflect.Slice {
    panic("reduce: wrong type, not slice")
  }

  len := sliceInType.Len()
  if len == 0 {
    return zero
  } else if len == 1 {
    return sliceInType.Index(0)
  }

  elemType := sliceInType.Type().Elem()
  fn := reflect.ValueOf(pairFunc)
  if !verifyFuncSignature(fn, elemType, elemType, elemType) {
    t := elemType.String()
    panic("reduce: function must be of type func(" + t + ", " + t + ") " + t)
  }

  var ins [2]reflect.Value
  ins[0] = sliceInType.Index(0)
  ins[1] = sliceInType.Index(1)
  out := fn.Call(ins[:])[0]

  for i := 2; i < len; i++ {
    ins[0] = out
    ins[1] = sliceInType.Index(i)
    out = fn.Call(ins[:])[0]
  }
  return out.Interface()
}

健壮版的 Generic Filter

同样,泛型版的 Filter 代码如下(同样分是否“就地计算”的两个版本):
func Filter(slice, function interface{}) interface{} {
  result, _ := filter(slice, function, false)
  return result
}

func FilterInPlace(slicePtr, function interface{}) {
  in := reflect.ValueOf(slicePtr)
  if in.Kind() != reflect.Ptr {
    panic("FilterInPlace: wrong type, " +
      "not a pointer to slice")
  }
  _, n := filter(in.Elem().Interface(), function, true)
  in.Elem().SetLen(n)
}

var boolType = reflect.ValueOf(true).Type()

func filter(slice, function interface{}, inPlace bool) (interface{}, int) {

  sliceInType := reflect.ValueOf(slice)
  if sliceInType.Kind() != reflect.Slice {
    panic("filter: wrong type, not a slice")
  }

  fn := reflect.ValueOf(function)
  elemType := sliceInType.Type().Elem()
  if !verifyFuncSignature(fn, elemType, boolType) {
    panic("filter: function must be of type func(" + elemType.String() + ") bool")
  }

  var which []int
  for i := 0; i < sliceInType.Len(); i++ {
    if fn.Call([]reflect.Value{sliceInType.Index(i)})[0].Bool() {
      which = append(which, i)
    }
  }

  out := sliceInType

  if !inPlace {
    out = reflect.MakeSlice(sliceInType.Type(), len(which), len(which))
  }
  for i := range which {
    out.Index(i).Set(sliceInType.Index(which[i]))
  }

  return out.Interface(), len(which)
}

后记

还有几个未尽事宜:

  1. 使用反射来做这些东西,会有一个问题,那就是代码的性能会很差。所以,上面的代码不能用于你需要高性能的地方。怎么解决这个问题,我们会在本系列文章的下一篇文章中讨论。
  2. 上面的代码大量的参考了 Rob Pike的版本,他的代码在 https://github.com/robpike/filter
  3. 其实,在全世界范围内,有大量的程序员都在问Go语言官方什么时候在标准库中支持 Map/Reduce,Rob Pike说,这种东西难写吗?还要我们官方来帮你们写么?这种代码我多少年前就写过了,但是,我从来一次都没有用过,我还是喜欢用“For循环”,我觉得你最好也跟我一起用“For循环”。

我个人觉得,Map/Reduce在数据处理的时候还是很有用的,Rob Pike可能平时也不怎么写“业务逻辑”的代码,所以,对他来说可能也不太了解业务的变化有多么的频繁……

当然,好还是不好,由你来判断,但多学一些编程模式是对自己的帮助也是很有帮助的。