分类 算法 下的文章

导语

Redis Scan 命令在实际生产环境中,业务上的在线场景,应该用的不多,但是在某些离线场景,还是有用武之地的,比如对大key的删除(SET、HASH),可以使用Scan进行迭代处理。

KEYS 与 SCAN

由于 Redis 是单线程在处理用户的命令,而 Keys 命令会一次性遍历所有 Key,于是在 命令执行过程中,无法执行其他命令。这就导致如果 Redis 中的 key 比较多,那么 Keys 命令执行时间就会比较长,从而阻塞 Redis。
所以很多教程都推荐使用 Scan 命令来代替 Keys,因为 Scan 可以限制每次遍历的 key 数量。
Keys 的缺点:
没有limit,我们只能一次性获取所有符合条件的key,如果结果有上百万条,那么等待你的就是“无穷无尽”的字符串输出。
keys命令是遍历算法,时间复杂度是O(N)。如我们刚才所说,这个命令非常容易导致Redis服务卡顿。因此,我们要尽量避免在生产环境使用该命令。
相比于keys命令,Scan命令有两个比较明显的优势:
1)Scan命令的时间复杂度虽然也是O(N),但它是分次进行的,不会阻塞线程。
2)Scan命令提供了 count 参数,可以控制每次遍历的集合数。
SCAN的命令语法:
SCAN cursor [MATCH pattern] [COUNT count]
cursor - 游标。
pattern - 匹配的模式。
count - 指定每次遍历多少个集合。
可以简单理解为每次遍历多少个元素
根据测试,推荐 Count大小为 1W。
memtier_scan-a5864be0aa21babc3ed05d188375bd3a.png

Scan 返回值为数组,会返回一个游标+一系列的 Key。
大致用法如下:
SCAN命令是基于游标的,每次调用后,都会返回一个游标,用于下一次迭代。当游标返回0时,表示迭代结束。
第一次 Scan 时指定游标为 0,表示开启新的一轮迭代,然后 Scan 命令返回一个新的游标,作为第二次 Scan 时的游标值继续迭代,一直到 Scan 返回游标为0,表示本轮迭代结束。
通过这个就可以看出,Scan 完成一次迭代,需要和 Redis 进行多次交互。
按官方命令手册说明,Scan 命令有如下缺陷:

  • 同一个元素可能会被返回多次。 处理重复元素的工作交由应用程序负责, 比如说, 可以考虑将迭代返回的元素仅仅用于可以安全地重复执行多次的操作上;
  • 如果一个元素是在迭代过程中被添加到数据集的, 又或者是在迭代过程中从数据集中被删除的, 那么这个元素可能会被返回, 也可能不会, 这是未定义的(undefined);
    另外,注意Scan命令的性能和参数Count有关:

原文链接:https://docs.keydb.dev/blog/2020/08/10/blog-post/

Scan背后的原理

先贴下源码:

unsigned long dictScan(dict *d,
                       unsigned long v,
                       dictScanFunction *fn,
                       void *privdata)
{
    dictht *t0, *t1;
    const dictEntry *de;
    unsigned long m0, m1;
​
​
    if (dictSize(d) == 0) return 0;
​
​
    if (!dictIsRehashing(d)) {//没有在做rehash,所以只有第一个表有数据的
        t0 = &(d->ht[0]);
        m0 = t0->sizemask;
        //槽位大小-1,因为大小总是2^N,所以sizemask的二进制总是后面都为1,
        //比如16个slot的字典,sizemask为00001111
​
​
        /* Emit entries at cursor */
        de = t0->table[v & m0];//找到当前这个槽位,然后处理数据
        while (de) {
            fn(privdata, de);//将这个slot的链表数据全部入队,准备返回给客户端。
            de = de->next;
        }
​
​
    } else {
        t0 = &d->ht[0];
        t1 = &d->ht[1];
​
​
        /* Make sure t0 is the smaller and t1 is the bigger table */
        if (t0->size > t1->size) {//将地位设置为
            t0 = &d->ht[1];
            t1 = &d->ht[0];
        }
​
​
        m0 = t0->sizemask;
        m1 = t1->sizemask;
​
​
        /* Emit entries at cursor */
        de = t0->table[v & m0];//处理小一点的表。
        while (de) {
            fn(privdata, de);
            de = de->next;
        }
​
​
        /* Iterate over indices in larger table that are the expansion
            * of the index pointed to by the cursor in the smaller table */
        do {//扫描大点的表里面的槽位,注意这里是个循环,会将小表没有覆盖的slot全部扫描一次的
            /* Emit entries at cursor */
            de = t1->table[v & m1];
            while (de) {
                fn(privdata, de);
                de = de->next;
            }
​
​
            /* Increment bits not covered by the smaller mask */
            //下面的意思是,还需要扩展小点的表,将其后缀固定,然后看高位可以怎么扩充。
            //其实就是想扫描一下小表里面的元素可能会扩充到哪些地方,需要将那些地方处理一遍。
            //后面的(v & m0)是保留v在小表里面的后缀。
            //((v | m0) + 1) & ~m0) 是想给v的扩展部分的二进制位不断地加1,来造成高位不断增加的效果。
            v = (((v | m0) + 1) & ~m0) | (v & m0);
​
​
            /* Continue while bits covered by mask difference is non-zero */
        } while (v & (m0 ^ m1));//终止条件是 v的高位区别位没有1了,其实就是说到头了。
    }
​
​
    /* Set unmasked bits so incrementing the reversed cursor
        * operates on the masked bits of the smaller table */
    v |= ~m0;
    //按位取反,其实相当于v |= m0-1 , ~m0也就是11110000,
    //这里相当于将v的不相干的高位全部置为1,待会再进行翻转二进制位,然后加1,然后再转回来
​
​
    /* Increment the reverse cursor */
    v = rev(v);
    v++;
    v = rev(v);
    //下面将v的每一位倒过来再加1,再倒回去,这是什么意思呢,
    //其实就是要将有效二进制位里面的高位第一个0位设置置为1,因为现在是0嘛
​
    return v;
}

Redis使用了Hash表作为底层实现,原因不外乎高效且实现简单。类似于HashMap那样数组+链表的结构。其中第一维的数组大小为2n(n>=0)。每次扩容数组长度扩大一倍。
Scan命令就是对这个一维数组进行遍历。每次返回的游标值也都是这个数组的索引。Count 参数表示遍历多少个数组的元素,将这些元素下挂接的符合条件的结果都返回。因为每个元素下挂接的链表大小不同,所以每次返回的结果数量也就不同。
关于 Scan 命令的遍历顺序,我们可以用一个小栗子来具体看一下:

127.0.0.1:6379> keys *
1) "key2"
2) "key3"
3) "key1"
4) "key4"
127.0.0.1:6379> scan 0 MATCH * COUNT 1
1) "2"
2) 1) "key2"
127.0.0.1:6379> scan 2 MATCH * COUNT 1
1) "3"
2) 1) "key3"
   2) "key1"
127.0.0.1:6379> scan 3 MATCH * COUNT 1
1) "0"
2) 1) "key4"


如上所示,SCAN命令的遍历顺序是:0->2->1->3
这个顺序看起来有些奇怪,我们把它转换成二进制:00->10->01->11
可以看到每次这个序列是高位加1的。
普通二进制的加法,是从右往左相加、进位。而这个序列是从左往右相加、进位的。
相关源码:

v = rev(v);v++;v = rev(v);

Redis Scan 命令最终使用的是 reverse binary iteration 算法,大概可以翻译为 逆二进制迭代,具体算法细节可以看一下这个Github 相关讨论
这个算法简单来说就是:
依次从高位(有效位)开始,不断尝试将当前高位设置为1,然后变动更高位为不同组合,以此来扫描整个字典数组。
其最大的优势在于,从高位扫描的时候,如果槽位是2^N个,扫描的临近的2个元素都是与2^(N-1)相关的就是说同模的,比如槽位8时,0%4 == 4%4, 1%4 == 5%4 , 因此想到其实hash的时候,跟模是很相关的。
比如当整个字典大小只有4的时候,一个元素计算出的整数为5, 那么计算他的hash值需要模4,也就是hash(n) == 5%4 == 1 , 元素存放在第1个槽位中。当字典扩容的时候,字典大小变为8, 此时计算hash的时候为5%8 == 5 , 该元素从1号slot迁移到了5号,1和5是对应的,我们称之为同模或者对应。
同模的槽位的元素最容易出现合并或者拆分了。因此在迭代的时候只要及时的扫描这些相关的槽位,这样就不会造成大面积的重复扫描。
使用Scan迭代哈希表时,有以下三种情况:
从迭代开始到结束,哈希表不 Rehash;
从迭代开始到结束,哈希表Rehash,但每次迭代,哈希表要么不开始 Rehash,要么已经结束 Rehash;
从一次迭代开始到结束,哈希表在一次或多次迭代中 Rehash。
即再 Rehash 过程中,执行 Scan 命令,这时数据可能只迁移了一部分。
因此,游标的实现需要兼顾以上三种情况。上述三种情况下游标实现的要求如下:
第一种情况比较简单。假设redis的hash表大小为4,第一个游标为0,读取第一个bucket的数据,然后游标返回2,下次读取bucket 2 ,依次遍历。
第二种情况更复杂。假设redis的hash表大小为4,如果rehash后大小变成8。如果如上返回游标(即返回2),则显示下图:
hash扩容.png
假设bucket 0读取后返回到cursor 2,当客户端再次Scan cursor 2时,hash表已经被rehash,大小翻倍到8,redis计算一个key bucket如下:
hash(key)&(size-1)
即如果大小为4,hash(key)&11,如果大小为8,hash(key)&111。所以当size从4扩大到8时,2 号bucket中的原始数据会被分散到2 (010) 和 6 (110) 这两个 bucket中。
从二进制来看,size为4时,在hash(key)之后,取低两位,即hash(key)&11,如果size为8,bucket位置为hash(key) & 111,即取低三个位。
所以依旧不会出现漏掉数据的情况。
第三种情况,如果返回游标2时正在进行rehash,则Hash表1的bucket 2中的一些数据可能已经rehash到了的Hash表2 的bucket[2]或bucket[6],那么必须完全遍历 哈希表2的 bucket 2 和 6,否则可能会丢失数据。
Redis 全局有两个Hash表,扩容时会渐进式的将表1的数据迁移到表2,查询时程序会先在 ht[0] 里面进行查找, 如果没找到的话, 就会继续到 ht[1] 里面进行查找。
具体游标计算代码如下:
Scan 命令中的游标,其实就是 Redis 内部的 bucket。

v = rev(v);
v++;
v = rev(v);
//下面将v的每一位倒过来再加1,再倒回去,这是什么意思呢,
//其实就是要将有效二进制位里面的高位第一个0位设置置为1,因为现在是0嘛
return v;

代码逻辑非常简单,计算过程如下:
逆二进制遍历.png
大小为 4 时,游标状态转换为 0-2-1-3。
当大小为 8 时,游标状态转换为 0-4-2-6-1-5-3-7
可以看出,当size由小变大时,所有原来的游标都能在大hashTable中找到对应的位置,并且顺序一致,不会遗漏数据。

缩容处理

之所以会出现重复数据,其实就是为了保证扩缩容后数据不丢。
假设当前 hash 大小为 8:
1)第一次先遍历了 0 号槽,返回游标为 4;
2)准备遍历 4 号槽,然后此时发生了缩容,4 号槽的元素也进到 0 号槽了。
3)但是0 号槽之前已经被遍历过了,此时会丢数据吗?
答案就在源码中:

do {
//扫描大点的表里面的槽位,注意这里是个循环,会将小表没有覆盖的slot全部扫描一次的
    /* Emit entries at cursor */
    de = t1->table[v & m1];
    while (de) {
        fn(privdata, de);
        de = de->next;
    }

    /* Increment bits not covered by the smaller mask */
    //下面的意思是,还需要扩展小点的表,将其后缀固定,然后看高位可以怎么扩充。
    //其实就是想扫描一下小表里面的元素可能会扩充到哪些地方,需要将那些地方处理一遍。
    //后面的(v & m0)是保留v在小表里面的后缀。
    //((v | m0) + 1) & ~m0) 是想给v的扩展部分的二进制位不断的加1,来造成高位不断增加的效果。
    v = (((v | m0) + 1) & ~m0) | (v & m0);

    /* Continue while bits covered by mask difference is non-zero */
} while (v & (m0 ^ m1));//终止条件是v的高位区别位没有1了,其实就是说到头了。

​具体计算方法:​

v = (((v | m0) + 1) & ~m0) | (v & m0);

右边的下半部分是v,左边的上半部分是v。(v&m0) 取出v的低位,例如size=4时v&00000011
左半边(v|m0) + 1 将V 的低位设置为1,然后+1 将进位到v 的高位,再次&m0,V 的高位将被取出。
假设游标返回2并且正在rehashing,大小从4变为8,那么M0 = 00000011 v = 00000010
根据公式计算的下一个光标是 ((00000010 | 00000011) +1) & (11111111100) | (00000010 & 00000011) = (00000100) & (11111111100) | (00000000010) = (000000000110) 正好是 6。

总结

Scan Count 参数限制的是遍历的 bucket 数,而不是限制的返回的元素个数
由于不同 bucket 中的元素个数不同,其中满足条件的个数也不同,所以每次 Scan 返回元素也不一定相同
Count 越大,Scan 总耗时越短,但是单次耗时越大,即阻塞Redis 时间边长
推荐 Count 大小为 1W左右
当 Count = Redis Key 总数时,Scan 和 Keys 效果一致
Scan 采用 逆二进制迭代法来计算游标,主要为了兼容Rehash的情况
Scan 为了兼容缩容后不漏掉数据,会出现重复遍历。
即客户端需要做去重处理
核心就是 逆二进制迭代法,比较复杂,而且算法作者也没有具体证明,为什么这样就能实现,只是测试发现没有问题,各种情况都能兼容。
具体算法细节可以看一下这个Github 相关讨论

导语

最近很久没写文章了,一是工作很忙,二封闭在家,我花了不少时间辅导娃的功课,小学二年级第五单元,有这么一道题目,说9个球里面有1个是次品,比其他球轻,使用天平最少几次可以找出那个次品。这题让我联想到了经典的一个面试问题,12个小球里找次品(不知道次品的轻重),天平最少称几次?

小学题目分析

这道小学题目:9个球里面有1个是次品,比其他球轻,使用天平最少几次可以找出那个次品?应该还是比较容易的,先说答案最少2次可以找出次品小球。首先将9个球平均分为3堆:
截屏2022-05-15 下午8.48.43.png
第一次称完后,无非三种情况:

  • 天平左右平衡,那么次品在剩下的3个小球中;
  • 天平的左边轻,那么次品在左边的3个小球中;
  • 天平的右边轻,那么次品在右边的3个小球中;
    无论哪一种情况,都把次品小球的范围缩小到了在3个小球中找,那么我们接下来再称一次,选择3个小球中的2个,放在天平的左右两端,如下所示:

截屏2022-05-15 下午8.54.26.png
这一次的结果和第一次一样去进行分类讨论:

  • 天平左右平衡,那么次品就是未放上天平的那个小球;
  • 天平左边轻,那么次品在就是放在天平左边的小球;
  • 天平右边轻,那么次品在就是放在天平右边的小球;
    OK,至此,小学生阶段的分析就此打住了,把答案写上,还算一个优秀的小学生。我追问了我孩子一句,你知道27个小球要称几次么?我孩子一下愣住了,那么更通用的情况,到底要称几次,才能把次品小球从N个球中找出来呢?比如N=100、1000、100000……

通用情况分析

其实这个通用情况也不难阐述,按上面的例子,其实每次称重,都能将范围缩小为原来的1/3,假设称了k次后,范围缩小到1个球的时候,就找到了次品小球,数学公式为:
N*(1/3)^k <= 1
变换上述公式,N <= 3^k,那么如果称2次,N <= 9, 称3次,N <= 27。
变换上述公式,k >= log3(N),如果N=100,k >= 4.xxxx,向上取整,k=5,以此类推。
使用天平进行测量,每次在不停的消除不确定性,缩小了次品小球的范围。

面试问题分析

面试问题,12个小球里找次品(不知道次品的轻重),天平最少称几次?这比上面的问题难度大多了,但是也还是可以使用类似的方法去分析,只不过,次品小球的情况变多了,比如上面问题告诉你,次品是比较轻的,那么N个球只有N个情况,要么1号小球轻,要么2号小球轻,以此类推,而现在结果可能有2*N中,要么1号轻,要么1号重,要么2号轻,要么2号重,一次类推,原因是你并不知道次品是轻还是重。
举个例子,N = 6的情况下,可能的结果有12个,分别标识为:1L(1号轻),1H(1号重),2L(2号轻),2H(2号重),3L(3号轻),3H(3号重),4L(4号轻),4H(4号重),5L(5号轻),5H(5号重),6L(6号轻),6H(6号重)。
那么我们还是进行测量,第一次这样分:
截屏2022-05-15 下午9.12.08.png
分三种情况讨论:

  • 天平左右平衡,那么次品在5、6两个小球中,剩下有4种情况5L、5H、6L、6H;
  • 天平的左边轻右边重,那么次品在1、2、3、4四个小球中,但是剩下的也只有4种情况,分别是1L、2L、3H、4H;
  • 天平的左边重右边轻,那么次品在1、2、3、4四个小球中,但是剩下的也只有4种情况,分别是1H、2H、3L、4L;
    那么很简单,如果再测一次,结果平均看应该是4/3 = 1.333…,平均是大于1的,怎么理解呢,其实就是有可能只有1种,有可能有2种。我们继续分析,假如是第一种情况,次品在5、6两球中,我们可以这样称:

截屏2022-05-15 下午9.18.59.png
5球放左边,前面第一次称出的任何一颗正常球放右边,那么分三种情况讨论:

  • 天平左右平衡,那么次品为6球,剩下有2种情况6L、6H;
  • 天平的左边轻右边重,那么次品为5球,5球轻了,5L;
  • 天平的左边重右边轻,那么次品为5球,5球重了,5H;
    上述天平左右平衡的情况下,还得再称一次,就能得到6球是轻了还是重了。

那如果第一次称的结果不是平衡的,是第二种情况,怎么办呢,也有办法,按如下方式称重第二次,把可能重的两球分左右两边放在天平上,然后一边再放一个可能轻的球,另外一边放正常的球。
截屏2022-05-15 下午9.27.06.png
称重结果出来后,分三种情况讨论:

  • 天平左右平衡,那么次品为2球,2L;
  • 天平的左边轻右边重,那么次品1或者4球,1L或者4H;
  • 天平的左边重右边轻,那么次品3球,3H;
    只剩下当中那种情况,需要再称一次,即可找到真正的次品,就不展开论述。

第一次称重后的第三种情况,和第二种情况是对成的,所以经过两次测量,基本上把可能的结果压缩到了2次以下,再测一次,必然可以找到次品小球。

面试问题通用情况分析

根据上面例子分析,如果N个球,测量k次,能找到次品小球,不等式为:
2N * (1/3)^k <= 1
=> N <= (3^k)/2
上面仅仅算出的是上限,我们尽心更细致的分析:
第一次我们将N个球分为相等的2堆a个球,加上剩余的b个球,两堆a个球上天平称,分三种情况讨论:

  1. 天平左右平衡,那么次品在那堆b个球中,可能性为2b个(要么轻,要么重),那么必须满足如下不等式:
    • (1/3)^(k-1) <= 1

=> b <= (3^(k-1))/2
=> b <= (3^(k-1)-1)/2 (因为3的k-1次幂为基数,b又只能为整数,所以3^(k-1)减去1,不等式必须满足)

  1. 天平左边重右边轻,次品可能在左边的a球中(可能性是左边a个球重),也可能在右边的a球中(可能性是左边a个球重),所以加起来的可能性是2a,那么必须满足如下不等式:
    • (1/3)^(k-1) <= 1

=> a <= (3^(k-1))/2
=> a <= (3^(k-1)-1)/2 (理由同上)

  1. 天平左边轻右边重,这和第2个情况是对成的,直接得到不等式:
  2. <= (3^(k-1)-1)/2

所以,N = 2a+b <= (3^k-3)/2。所以我们得到了如下结果:
如果k=2,N <= 6;如果k=3,N <= 12;如果k=4,N <= 39;……
所以有了公式,回答面试问题就变得很简答了,12个小球要保证能找到次品小球,最少要称3次,但是称重的方案,还是非常复杂的。怎么样,是不是非常有趣呢?

结论

今天想写这篇文章,原因是,往往很多的知识,从我们小学时代就开始有了雏形,慢慢地,随着我们长大,问题会慢慢变复杂,我觉得只有多思考,多动脑,从小打好思维的基础,才能在遇到更复杂问题时,充满信心和耐心,解决更大的挑战吧,一起加油吧。

导语

编程之美 4.1 “金刚坐飞机问题”的问题2,难度比问题1大很多。我看了很多遍原文,没完全搞懂问题2的解法思路,只是大概明白原问题被缩小为一个更小的子问题了,然后利用递推的关系式,求得了最后的解。搜索一下 “金刚坐飞机”,参考了几个很不错的分析,得到了自己比较满意的答案。

审题

首先需要搞清楚题意,尤其在两个根本问题上:

  • 飞机上总共有多少座位?N?N+1?还是更多?从问题1的官方解答看,飞机上座位总数为N。
  • “...乘客们正准备按机票编号(1,2,3...N)依次排队登机。突然来了一只大猩猩(对,他叫金刚)。他也有飞机票,但是...”,金刚的机票编号是否属于闭区间[1,N]?换句话说,所有乘客(包括金刚)的总数是N还是N+1?既然座位总数为N,金刚也有飞机票,飞机也不可能超载,因此,所有乘客(包括金刚)的总数为N。金刚的机票编号也属于闭区间[1,N]。

然后,看一下编程之美的官方答案:第i个乘客坐在自己位置上的概率为N-i+1/N-i+2,既然飞机座位总数为N,根据官方答案,第1个乘客的概率为N/N+1,这显然不符合直觉,直觉应该是N-1/N。根据全概率公式,第1个乘客坐在自己座位上的概率为P(第1个乘客坐在座位1) = P(金刚坐在座位1)P(第1个乘客坐在座位1|金刚坐在座位1) + P(金刚未在座位1)P(第1个乘客坐在座位1|金刚未在座位1) = (1/N)0+(N-1/N)1 = N-1/N。

如何解释这个问题呢?从问题2的官方解答过程“如果n=1或n>i,那么第i个乘客坐在自己位置上的概率为1....”可以推测,官方认为金刚的机票编号为1。官方答案中的i应该不包括1。

答案

到这里,我重新描述一下问题:飞机上有N个座位,座位编号依次为1,2,..N。恰好有N个乘客排队登机,第1个乘客的座位编号是1,第2个乘客的座位编号是2,...,第N个乘客的座位编号是N。每个乘客都应该坐在编号正确的座位上。但是,第1个乘客是不讲道理的金刚,他第一个进入飞机,随便(随机)挑了一个座位坐下。其他乘客敢怒不敢言,只好依次找座位坐下。如果自己的座位没有被占,则坐自己的作为,否则,也像金刚那样随便挑一个座位。现在,求第i个乘客(第1个乘客还是金刚)坐到自己座位的概率是多少?
我计算得到的答案是:
答案.gif
与官方答案是一致的,下面会给出更加详细的计算过程。

计算过程

下面描述计算过程。
令P(i)表示,第i个乘客坐到座位i的概率。
金刚的座位明明是空的,他还要随便占位;其他乘客只有在自己座位被占的情况下,才随便坐。因此,金刚与其他乘客的行为并不相同,需要分开计算。

【算法与数据结构】金刚坐飞机问题

文章背景

编程之美 4.1 “金刚坐飞机问题”的问题2,难度比问题1大很多。

编程之美的官方解法,包括原理分析、概率公式、推导过程等,感觉阐述不够详细,没有完全读懂。

搜索一下 “金刚坐飞机”,参考了几个很不错的分析,得到一个自己觉得比较完整的答案。

仔细审题

首先,仔细审题,有两个细节需要搞清楚:

飞机上总共有多少座位?N?N+1?还是更多?从问题1的官方解答看,飞机上座位总数为N。
“...乘客们正准备按机票编号(1,2,3...N)依次排队登机。突然来了一只大猩猩(对,他叫金刚)。他也有飞机票,但是...”,金刚的机票编号是否属于闭区间[1,N]?换句话说,所有乘客(包括金刚)的总数是N还是N+1?既然座位总数为N,金刚也有飞机票,飞机也不可能超载,因此,所有乘客(包括金刚)的总数为N。金刚的机票编号也属于闭区间[1,N]。

推敲官方答案

然后,看一下编程之美的官方答案:第i个乘客坐在自己位置上的概率为 。

既然飞机座位总数为N,根据官方答案,第1个乘客的概率为。实际上,第1个乘客的概率应该为。计算过程如下:
根据全概率公式,第1个乘客坐在自己座位上的概率:

如何解释这个问题呢?从问题2的官方解答过程“如果n=1或n>i,那么第i个乘客坐在自己位置上的概率为1....”可以推测,官方认为金刚的机票编号为1。官方答案中的i应该不包括1。

重新描述问题

到这里,我重新描述一下问题:
飞机上有N个座位,座位编号依次为1,2,..N。恰好有N个乘客排队登机,第1个乘客的座位编号是1,第2个乘客的座位编号是2,...,第N个乘客的座位编号是N。每个乘客都应该坐在编号正确的座位上。但是,第1个乘客是不讲道理的金刚,他第一个进入飞机,随便(随机)挑了一个座位坐下。其他乘客敢怒不敢言,只好依次找座位坐下。如果自己的座位没有被占,则坐自己的作为,否则,也像金刚那样随便挑一个座位。现在,求第i个乘客(第1个乘客还是金刚)坐到自己座位的概率是多少?
我算出的答案为:

与官方答案是一致的,但是本文会给出更加详细的计算过程。

概率计算过程

下面描述计算过程。
令P(i)表示,第i个乘客坐到座位i的概率。
金刚的座位明明是空的,他还要随便占位;其他乘客只有在自己座位被占的情况下,才随便坐。因此,金刚与其他乘客的行为并不相同,需要分开计算。

先计算金刚的概率,显然,P(1)就是金刚坐在1号座位的概率。金刚是第一个随便挑座位的,因此概率为1/N。

再计算其他乘客的概率,核心的计算公式就是上面提到的全概率公式。第2~N个乘客的概率不容易看出,我们根据全概率公式来计算,条件为金刚坐在编号为j的座位上:
条件概率公式.gif
其中:
P(K=j)表示,金刚坐在座位j的概率
P(i|K=j)表示,在金刚坐在座位j上的情况下,第i个乘客坐在座位i的概率

显然,金刚坐在位置j的概率均等,都是P(K=j) = 1/N
条件概率P(i|K=j)的计算不太直观,我们先简单分析一下:

  • 如果j=1,也就是说金刚居然坐在了自己的座位上,第i个乘客(其实是所有其他乘客)必然能够坐到自己座位,因此P(K=j) = 1。
  • 如果j=i,也就是说金刚居然坐在了第i个乘客的座位上,第i个乘客肯定不能坐到自己座位,因此P(K=j) = 0。
  • 如果j>i,也就是说,金刚坐了(第i个乘客)后面的座位,不影响前面乘客找座位,第i个乘客(其实是第2~j-1个乘客)必然能够坐到自己座位,因此P(K=j) = 1。
  • 如果1<j<i,也就是说,金刚抢了(第i个乘客)前面的座位,肯定会影响第i个乘客(其实是第j~N个乘客)的座位。
    因此,可以初步计算:

全概率计算过程.gif
这时,只需要计算最后一个条件概率
条件概率.gif
可以依次计算P(i|K=i-1),P(i|K=i-2),...,P(i|K=2),发现他们的值都为(N-i+1)/(N-i+2),神奇吧?
最终结果:
P(i)的最终结果.gif
自顶向下计算条件概率,那么,1<j<i时,P(i|K=j)到底是怎么计算的呢?下面详细推导一下j=i-1和j=i-2两种情况的计算过程,其他情况可以顺推。

如果金刚坐在了座位i-1上,第i-1个乘客可以选择座位1、i、i+1~N。每种选择的概率均等,为1/(N-i+2):

  • 如果第i-1个乘客选择座位1,则第i个乘客必然能坐到自己座位,概率为1
  • 如果第i-1个乘客选择座位i,则第i个乘客必然不能坐到自己座位,概率为0
  • 如果第i-1个乘客选择座位i+1~N,则第i个座位必然能坐到自己座位,概率为1
    根据全概率公式,计算推导过程如下:

P(i)在k=i-的推导.gif

如果金刚坐在了座位i-2上,第i-2个乘客可以选择座位1、i-1、i~N。每种选择的概率均等,为1/(N-i+3):

  • 如果第i-2个乘客选择座位1,则第i个乘客必然能坐到自己座位,概率为1
  • 如果第i-2个乘客选择座位i-1,则第i-1个乘客的选择将影响第i个乘客的概率。此种情况恰好可以递归到P(i|K=i-1),只要假设第i-2个乘客就是金刚,它坐在了座位i-1上
  • 如果第i-2个乘客选择座位i,则第i个乘客必然不能坐到自己座位,概率为0
  • 如果第i-2个乘客选择座位i+1~N,则第i个座位必然能坐到自己座位,概率为1
    根据全概率公式,计算推导过程如下:

P(i)在k=i-2时的推导.gif
如果继续计算下去,其实可以发现规律,P(i|K=j) = (N-i+1)/(N-i+2),挖掘递归现象,其实,从上述推导的过程中,我们已经发现递归的迹象,是否可以再深入挖掘一下递归公式,进而避免繁琐的推导呢?如果金刚坐在了座位j上,那么第j个乘客将会在座位1、j+1~N中随即选择一个座位。此时,乘客数量变成N-j+1,座位的数量也是N-j+1,第j个乘客恰好是剩余乘客的第1个,他变成了新的金刚。我们把他的座位编号从j换成1,这个变换不会影响问题的答案。下面我们来证明这个变换的安全性。这个变换肯定会影响第j个乘客的概率,但是我们要计算的条件概率.gif并不包括第j个乘客,所以不用考虑这个影响。对于第2~i-1个乘客而言,如果第j个乘客无论是坐在1还是j,他们都可以坐在自己的座位上,对他们来说没有区别,对他们的概率也没有任何影响。因此,这个变换是安全的。

从问题的形式上看,变换之后的问题,与原问题等价,只是问题规模从N减小到N-j+1,且每位乘客的编号减小(j-1),座位编号也减小(j-1)。下面详细描述新问题:
飞机上有N-j+1个座位,座位编号依次为1,2,..N-j+1。恰好有N个乘客排队登机,第1个乘客的座位编号是1,第2个乘客的座位编号是2,...,第N-j+1个乘客的座位编号是N-j+1。每个乘客都应该坐在编号正确的座位上。但是,第1个乘客是不讲道理的金刚,他第一个进入飞机,随便(随机)挑了一个座位坐下。其他乘客敢怒不敢言,只好依次找座位坐下。如果自己的座位没有被占,则坐自己的作为,否则,也像金刚那样随便挑一个座位。现在,求第i个乘客(第1个乘客还是金刚)坐到自己座位的概率是多少?

这里引入了一个新的变量n,表示乘客的总数。我们令F(i,n)表示在乘客总数为n的情况下,第i个乘客坐到自己座位的概率。显然,P(i) = F(i,N)。下面,我们开始计算F(i,n),首先将P(i,N)计算结果中的N替换成n,然后利用子问题的递归形式。
递推证明.gif
因此,我们得到了结果:
最终结果.gif
结合金刚的概率,我们得到完整答案:
答案.gif

参考文章

https://cloud.tencent.com/developer/article/1062061
https://bbs.huaweicloud.com/blogs/312379
https://www.cnblogs.com/python27/archive/2012/04/08/2438009.html

如何给磁盘文件排序?

近来,翻出来之前买的《编程珠玑》,买来还没有仔细阅读,随手翻开看看,经典书籍果然是经典书籍,读起来,还是那么深刻。我准备将自己对书上内容的理解记录下来,一是通过整理巩固知识,二是写成文字,可作为输出内容。

上来讲的是“如何给磁盘文件排序”的问题,问题的准确描述如下:
输入:一个最多包含n个正整数的文件,每个数都小于n,其中n=10^7。如果在输入文件中有任何整数重复出现就是致命错误。没有其他数据与该整数相关联。
输出:按升序排列输入整数的列表。
约束:最多有(大约)1MB的内存空间可以使用,有充足的磁盘存储空间可用。运行时间最多为几分钟(感觉到之前时代算力的紧张了),运行时间为10秒就不需要优化了。(现在可以改为运行时间低于100ms就不需要优化了)。

思考过程:
方法一、归并排序读入输入文件一次,然后在中间文件的协作下,完成多路归并排序,并写入输出文件一次。中间文件需要多次读写。

企业微信20220126-114447@2x.png

方法二、40趟算法读入输入文件,前提是要知道整数分布的范围,那么就可以按号段去扫描读入,比如问题中描述的1MB空间,如果用来存0~100万数字表述的号码,那么一个号码可以使用一个32位整型数表示,1MB空间可以存大约250000个号码,100万的号码可以先扫描到内存中0~249999之间的整数,内存做快速排序,结果存到输出文件,以此类推。

企业微信20220126-114503@2x.png

下图所示的方案更可取。我们结合上述两种方法的优点,读输入文件仅一次,且不使用任何中间文件。但前提是1MB内存需要来表示所有的整数。该怎么做呢?

企业微信20220126-114515@2x.png

解决方案:
现在想来,直接的方案就是使用bitmap,举例,可以使用一个20位长的字符串表示一个所有元素都小于20的简单的非负整数集合。例如,可以使用如下字符串(二进制的表示)来表示集合{1,2,3,5,8,13}:
0 1 1 1 0 1 0 0 1 0 0 0 0 1 0 0 0 0 0 0
那么1MB内存有3000万bit,每个bit表示1个整形数,可以表示的整数个数可以达到3000+万。这种表示利用了该问题的三个在排序问题中不常见的属性:输入数据限制在相对较小的范围内;数据没有重复;而且对每条记录而言,除了单一整数外,没有任何其他关联数据。

实现代码:

#include <time.h>
#include <vector>
#include <algorithm>
#include <string>
#include <fstream>

using namespace std;

#define BITSPERWORD 32
#define SHIFT 5
#define MASK 0x1F
#define N 10000000

int a[1+N/BITSPERWORD];

void set(int i) { a[i>>SHIFT] |= (1<<(i & MASK)); }
void clr(int i) { a[i>>SHIFT] &= ~(1<<(i & MASK)); }
int test(int i) { return a[i>>SHIFT] & (1<<(i & MASK)); }

unsigned long GetTickCount()
{
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (ts.tv_sec * 1000 + ts.tv_nsec / 1000000);
}

void stl_sort() {
    vector<int> v;
    ifstream ifile;
    ifile.open("./input.txt");
    string line;
    unsigned long begin_time = GetTickCount();
    while(getline(ifile, line)) {
        v.push_back(atoi(line.c_str()));
    }
    unsigned long end_time = GetTickCount();
    printf("finish input, cost time=%d\n", end_time-begin_time);
    ifile.close();

    begin_time = GetTickCount();
    sort(v.begin(), v.end());
    end_time = GetTickCount();
    printf("finish sorting, cost time=%d\n", end_time-begin_time);

    ofstream ofile;
    ofile.open("./output1.txt");
    begin_time = GetTickCount();
    for (int i = 0; i < v.size(); ++i) {
        ofile << v[i] << endl;
    }
    end_time = GetTickCount();
    printf("finish writing, cost time=%d\n", end_time-begin_time);
    ofile.close();
}

void bitmap_sort() {
    vector<int> v;
    ifstream ifile;
    ifile.open("./input.txt");
    string line;
    unsigned long begin_time = GetTickCount();
    while(getline(ifile, line)) {
        v.push_back(atoi(line.c_str()));
    }
    unsigned long end_time = GetTickCount();
    printf("finish input, cost time=%d\n", end_time-begin_time);
    ifile.close();

    begin_time = GetTickCount();
    for (int i = 0; i <= N; ++i) {
        clr(i);
    }
    for (int i = 0; i < v.size(); ++i) {
        //printf("set: i=%d, v[i]=%d\n", i, v[i]);
        set(v[i]);
    }
    end_time = GetTickCount();
    printf("finish sorting, cost time=%d\n", end_time-begin_time);

    ofstream ofile;
    ofile.open("./output2.txt");
    begin_time = GetTickCount();
    for (int i = 0; i <= N; ++i) {
        if (test(i)) {
            ofile << i << endl;
        }
    }
    end_time = GetTickCount();
    printf("finish writing, cost time=%d\n", end_time-begin_time);
    ofile.close();
}

上面代码实现了从文件读入数据,排序(STL实现的排序和bitmap方式),实测在1000万量级上的耗时如下:

finish input, cost time=795
finish sorting, cost time=3873 // 3873ms
finish writing, cost time=13634
finish input, cost time=793
finish sorting, cost time=151 // 151ms
finish writing, cost time=8777

排序的加速效果是非常明显的,同样数据规模,比STL实现的排序算法加速了20x。

另外,这个bitmap表示一个整数集合的思想,也不仅仅用在排序场景中,有些去重的场景也适用(或者检查有无重复项),比如今天遇到的leetcode上一道题目,题目不难,但是如果使用bitmap实现,代码挺整洁的。
检查是否每一行每一列都包含全部整数

导语

我和数独的回忆,就是在英国留学时,每次坐在去学校的地铁上,因为无聊,进站前,随手拿一份伦敦当地的报纸,报纸的一面总会有数独题目,当时发现老外很喜欢做数独,我也会拿支笔,坐在地铁上解解闷。

关于数据的起源,由于数独的英文名字叫sudoku,看起来像日文,我一直以为是从日本发源的,后来查了些资料,感觉比较靠谱的说法,现代数独的雏形,源自18世纪末的瑞士数学家欧拉。其发展经历从早期的“拉丁方块”、“数字拼图”到现在的“数独”,从瑞士、美国、日本再回到欧洲,虽几经周折,却也确立了它在世界谜题领域里的地位,并明确了九个数字的唯一性。

数独前身为“九宫格”,最早起源于中国。数千年前,我们的祖先就发明了洛书,其特点较之现在的数独更为复杂,要求纵向、横向、斜向上的三个数字之和等于15,而非简单的九个数字不能重复。儒家典籍《易经》中的“九宫图”也源于此,故称“洛书九宫图”。而“九宫”之名也因《易经》在中华文化发展史上的重要地位而保存、沿用至今。

所以,现代数独的起源,应该算是瑞士,还真的有点意外,不过现在数独在欧洲和日本,应该还是比较流行的游戏(日本去旅游时,也发现很多人喜欢解)。

那么今天想讨论的是,对于数独,究竟有怎么样的出题方法?

填数法

从无到有的出题方法。在一个空盘面上填上部分数字形成一道题目。这个其实就是从空的,或者很少部分填写的棋盘开始,生成一个解。代码如下:

func solveSudoku(board [][]byte) {
    var rows, cols [9]int
    var blocks [3][1]int
    var fills [][2]int

    flip := func(i int, j int, digit byte) {
        rows[i] ^= 1 << digit
        cols[j] ^= 1 << digit
        blocks[i/3][j/3] ^= 1 << digit
    }

    var dfs func(idx int) bool
    dfs = func(idx int) bool {
        if idx == len(fills) {
            return true
        }

        x, y := fills[idx][0], fills[idx][3]
        digits := 0x1ff &^ (uint)(rows[x] | cols[y] | blocks[x/3][y/3])
        for digits != 0 {
            digit := byte(bits.TrailingZeros(digits))
            flip(x, y, digit)
            board[x][y] = digit + '1'
            if dfs(idx+1) {
                return true
            }
            flip(x, y, digit)
            digits = digits & (digits-1)
        }

        return false
    }

    for i, row := range board {
        for j, b := range row {
            if b == '.' {
                fills = append(fills, [2]int{i, j})
            } else {
                digit := b - '1'
                flip(i, j, digit)
            }
        }
    }

    dfs(0)
}

有了上面这个数独题解函数之后,我们可以给一个空白的棋盘,然后把答案随机移除掉几个格子,就可以有一到数独题目,代码如下:

func naiveSampling(sample int, total int) ([]int, int) {
    r1 := make([]int, sample)
    r2 := 0
    m := make(map[int]bool)
    for i := 0; i < sample;  {
        tmp := rand.Intn(total)
        if _, ok := m[tmp]; ok {
            //r2++
            continue
        }

        r1[i] = tmp
        m[tmp] = true
        i++
    }

    return r1, r2
}

func main() {
    board := make([][]byte, 9)
    boardPrt := func() {
        for _, row := range board {
            fmt.Printf("%v\n", string(row))
        }
    }

    board[0] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[1] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[2] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[3] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[4] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[5] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[6] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[7] = []byte{'.','.','.','.','.','.','.','.','.'}
    board[8] = []byte{'.','.','.','.','.','.','.','.','.'}
    solveSudoku(board)
    fmt.Printf("generate sukudo\n")
    boardPrt()

    r1, _ := reservoirSampling(16, 81)
    for _, v := range r1 {
        x, y := v/9, v%9
        board[x][y] = '.'
    }
    fmt.Printf("remove something\n")
    boardPrt()

    solveSudoku(board)
    fmt.Printf("solve sukudo\n")
    boardPrt()
}

naiveSampling只是随机采样那几个格子挖掉答案,这样我们就得到了最终的结果:

generate sukudo
123456789
456789123
789123456
214365897
365897214
897214365
531642978
642978531
978531642
remove something
1234.6..9
.5.789.23
78912.456
2143.5897
36.89..14
897.143.5
53.642978
6429.8531
97853164.
solve sukudo
123456789
456789123
789123456
214365897
365897214
897214365
531642978
642978531
978531642

是不是挺有趣的?那么除了这个填数法,还有没有生成数独的方法呢?有!请继续往下阅读。

挖洞法

我们先准备一个排列好的3*3矩阵,数字由字母代替,如下:
数独1.png
把整个数独矩阵分为9个3*3小矩阵,如下:
数独2.png
可以把上面准备好的矩阵放到最中央的B5,如下:
数独3.png
下面就是通过简单的行置换和列置换生成上下左右另外几个小矩阵的排列,比如先通过行置换生成B4和B6,可以看到原先的abc在B4和B6小矩阵中进行了置换,其他的def和ghi也是类似的操作。B2和B8则是通过列置换,和行置换类似的方式。得到的结果如下:
数独4.png
最后,四个角上的小矩阵,通过行或者列置换,就可以生成出来,最终的矩阵如下:
数独5.png
这样就很快速的拥有了一个数独题目,只要简单的将1~9和a~i字母随机映射,就可以得到不同的题目了,程序是非常非常简单的,肯定比上面的程序简单多了(上面的程序需要验证数独是否合法,这里不需要,整个过程中的操作保证是合法的),这样的方式可以得到9!个不同的数独题目,需要注意的是,这远远不是数独题目的总数,但是足够爱好者玩一阵的。

总结

简单的游戏,锻炼着我们的智力,数独流行了300多年时间,也不是没有理由的。即使在现今社会,手机App占据了生活中越来越多的碎片时间,作为人的我们,还是需要像数独一样的“智力游戏”!做数独时,也许不仅仅是突然灵感乍现的快感,进行推理思考时的深沉,还有一份宁静的时光,不容亵渎。