分类 中间件 下的文章

导语

Redis Scan 命令在实际生产环境中,业务上的在线场景,应该用的不多,但是在某些离线场景,还是有用武之地的,比如对大key的删除(SET、HASH),可以使用Scan进行迭代处理。

KEYS 与 SCAN

由于 Redis 是单线程在处理用户的命令,而 Keys 命令会一次性遍历所有 Key,于是在 命令执行过程中,无法执行其他命令。这就导致如果 Redis 中的 key 比较多,那么 Keys 命令执行时间就会比较长,从而阻塞 Redis。
所以很多教程都推荐使用 Scan 命令来代替 Keys,因为 Scan 可以限制每次遍历的 key 数量。
Keys 的缺点:
没有limit,我们只能一次性获取所有符合条件的key,如果结果有上百万条,那么等待你的就是“无穷无尽”的字符串输出。
keys命令是遍历算法,时间复杂度是O(N)。如我们刚才所说,这个命令非常容易导致Redis服务卡顿。因此,我们要尽量避免在生产环境使用该命令。
相比于keys命令,Scan命令有两个比较明显的优势:
1)Scan命令的时间复杂度虽然也是O(N),但它是分次进行的,不会阻塞线程。
2)Scan命令提供了 count 参数,可以控制每次遍历的集合数。
SCAN的命令语法:
SCAN cursor [MATCH pattern] [COUNT count]
cursor - 游标。
pattern - 匹配的模式。
count - 指定每次遍历多少个集合。
可以简单理解为每次遍历多少个元素
根据测试,推荐 Count大小为 1W。
memtier_scan-a5864be0aa21babc3ed05d188375bd3a.png

Scan 返回值为数组,会返回一个游标+一系列的 Key。
大致用法如下:
SCAN命令是基于游标的,每次调用后,都会返回一个游标,用于下一次迭代。当游标返回0时,表示迭代结束。
第一次 Scan 时指定游标为 0,表示开启新的一轮迭代,然后 Scan 命令返回一个新的游标,作为第二次 Scan 时的游标值继续迭代,一直到 Scan 返回游标为0,表示本轮迭代结束。
通过这个就可以看出,Scan 完成一次迭代,需要和 Redis 进行多次交互。
按官方命令手册说明,Scan 命令有如下缺陷:

  • 同一个元素可能会被返回多次。 处理重复元素的工作交由应用程序负责, 比如说, 可以考虑将迭代返回的元素仅仅用于可以安全地重复执行多次的操作上;
  • 如果一个元素是在迭代过程中被添加到数据集的, 又或者是在迭代过程中从数据集中被删除的, 那么这个元素可能会被返回, 也可能不会, 这是未定义的(undefined);
    另外,注意Scan命令的性能和参数Count有关:

原文链接:https://docs.keydb.dev/blog/2020/08/10/blog-post/

Scan背后的原理

先贴下源码:

unsigned long dictScan(dict *d,
                       unsigned long v,
                       dictScanFunction *fn,
                       void *privdata)
{
    dictht *t0, *t1;
    const dictEntry *de;
    unsigned long m0, m1;
​
​
    if (dictSize(d) == 0) return 0;
​
​
    if (!dictIsRehashing(d)) {//没有在做rehash,所以只有第一个表有数据的
        t0 = &(d->ht[0]);
        m0 = t0->sizemask;
        //槽位大小-1,因为大小总是2^N,所以sizemask的二进制总是后面都为1,
        //比如16个slot的字典,sizemask为00001111
​
​
        /* Emit entries at cursor */
        de = t0->table[v & m0];//找到当前这个槽位,然后处理数据
        while (de) {
            fn(privdata, de);//将这个slot的链表数据全部入队,准备返回给客户端。
            de = de->next;
        }
​
​
    } else {
        t0 = &d->ht[0];
        t1 = &d->ht[1];
​
​
        /* Make sure t0 is the smaller and t1 is the bigger table */
        if (t0->size > t1->size) {//将地位设置为
            t0 = &d->ht[1];
            t1 = &d->ht[0];
        }
​
​
        m0 = t0->sizemask;
        m1 = t1->sizemask;
​
​
        /* Emit entries at cursor */
        de = t0->table[v & m0];//处理小一点的表。
        while (de) {
            fn(privdata, de);
            de = de->next;
        }
​
​
        /* Iterate over indices in larger table that are the expansion
            * of the index pointed to by the cursor in the smaller table */
        do {//扫描大点的表里面的槽位,注意这里是个循环,会将小表没有覆盖的slot全部扫描一次的
            /* Emit entries at cursor */
            de = t1->table[v & m1];
            while (de) {
                fn(privdata, de);
                de = de->next;
            }
​
​
            /* Increment bits not covered by the smaller mask */
            //下面的意思是,还需要扩展小点的表,将其后缀固定,然后看高位可以怎么扩充。
            //其实就是想扫描一下小表里面的元素可能会扩充到哪些地方,需要将那些地方处理一遍。
            //后面的(v & m0)是保留v在小表里面的后缀。
            //((v | m0) + 1) & ~m0) 是想给v的扩展部分的二进制位不断地加1,来造成高位不断增加的效果。
            v = (((v | m0) + 1) & ~m0) | (v & m0);
​
​
            /* Continue while bits covered by mask difference is non-zero */
        } while (v & (m0 ^ m1));//终止条件是 v的高位区别位没有1了,其实就是说到头了。
    }
​
​
    /* Set unmasked bits so incrementing the reversed cursor
        * operates on the masked bits of the smaller table */
    v |= ~m0;
    //按位取反,其实相当于v |= m0-1 , ~m0也就是11110000,
    //这里相当于将v的不相干的高位全部置为1,待会再进行翻转二进制位,然后加1,然后再转回来
​
​
    /* Increment the reverse cursor */
    v = rev(v);
    v++;
    v = rev(v);
    //下面将v的每一位倒过来再加1,再倒回去,这是什么意思呢,
    //其实就是要将有效二进制位里面的高位第一个0位设置置为1,因为现在是0嘛
​
    return v;
}

Redis使用了Hash表作为底层实现,原因不外乎高效且实现简单。类似于HashMap那样数组+链表的结构。其中第一维的数组大小为2n(n>=0)。每次扩容数组长度扩大一倍。
Scan命令就是对这个一维数组进行遍历。每次返回的游标值也都是这个数组的索引。Count 参数表示遍历多少个数组的元素,将这些元素下挂接的符合条件的结果都返回。因为每个元素下挂接的链表大小不同,所以每次返回的结果数量也就不同。
关于 Scan 命令的遍历顺序,我们可以用一个小栗子来具体看一下:

127.0.0.1:6379> keys *
1) "key2"
2) "key3"
3) "key1"
4) "key4"
127.0.0.1:6379> scan 0 MATCH * COUNT 1
1) "2"
2) 1) "key2"
127.0.0.1:6379> scan 2 MATCH * COUNT 1
1) "3"
2) 1) "key3"
   2) "key1"
127.0.0.1:6379> scan 3 MATCH * COUNT 1
1) "0"
2) 1) "key4"


如上所示,SCAN命令的遍历顺序是:0->2->1->3
这个顺序看起来有些奇怪,我们把它转换成二进制:00->10->01->11
可以看到每次这个序列是高位加1的。
普通二进制的加法,是从右往左相加、进位。而这个序列是从左往右相加、进位的。
相关源码:

v = rev(v);v++;v = rev(v);

Redis Scan 命令最终使用的是 reverse binary iteration 算法,大概可以翻译为 逆二进制迭代,具体算法细节可以看一下这个Github 相关讨论
这个算法简单来说就是:
依次从高位(有效位)开始,不断尝试将当前高位设置为1,然后变动更高位为不同组合,以此来扫描整个字典数组。
其最大的优势在于,从高位扫描的时候,如果槽位是2^N个,扫描的临近的2个元素都是与2^(N-1)相关的就是说同模的,比如槽位8时,0%4 == 4%4, 1%4 == 5%4 , 因此想到其实hash的时候,跟模是很相关的。
比如当整个字典大小只有4的时候,一个元素计算出的整数为5, 那么计算他的hash值需要模4,也就是hash(n) == 5%4 == 1 , 元素存放在第1个槽位中。当字典扩容的时候,字典大小变为8, 此时计算hash的时候为5%8 == 5 , 该元素从1号slot迁移到了5号,1和5是对应的,我们称之为同模或者对应。
同模的槽位的元素最容易出现合并或者拆分了。因此在迭代的时候只要及时的扫描这些相关的槽位,这样就不会造成大面积的重复扫描。
使用Scan迭代哈希表时,有以下三种情况:
从迭代开始到结束,哈希表不 Rehash;
从迭代开始到结束,哈希表Rehash,但每次迭代,哈希表要么不开始 Rehash,要么已经结束 Rehash;
从一次迭代开始到结束,哈希表在一次或多次迭代中 Rehash。
即再 Rehash 过程中,执行 Scan 命令,这时数据可能只迁移了一部分。
因此,游标的实现需要兼顾以上三种情况。上述三种情况下游标实现的要求如下:
第一种情况比较简单。假设redis的hash表大小为4,第一个游标为0,读取第一个bucket的数据,然后游标返回2,下次读取bucket 2 ,依次遍历。
第二种情况更复杂。假设redis的hash表大小为4,如果rehash后大小变成8。如果如上返回游标(即返回2),则显示下图:
hash扩容.png
假设bucket 0读取后返回到cursor 2,当客户端再次Scan cursor 2时,hash表已经被rehash,大小翻倍到8,redis计算一个key bucket如下:
hash(key)&(size-1)
即如果大小为4,hash(key)&11,如果大小为8,hash(key)&111。所以当size从4扩大到8时,2 号bucket中的原始数据会被分散到2 (010) 和 6 (110) 这两个 bucket中。
从二进制来看,size为4时,在hash(key)之后,取低两位,即hash(key)&11,如果size为8,bucket位置为hash(key) & 111,即取低三个位。
所以依旧不会出现漏掉数据的情况。
第三种情况,如果返回游标2时正在进行rehash,则Hash表1的bucket 2中的一些数据可能已经rehash到了的Hash表2 的bucket[2]或bucket[6],那么必须完全遍历 哈希表2的 bucket 2 和 6,否则可能会丢失数据。
Redis 全局有两个Hash表,扩容时会渐进式的将表1的数据迁移到表2,查询时程序会先在 ht[0] 里面进行查找, 如果没找到的话, 就会继续到 ht[1] 里面进行查找。
具体游标计算代码如下:
Scan 命令中的游标,其实就是 Redis 内部的 bucket。

v = rev(v);
v++;
v = rev(v);
//下面将v的每一位倒过来再加1,再倒回去,这是什么意思呢,
//其实就是要将有效二进制位里面的高位第一个0位设置置为1,因为现在是0嘛
return v;

代码逻辑非常简单,计算过程如下:
逆二进制遍历.png
大小为 4 时,游标状态转换为 0-2-1-3。
当大小为 8 时,游标状态转换为 0-4-2-6-1-5-3-7
可以看出,当size由小变大时,所有原来的游标都能在大hashTable中找到对应的位置,并且顺序一致,不会遗漏数据。

缩容处理

之所以会出现重复数据,其实就是为了保证扩缩容后数据不丢。
假设当前 hash 大小为 8:
1)第一次先遍历了 0 号槽,返回游标为 4;
2)准备遍历 4 号槽,然后此时发生了缩容,4 号槽的元素也进到 0 号槽了。
3)但是0 号槽之前已经被遍历过了,此时会丢数据吗?
答案就在源码中:

do {
//扫描大点的表里面的槽位,注意这里是个循环,会将小表没有覆盖的slot全部扫描一次的
    /* Emit entries at cursor */
    de = t1->table[v & m1];
    while (de) {
        fn(privdata, de);
        de = de->next;
    }

    /* Increment bits not covered by the smaller mask */
    //下面的意思是,还需要扩展小点的表,将其后缀固定,然后看高位可以怎么扩充。
    //其实就是想扫描一下小表里面的元素可能会扩充到哪些地方,需要将那些地方处理一遍。
    //后面的(v & m0)是保留v在小表里面的后缀。
    //((v | m0) + 1) & ~m0) 是想给v的扩展部分的二进制位不断的加1,来造成高位不断增加的效果。
    v = (((v | m0) + 1) & ~m0) | (v & m0);

    /* Continue while bits covered by mask difference is non-zero */
} while (v & (m0 ^ m1));//终止条件是v的高位区别位没有1了,其实就是说到头了。

​具体计算方法:​

v = (((v | m0) + 1) & ~m0) | (v & m0);

右边的下半部分是v,左边的上半部分是v。(v&m0) 取出v的低位,例如size=4时v&00000011
左半边(v|m0) + 1 将V 的低位设置为1,然后+1 将进位到v 的高位,再次&m0,V 的高位将被取出。
假设游标返回2并且正在rehashing,大小从4变为8,那么M0 = 00000011 v = 00000010
根据公式计算的下一个光标是 ((00000010 | 00000011) +1) & (11111111100) | (00000010 & 00000011) = (00000100) & (11111111100) | (00000000010) = (000000000110) 正好是 6。

总结

Scan Count 参数限制的是遍历的 bucket 数,而不是限制的返回的元素个数
由于不同 bucket 中的元素个数不同,其中满足条件的个数也不同,所以每次 Scan 返回元素也不一定相同
Count 越大,Scan 总耗时越短,但是单次耗时越大,即阻塞Redis 时间边长
推荐 Count 大小为 1W左右
当 Count = Redis Key 总数时,Scan 和 Keys 效果一致
Scan 采用 逆二进制迭代法来计算游标,主要为了兼容Rehash的情况
Scan 为了兼容缩容后不漏掉数据,会出现重复遍历。
即客户端需要做去重处理
核心就是 逆二进制迭代法,比较复杂,而且算法作者也没有具体证明,为什么这样就能实现,只是测试发现没有问题,各种情况都能兼容。
具体算法细节可以看一下这个Github 相关讨论

导语

这篇笔记继续上次《Kafka的学习笔记一》,这篇笔记主要内容有:Kafka的存储设计、消息的QoS、HW原理和控制器选主这几个部分。下面一一来看下内容。


Kafka的存储设计

Kafka Broker 是如何持久化数据的。总的来说,Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故避免了缓慢的随机 I/O 操作,用性能较好的顺序 I/O 写操作,这也是实现 Kafka 高吞吐量特性的一个重要手段。不过如果你不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此 Kafka 必然要定期地删除消息以回收磁盘。怎么删除呢?简单来说就是通过日志段(Log Segment)机制。在 Kafka 底层,一个日志又进一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。

向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是 5 秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于 Kafka 在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。

现代的操作系统针对磁盘的读写已经做了一些优化方案来加快磁盘的访问速度。比如,预读( read-ahead )会提前将一个比较大的磁盘块读入内存。后写 (write-behind) 会将很多小的逻辑写操作合并起来组合成一个大的物理写操作。并且,操作系统还会将主内存剩余的所有空闲内存空间都用作磁盘缓存 (disk cache/page cache),所有的磁盘读写操作都会经过统一的磁盘缓存(除了直接I/O会绕过磁盘缓存)。

应用程序写入数据到文件系统的一般做法是:在内存中保存尽可能多的数据,并在需要时将这些数据刷新到文件系统。但这里我们要做完全相反的事情,右图中所有的数据都写入文件系统的持久化日志文件,但不进行刷新数据的任何调用。数据会首先被传输到磁盘缓存,操作系统随后会将这些数据定期自动刷新到物理磁盘。
kafka的核心磁盘写入原理.png

消息系统内的消息从生产者保存到服务端,消费者再从服务端读取出来,数据的传输效率决定了生产者和消费者的性能。生产者如果每发送一条消息都直接通过网络发送到服务端,势必会造成过多的网络请求。如果我们能够将多条消息按照分区进行分组,并采用批量的方式一次发送一个消息集,并且对消息集进行压缩,就可以减少网络传输的带宽,进一步提高数据的传输效率。

消费者要读取服务端的数据,需要将服务端的磁盘文件通过网络发送到消费者进程,而网络发送通常涉及不同的网络节点。 如下图(左)所示,传统读取磁盘文件的数据在每次发送到网络时,都需要将页面缓存先保存到用户缓存,然后在读取消息时再将其复制到内核空间,具体步骤如下:
1) 操作系统将数据从磁盘中读取文件到内核空间里的页面缓存
2)应用程序将数据从内核空间读入用户空间的缓冲区
3)应用程序将读到的数据写回内核空间并放入socket缓冲区
4)操作系统将数据从socket缓冲区复制到网卡接口,此时数据才能通过网络发送归去

结合Kafka 的消息有多个订阅者的使用场景,生产者发布的消息一般会被不同的消费者消费多次。如下图(右)所示,使用零拷贝技术( zero-copy )只需将磁盘文件的数据复制到页面缓存中一次,然后将数据从页面缓存直接发送到网络中(发送给不同的使用者时,都可以重复使用同一个页面缓存),避免了重复的复制操作。这样,消息使用的速度基本上等同于网络连接的速度了。
kafka内存零copy技术.png

上面说了,Kafka broker会把消息存放到本地磁盘上,也就是日志文件,那么日志结构是什么样的?如下图:
截屏2021-08-06 上午12.52.46.png
日志是 Kafka 服务器端代码的重要组件之一,很多其他的核心组件都是以日志为基础的,比如状态管理机和副本管理器等。总的来说,Kafka 日志对象由多个日志段对象组成,而每个日志段对象会在磁盘上创建一组文件,包括消息日志文件(.log)、位移索引文件(.index)、时间戳索引文件(.timeindex)以及已中止(Aborted)事务的索引文件(.txnindex)。当然,如果你没有使用 Kafka 事务,已中止事务的索引文件是不会被创建出来的。图中的一串数字 0 是该日志段的起始位移值(Base Offset),也就是该日志段中所存的第一条消息的位移值。一般情况下,一个 Kafka 主题有很多分区,每个分区就对应一个 Log 对象,在物理磁盘上则对应于一个子目录。比如你创建了一个双分区的主题 test-topic,那么,Kafka 在磁盘上会创建两个子目录:test-topic-0 和 test-topic-1。而在服务器端,这就是两个 Log 对象。每个子目录下存在多组日志段,也就是多组.log、.index、.timeindex 文件组合,只不过文件名不同,因为每个日志段的起始位移不同。

消息的QoS机制

QoS全称是Quality of Service,服务质量的承诺,就是Kafka的消息交付可靠性保障,Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种:

  • 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  • 至少一次(at least once):消息不会丢失,但有可能被重复发送。
  • 精确一次(exactly once):消息不会丢失,也不会被重复发送。

目前,Kafka 默认提供的交付可靠性保障是第二种,即至少一次。之前我们说过消息“已提交”的含义,即只有 Broker 成功“提交”消息且 Producer 接到 Broker 的应答才会认为该消息成功发送。如果消息成功“提交”,但 Broker 的应答没有成功发送回 Producer 端(比如网络出现瞬时抖动),那么 Producer 就无法确定消息是否真的提交成功了。因此,它只能选择重试,这就是 Kafka 默认提供至少一次可靠性保障的原因,不过这会导致消息重复发送。Kafka 也可以提供最多一次交付保障,只需要让 Producer 禁止重试即可。这样一来,消息要么写入成功,要么写入失败,但绝不会重复发送。无论是至少一次还是最多一次,都不如精确一次来得有吸引力。大部分用户还是希望消息只会被交付一次,这样的话,消息既不会丢失,也不会被重复处理。或者说,即使 Producer 端重复发送了相同的消息,Broker 端也能做到自动去重。在下游 Consumer 看来,消息依然只有一条。那么问题来了,Kafka 是怎么做到精确一次的呢?简单来说,这是通过两种机制:幂等性(Idempotence)和事务(Transaction)。

要承诺消息交付可靠性保障,Kafka必然需要与Producer、Consumer有一些约定,首先就是怎么保证无消息丢失,Kafka对节点的存活定义有两个条件:

  • 节点必须和ZK保持会话;
  • 如果这个节点是某个分区的备份副本,它必须对分区主副本的写操作进行复制,并且复制的进度不能落后太多。

满足这两个条件,叫作“正在同步中”( in-sync)。 每个分区的主副本会跟踪正在同步中的备份副本节点( In Sync Replicas ,即ISR)。如果一个备份副本挂掉、没有响应或者落后太多,主副本就会将其从同步副本集合中移除。反之,如果备份副本重新赶上主副本,它就会加入到主副本的同步集合中。Kafka 中, 一条消息只有被ISR集合的所有副本都运用到本地的日志文件,才会认为消息被成功提交了。任何时刻,只要ISR至少有一个副本是存活的,Kafka就可以保证“一条消息一旦被提交,就不会丢失“。只有已经提交的消息才能被消费者消费,因此消费者不用担心会看到因为主副本失败而丢失的消息,下面我们举例分析Kafka的消息提交机制如何保证消费者看到的数据是一致的。

1)生产者发布了10条消息,但都还没有提交(没有完全复制到ISR中的所有副本) 如果没有提交机制,消息写到主副本的节点就对消费者立即可见,即消费者可以立即看到这10条消息。但之后主副本挂掉了,这10条消息实际上就丢失了,而消费者之前能看到这 10 条丢失的数据,在主副本挂掉后就看不到了,导致消费者看到的数据出现了不一致。

2)如果有提交机制的保证,并且生产者发布的 10条消息还没有提交,则对消费者不可见。即使10条消息都已经写入主副本,但是它们在还没有来得及复制到其他备份副本之前,主副本就挂掉了。那么,这 10条消息就不算写入成功,生产者会重新发送这 10条消息。当这 10条消息成功地复制到ISR 的所有副本后,它们才会认为是提交的,即对消费者才是可见的。在这之后,即使主副本挂掉了也没有关系,因为原先消费者能看到主副本的10条消息,在新的主副本上也能看到这10条消息,不会出现不一致的情况。

引出一个重要结论,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。

第一个核心要素是“已提交的消息”。什么是已提交的消息?当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。那为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要有一个 Broker 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。不论哪种情况,Kafka 只对已提交的消息做持久化保证这件事情是不变的。

第二个核心要素就是“有限度的持久化保证”,也就是说 Kafka 不可能保证在任何情况下都做到不丢失消息。Kafka 不丢消息是有前提条件的。假如你的消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中至少有 1 个存活。只要这个条件成立,Kafka 就能保证你的这条消息永远不会丢失。

一些常简的消息丢失情况如下:

目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。如果用这个方式,可能会有哪些因素导致消息没有发送成功呢?其实原因有很多,例如网络抖动,导致消息压根就没有发送到 Broker 端;或者消息本身不合格导致 Broker 拒绝接收(比如消息太大了,超过了 Broker 的承受能力)等。这种情况下,Kafka 不认为消息是已提交的,因此也就没有 Kafka 丢失消息这一说了。

解决此问题的方法非常简单:Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback),它能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。举例来说,如果是因为那些瞬时错误,那么仅仅让 Producer 重试就可以了;如果是消息不合格造成的,那么可以调整消息格式后再次发送。总之,处理发送失败的责任在 Producer 端而非 Broker 端。你可能会问,发送失败真的没可能是由 Broker 端的问题造成的吗?当然可能!如果你所有的 Broker 都宕机了,那么无论 Producer 端怎么重试都会失败的,此时你要做的是赶快处理 Broker 端的问题。但之前说的核心论据在这里依然是成立的:Kafka 依然不认为这条消息属于已提交消息,故对它不做任何持久化保证。

Consumer 端丢失数据主要体现在 Consumer 端要消费的消息不见了。Consumer 有个“位移”的概念,表示的是这个 Consumer 当前消费到的 Topic 分区的位置。下面这张图来自于官网,它清晰地展示了 Consumer 端的位移数据。比如对于 Consumer A 而言,它当前的位移值就是 9;Consumer B 的位移值是 11。
consumer丢失消息.png

这里的“位移”类似于我们看书时使用的书签,它会标记我们当前阅读了多少页,下次翻书的时候我们能直接跳到书签页继续阅读。正确使用书签有两个步骤:第一步是读书,第二步是更新书签页。如果这两步的顺序颠倒了,就可能出现这样的场景:当前的书签页是第 90 页,我先将书签放到第 100 页上,之后开始读书。当阅读到第 95 页时,我临时有事中止了阅读。那么问题来了,当我下次直接跳到书签页阅读时,我就丢失了第 96~99 页的内容,即这些消息就丢失了。

同理,Kafka 中 Consumer 端的消息丢失就是这么一回事。要对抗这种消息丢失,办法很简单:维持先消费消息(阅读),再更新位移(书签)的顺序即可。这样就能最大限度地保证消息不丢失。当然,这种处理方式可能带来的问题是消息的重复处理,这个问题后续继续探讨。

还有一类消息丢失场景是,Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移。假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。这里的关键在于 Consumer 自动提交位移,没有真正地确认消息是否真的被消费就盲目地更新了位移。这个问题的解决方案也很简单:如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移。

还有问题,就是消息堆积,可能原因:1. 生产速度大于消费速度,这样可以适当增加分区,增加consumer数量,提升消费TPS;2. consumer消费性能低,查一下是否有很重的消费逻辑(比如拿到消息后写HDFS或HBASE这种逻辑就挺重的),看看是否可以优化consumer TPS;3. 确保consumer端没有因为异常而导致消费hang住; 4. 如果你使用的是消费者组,确保没有频繁地发生rebalance。真正业务场景下,要结合实际情况去做分析。

说回到Kafka 是怎么做到消息精确一次的呢?简单来说,这是通过两种机制:幂等性(Idempotence)和事务(Transaction)。“幂等”这个词原是数学领域中的概念,指的是某些操作或函数能够被执行多次,但每次得到的结果都是不变的。幂等性有很多好处,其最大的优势在于我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态。如果是非幂等性操作,我们还需要担心某些操作执行多次对状态的影响,但对于幂等性操作而言,我们根本无需担心此事。

在 Kafka 中,Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。它其实是 0.11.0.0 版本引入的新功能。enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的重复去重。Kafka为了实现幂等性,它在底层设计架构中引入了ProducerID和SequenceNumber。 ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,用来标识本次会话。 SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。broker在内存维护(pid,seq)映射,收到消息后检查seq。producer在收到明确的的消息丢失ack,或者超时后未收到ack,要进行重试。

new_seq = old_seq+1: 正常消息;
new_seq <= old_seq : 重复消息;
new_seq > old_seq+1: 消息丢失;

另外我们需要了解幂等性Producer的作用范围。首先,它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。如果想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别。

事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务 API,如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。

高水位(HW)和Leader Epoch

在 Kafka 的世界中,水位是和位置信息绑定的,具体来说,它是用消息位移来表征的。高水位在界定 Kafka 消息对外可见性以及实现副本机制等方面起到了非常重要的作用,但其设计上的缺陷给 Kafka 留下了很多数据丢失或数据不一致的潜在风险。为此,社区引入了 Leader Epoch 机制,主要是用来判断出现 Failure 时是否执行日志截断操作(Truncation),尝试规避掉这类风险。下面将依次介绍高水位和Leader Epoch机制。

高水位的作用主要有2个:1. 定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。2. 帮助 Kafka 完成副本同步。使用下图理解高水位中的关键概念:
HW和LEO介绍.png
假设这是某个分区 Leader 副本的高水位图。首先,请你注意图中的“已提交消息”和“未提交消息”。在分区高水位以下的消息被认为是已提交消息,反之就是未提交消息。消费者只能消费已提交消息,即图中位移小于 6 的所有消息。注意,这里我们不讨论 Kafka 事务,因为事务机制会影响消费者所能看到的消息的范围,它不只是简单依赖高水位来判断。它依靠一个名为 LSO(Log Stable Offset)的位移值来判断事务型消费者的可见性。

另外,需要关注的是,位移值等于高水位的消息也属于未提交消息。也就是说,高水位上的消息是不能被消费者消费的。图中还有一个日志末端位移的概念,即 Log End Offset,简写是 LEO。它表示副本写入下一条消息的位移值。这个副本当前只有 9 条消息,位移值是从 0 到 8,下一条新消息的位移是 9。显然,介于高水位和 LEO 之间的消息就属于未提交消息。这也从侧面告诉了我们一个重要的事实,那就是:同一个副本对象,其高水位值不会大于 LEO 值。

高水位和 LEO 是副本对象的两个重要属性。Kafka 所有副本都有对应的高水位和 LEO 值,而不仅仅是 Leader 副本。只不过 Leader 副本比较特殊,Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其 Leader 副本的高水位。

现在,我们知道了每个副本对象都保存了一组高水位值和 LEO 值,但实际上,在 Leader 副本所在的 Broker 上,还保存了其他 Follower 副本的 LEO 值。我们一起来看看下面这张图。
高水位更新机制.jpg

在这张图中,我们可以看到,Broker 0 上保存了某分区的 Leader 副本和所有 Follower 副本的 LEO 值,而 Broker 1 上仅仅保存了该分区的某个 Follower 副本。Kafka 把 Broker 0 上保存的这些 Follower 副本又称为远程副本(Remote Replica)。Kafka 副本机制在运行过程中,会更新 Broker 1 上 Follower 副本的高水位和 LEO 值,同时也会更新 Broker 0 上 Leader 副本的高水位和 LEO 以及所有远程副本的 LEO,但它不会更新远程副本的高水位值,也就是我在图中标记为灰色的部分。为什么要在 Broker 0 上保存这些远程副本呢?其实,它们的主要作用是,帮助 Leader 副本确定其高水位,也就是分区高水位。
高水位和LEO更新时机.jpg

下面用一个单分区并且有两个副本的主题作为例子展示一下Kafka副本同步的全流程。
1.首先是初始状态。下面这张图中的 remote LEO 就是刚才的远程副本的 LEO 值。在初始状态时,所有值都是 0。
高水位更新流程1.jpg

  1. 当生产者给主题分区发送一条消息后,状态变更为:
    高水位更新流程2.jpg

此时,Leader 副本成功将消息写入了本地磁盘,故 LEO 值被更新为 1。

  1. Follower 再次尝试从 Leader 拉取消息。和之前不同的是,这次有消息可以拉取了,因此状态进一步变更为:
    高水位更新流程3.jpg

这时,Follower 副本也成功地更新 LEO 为 1。

  1. 此时,Leader 和 Follower 副本的 LEO 都是 1,但各自的高水位依然是 0,还没有被更新。它们需要在下一轮的拉取中被更新,如下图所示:
    高水位更新流程4.jpg

在新一轮的拉取请求中,由于位移值是 0 的消息已经拉取成功,因此 Follower 副本这次请求拉取的是位移值 =1 的消息。Leader 副本接收到此请求后,更新远程副本 LEO 为 1,然后更新 Leader 高水位为 1。做完这些之后,它会将当前已更新过的高水位值 1 发送给 Follower 副本。Follower 副本接收到以后,也将自己的高水位值更新成 1。至此,一次完整的消息同步周期就结束了。事实上,Kafka 就是利用这样的机制,实现了 Leader 和 Follower 副本之间的同步。

从上面的描述,我们知道,依托于高水位,Kafka 既界定了消息的对外可见性,又实现了异步的副本同步机制。不过,我们还是要思考一下这里面存在的问题。

上面副本同步的步骤4可知,Follower 副本的高水位更新需要一轮额外的拉取请求才能实现。如果把上面那个例子扩展到多个 Follower 副本,情况可能更糟,也许需要多轮拉取请求。也就是说,Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配的。这种错配是很多“数据丢失”或“数据不一致”问题的根源。基于此,社区在 0.11 版本正式引入了 Leader Epoch 概念,来规避因高水位更新错配导致的各种不一致问题。

所谓 Leader Epoch,我们大致可以认为是 Leader 版本。它由两部分数据组成。1. Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。2. 起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。

假设现在有两个 Leader Epoch<0, 0> 和 <1, 120>,那么,第一个 Leader Epoch 表示版本号是 0,这个版本的 Leader 从位移 0 开始保存消息,一共保存了 120 条消息。之后,Leader 发生了变更,版本号增加到 1,新版本的起始位移是 120。

Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到一个 checkpoint 文件中。当 Leader 副本写入消息到磁盘时,Broker 会尝试更新这部分缓存。如果该 Leader 是首次写入消息,那么 Broker 会向缓存中增加一个 Leader Epoch 条目,否则就不做更新。这样,每次有 Leader 变更时,新的 Leader 副本会查询这部分缓存,取出对应的 Leader Epoch 的起始位移,以避免数据丢失和不一致的情况。接下来,我们来看一个实际的例子,它展示的是 Leader Epoch 是如何防止数据丢失的。请先看下图,下图展示的是单纯依赖高水位,数据丢失的一种场景。
无leader epoch数据丢失.jpg

开始时,副本 A 和副本 B 都处于正常状态,A 是 Leader 副本。某个使用了默认 acks 设置的生产者程序向 A 发送了两条消息,A 全部写入成功,此时 Kafka 会通知生产者说两条消息全部发送成功。现在我们假设 Leader 和 Follower 都写入了这两条消息,而且 Leader 副本的高水位也已经更新了,但 Follower 副本高水位还未更新。这是可能出现的,Follower 端高水位的更新与 Leader 端有时间错配。倘若此时副本 B 所在的 Broker 宕机,当它重启回来后,副本 B 会执行日志截断操作,将 LEO 值调整为之前的高水位值,也就是 1。这就是说,位移值为 1 的那条消息被副本 B 从磁盘中删除,此时副本 B 的底层磁盘文件中只保存有 1 条消息,即位移值为 0 的那条消息。当执行完截断操作后,副本 B 开始从 A 拉取消息,执行正常的消息同步。如果就在这个节骨眼上,副本 A 所在的 Broker 宕机了,那么 Kafka 就别无选择,只能让副本 B 成为新的 Leader,此时,当 A 回来后,需要执行相同的日志截断操作,即将高水位调整为与 B 相同的值,也就是 1。这样操作之后,位移值为 1 的那条消息就从这两个副本中被永远地抹掉了。这就是这张图要展示的数据丢失场景。

严格来说,这个场景发生的前提是 Broker 端参数 min.insync.replicas 设置为 1。此时一旦消息被写入到 Leader 副本的磁盘,就会被认为是“已提交状态”,但现有的时间错配问题导致 Follower 端的高水位更新是有滞后的。如果在这个短暂的滞后时间窗口内,接连发生 Broker 宕机,那么这类数据的丢失就是不可避免的。现在,我们来看下如何利用 Leader Epoch 机制来规避这种数据丢失。我依然用图的方式来说明。
leader epoch防止数据丢失.jpg

场景和之前大致是类似的,只不过引用 Leader Epoch 机制后,Follower 副本 B 重启回来后,需要向 A 发送一个特殊的请求去获取 Leader 的 LEO 值。在这个例子中,该值为 2。当获知到 Leader LEO=2 后,B 发现该 LEO 值不比它自己的 LEO 值小,而且缓存中也没有保存任何起始位移值 > 2 的 Epoch 条目,因此 B 无需执行任何日志截断操作。这是对高水位机制的一个明显改进,即副本是否执行日志截断不再依赖于高水位进行判断。现在,副本 A 宕机了,B 成为 Leader。同样地,当 A 重启回来后,执行与 B 相同的逻辑判断,发现也不用执行日志截断,至此位移值为 1 的那条消息在两个副本中均得到保留。后面当生产者程序向 B 写入新消息时,副本 B 所在的 Broker 缓存中,会生成新的 Leader Epoch 条目:[Epoch=1, Offset=2]。之后,副本 B 会使用这个条目帮助判断后续是否执行日志截断操作。这样,通过 Leader Epoch 机制,Kafka 完美地规避了这种数据丢失场景。

控制器选举

Kafka利用了ZK的领导选举机制,每个代理节点都会参与竞选主控制器,但只有一个代理节点可以成为主控制器,其他代理节点只有在主控制器出现故障或者会话失效时参与领导选举。Kafka实现领导选举的做法是:每个代理节点都会作为ZK的客户端,向 ZK服务端尝试创建/controller临时节点,但最终只有一个代理节点可以成功创建/controller节点。由于主控制器创建的ZK节点是临时节点,因此当主控制器出现故障,或者会话失效时,临时节点会被删除。这时候所有的代理节点都会尝试重新创建/controller节点,并选举出新的主控制器。

主节点选举,首先需要面对的就是集群节点达成某种一致(共识)的问题。对于主从复制的数据库,所有节点需求就谁来充当主节点达成一致。如果由于网络故障原因出现节点之间无法通信,就很容易出现争议。此时,共识对于避免错误的故障切换十分重要,后者会导致两个节点都自认为是主节点即脑裂。如果集群中存在两个这样的节点,每个都在接受写请求,最终会导致数据产生分歧、不一致甚至数据丢失。

Zookeeper里采用的是Zab共识算法/协议。广义上说,共识算法必须满足以下的性质:
1)协商一致性,所有的节点都接受相同的协议。
2)诚实性,所有节点不能反悔,即对一项提议不能有两种决定。
3)合法性,如果决定了值v,则v一定是由某个节点所提议的。
4)可终止性,节点如果不崩溃则最终一定可以达成决议。

协商一致性和诚实性属性定义了共识的核心思想:决定一致的结果,一旦决定,就不能改变。如果不关心容错,那么满足前三个属性很容易:可以强行指定某个节点为“独裁者”,由它做出所有的决定。但是,如果该节点失败,系统就无法继续做出任何决定。可终止性则引入了容错的思想,它重点强调一个共识算法不能原地空转,永远不做事情。换句话说,它必须取得实质性进展,即使某些节点出现了故障,其他节点也必须做出最终决定。可终止性属于一种活性,另外三种则属于安全性方面的属性。

当然,如果所有的节点都崩溃了,那么无论何种算法都不可能继续做出决定。算法所能容忍的失败次数和规模都有一定的限制。事实上,可以证明任何共识算法都需要至少大部分节点正确运行才能确保终止性,而这个大多数就可以安全地构成quorum。因此,可终止性的前提是,发生崩溃或者不可用的节点必须小于半数节点。这里,我们暂时假定系统不存在拜占庭式错误。

最著名的容错式共识算法有Paxos,Raft和Zab。这些算法大部分其实并不是直接使用上述的形式化模型(提议并决定某个值,同时满足上面4个属性)。相反,他们是决定了一系列值,然后采用全序关系广播算法。全序关系广播的要点是,消息按照相同的顺序发送到所有的节点,有且只有一次。这其实相当于进行了多轮的共识过程:在每一轮,节点提出他们接下来想要发送的消息,然后决定下一个消息的全局顺序。所以,全序关系广播相当于持续的多轮共识(每一轮共识的决定对应于一条消息):

  • 由于协商一致性,所有节点决定以相同的顺序发送相同的消息。
  • 由于诚实性,消息不能重复。
  • 由于合法性,消息不回被破坏。也不是凭空捏造的。
  • 由于可终止性,消息不会丢失。

Raft和Zab都直接采取了全序关系广播,这比重复性的一轮共识只解决一个提议更加高效。ZooKeeper主要针对保存少量、完全可以放在内存中的数据(虽然最终仍然会写入磁盘以保证持久性),所以不要用它保存大量的数据。这些少量数据会通过容错的全序广播算法复制到所有节点上从而实现高可靠。ZooKeeper模仿了Google的Chubby锁服务,不仅实现了全序广播(因此也实现了共识),而且还构建了一组有趣的其他特性,这些特性在构建分布式系统时格外重要:
1)线性一致性的原子操作,使用原子CAS操作可以实现锁:如果多个节点同时尝试执行相同的操作,只有一个节点会成功。共识协议保证了操作的原子性和线性一致性,即使节点发生故障或网络在任意时刻中断。分布式锁通常以租约(lease)的形式实现,租约有一个到期时间,以便在客户端失效的情况下最终能被释放。
2)操作的全序排序,当某个资源受到锁或租约的保护时,你需要一个fencing令牌来防止客户端在进程暂停的情况下彼此冲突。fencing令牌是每次锁被获取时单调增加的数字。ZooKeeper通过全局排序操作来提供这个功能,它为每个操作提供一个单调递增的事务ID( zxid )和版本号( cversion )。
3)故障检测,客户端在ZooKeeper服务器上维护一个长期会话,客户端和服务器周期性地交换心跳包来检查节点是否存活。即使连接暂时中断,或者某个ZooKeeper节点发生失效,会话仍保持在活跃状态。但如果心跳停止的持续时间超出会话超时,ZooKeeper会声明会话失败。此时,所有该会话持有的锁资源可以配置为自动全部释放(ZooKeeper称之为ephemeral nodes即临时节点)。
4)变更通知,客户端不仅可以读取其他客户端创建的锁和键值,还可以监听它们的变更。因此,客户端可以知道其他客户端何时加入集群(基于它写入ZooKeeper的值),以及客户端是否发生了故障(会话超时导致临时节点消失)。通过订阅通知机制,客户端不用再通过频繁轮询的方式来找出变更。

推荐阅读书籍或材料

Kafka技术内幕
Apache Kafka实战

导语

本文为Kafka学习笔记,近期抽时间阅读了多本书籍和专栏博客,理解了一些Kafka的经典设计和思考,仅仅用于自己记录思考。由于涉及内容较多,我准备将内容分两篇文章。如有侵权,请联系删除。
kafka是一款开源的消息队列中间件。在业务系统中,消息队列的作用有两个,一是“削峰填谷”,所谓的“削峰填谷”就是指缓冲上下游瞬时突发的流量,使其更平滑。对于发送能力很强的上游系统,如果没有消息队列中间件的保护,下游系统可能会直接被压垮导致全链路服务雪崩,消息队列可以在很大程度上避免流量的震荡。另外一大好处在于发送方和接收方的松耦合,减少系统间不必要的交互。也很好理解,比如A业务模块要调用B业务模块的接口,如果引入消息队列,A需要调用B接口的情况下,A不调用接口,仅需把消息存入到消息队列,后续的操作交由消息队列的消费者完成,这样一来,A业务模块不必关心B业务模块的变动,实现系统级别的解耦。


总体架构

一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker、若干 Consumer,以及一个 ZooKeeper 集群,如下图所示。其中 ZooKeeper 是 Kafka 用来负责集群元数据的管理、控制器的选举等操作的。Producer 将消息发送到 Broker,Broker 负责将收到的消息存储到磁盘中,而 Consumer 负责从 Broker 订阅并消费消息。
kafka总体架构.png
整个 Kafka 体系结构中引入了以下3个术语:

  • Producer: 生产者,也就是发送消息的一方。生产者负责创建消息,然后将其投递到 Kafka 中。
  • Consumer: 消费者,也就是接收消息的一方。消费者连接到 Kafka 上并接收消息,进而进行相应的业务逻辑处理。
  • Broker: 服务代理节点。对于 Kafka 而言,Broker 可以简单地看作一个独立的 Kafka 服务节点或
    Kafka服务实例。大多数情况下也可以将 Broker 看作一台 Kafka 服务器,前提是这台服务器上只部署了一个 Kafka

实例。一个或多个Broker 组成了一个 Kafka 集群。一般而言,我们更习惯使用首字母小写的 broker 来表示服务代理节点。

在 Kafka 中还有两个特别重要的概念—主题(Topic)与分区(Partition)。Kafka 中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到 Kafka 集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。

主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。

offset 是消息在分区中的唯一标识,是一个单调递增且不变的值。Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序。
分区offset保序.png

如上图所示,主题中有4个分区,消息被顺序追加到每个分区日志文件的尾部。Kafka 中的分区可以分布在不同的服务器(broker)上,也就是说,一个主题可以横跨多个 broker,以此来提供比单个 broker 更强大的性能。

每一条消息被发送到 broker 之前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设定得合理,所有的消息都可以均匀地分配到不同的分区中。如果一个主题只对应一个文件,那么这个文件所在的机器I/O将会成为这个主题的性能瓶颈,而分区解决了这个问题。在创建主题的时候可以通过指定的参数来设置分区的个数,当然也可以在主题创建完成之后去修改分区的数量,通过增加分区的数量可以实现水平扩展。

不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上,Log 和 LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以“.txnindex”为后缀的事务索引文件)。下图描绘了主题、分区、副本、Log 以及 LogSegment 之间的关系。
截屏2021-08-06 上午12.52.46.png

多副本机制

Kafka 为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力。备份的思想,就是把相同的数据拷贝到多台机器上,而这些相同的数据拷贝在 Kafka 中被称为副本(Replica)。

同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是“一主多从”的关系,其中 leader 副本负责处理读写请求,follower 副本只负责与 leader 副本的消息同步。副本处于不同的 broker 中,当 leader 副本出现故障时,从 follower 副本中重新选举新的 leader 副本对外提供服务。Kafka 通过多副本机制实现了故障的自动转移,当 Kafka 集群中某个 broker 失效时仍然能保证服务可用。

当然了,我们知道在很多其他系统中follower副本是可以对外提供服务的,比如 MySQL 的从库是可以处理读操作的,但是在 Kafka 中追随者副本不会对外提供服务。

看下kafka分区和副本的架构图:
kafka分区架构图.jpeg
如上所示,此kafka集群中有4个broker,每个主题有3个分区,副本因子(副本个数)为3,每个分区便有1个leader副本,2个follower副本。生产者和消费者只与leader副本交互,而follower负责从leader副本同步消息,自然地,follower副本上的消息会有一定的滞后性。

图中还有kafka消费客户端,即Consumer,使用拉(Pull)模式从服务端拉取消息,并且保存消费的具体位置,当消费者宕机后恢复上线时可以根据之前保存的消费位置重新拉取需要的消息进行消费,这样就不会造成消息丢失。

分区中所有的副本都叫做AR,即Assigned Replicas。所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR,即In-Sync Replicas,ISR集合是AR集合的一个子集。消息会先发送到leader副本,然后follower副本才能从leader副本中拉去消息进行同步。“一定程度的同步”是指可忍受的滞后范围,这个范围可以通过参数进行配置。相对ISR,还有OSR,即Out-of-Sync Replicas,由此可见,AR=ISR+OSR。集群正常情况下,所有的副本应该和leader副本保持一致,AR=ISR,OSR的集合为空。

还有三个和ISR、OSR相关的概念介绍,HW、LSO和LEO,HW是Highwatermark,LSO是LogStartOffset,LEO是LogEndOffset。HW俗称高水位,标识特定消息的一个偏移量,至于哪个“特定”消息,后面会详细介绍,消费者只能拉取到这个offset之前的消息。
HW和LEO介绍.png
如上图所示,它代表一个日志文件,这个日志文件中有9条消息,第一条消息的 offset(LogStartOffset)为0,最后一条消息的 offset 为8,offset 为9的消息用虚线框表示,代表下一条待写入的消息。日志文件的 HW 为6,表示消费者只能拉取到 offset 在0至5之间的消息,而 offset 为6的消息对消费者而言是不可见的。

LEO标识当前日志文件中下一条待写入消息的 offset,上图中 offset 为9的位置即为当前日志文件的 LEO,LEO 的大小相当于当前日志分区中最后一条消息的 offset 值加1。分区 ISR 集合中的每个副本都会维护自身的 LEO,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息。

分区机制

虽然有了副本机制可以保证数据的高可用,但没有解决伸缩性(Scalability)的问题。什么是伸缩性呢?拿副本来说,虽然现在有了领导者副本和追随者副本,但倘若领导者副本积累了太多的数据以至于单台 Broker 机器都无法容纳了,此时应该怎么办呢?一个很自然的想法就是,能否把数据分割成多份保存在不同的 Broker 上?这种机制就是所谓的分区(Partitioning)。其他分布式系统里,你可能听说过分片、分区域等提法,比如 MongoDB 和 Elasticsearch 中的 Sharding、HBase 中的 Region,其实它们都是相同的原理。Kafka 中的分区机制指的是将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志。生产者生产的每条消息只会被发送到一个分区中,也就是说如果向一个双分区的主题发送一条消息,这条消息要么在分区 0 中,要么在分区 1 中。

上面提到的副本如何与这里的分区联系在一起呢?实际上,副本是在分区这个层级定义的。每个分区下可以配置若干个副本,其中只能有 1 个领导者副本和 N-1 个追随者副本。生产者向分区写入消息,每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。

Kafka 的三层消息架构:第一层是主题层,每个主题可以配置 M 个分区,而每个分区又可以配置 N 个副本。第二层是分区层,每个分区的 N 个副本中只能有一个充当领导者角色,对外提供服务;其他 N-1 个副本是追随者副本,只是提供数据冗余之用。第三层是消息层,分区中包含若干条消息,每条消息的位移从 0 开始,依次递增。

kafka的生产者

为什么要分区
分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。分区是实现负载均衡以及高吞吐量的关键,故在生产者这一端就要仔细盘算合适的分区策略,避免造成消息数据的“倾斜”,使得某些分区成为性能瓶颈,这样极易引发下游数据消费的性能下降。

分区策略
Kafka生产者的分区策略是决定生产者将消息发送到哪个分区的算法。Kafka提供默认的分区策略,同时它也支持自定义分区策略。常见的分区策略如下:

  1. 轮询策略
    也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0。轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。
  2. 随机策略
    也称 Randomness 策略,所谓随机就是我们随意地将消息放置到任意一个分区上。本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。
  3. 按消息键保序策略
    Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略。Kafka的主题会有多个分区,分区作为并行任务的最小单位,为消息选择分区要根据消息是否含有键来判断。

kafka的消费者

消费模型
消息由生产者发布到Kafka集群后,会被消费者消费。消息的消费模型有两种:推送模型(push)和拉取模型(pull)。基于推送模型的消息系统,由broker记录消费者的消费状态。broker在将消息推送到消费者后,标记这条消息为已消费,这种方式无法很好地保证消息的处理语义。比如,broker把消息发送出去后,当消费进程挂掉或者由于网络原因没有收到这条消息时,就有可能造成消息丢失(因为消息代理已经把这条消息标记为己消费了,但实际上这条消息并没有被实际处理) 。如果要保证消息的处理语义,broker发送完消息后,要设置状态为“已发送”,只有收到消费者的确认请求后才更新为“已消费”,这就需要在消息代理中记录所有消息的消费状态,这种方式需要在客户端和服务端做一些复杂的状态一致性保证,比较复杂。

因此,kafka采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序读取每个分区的消息。这种由消费者控制偏移量的优点是消费者可以按照任意的顺序消费消息,比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前时刻开始消费。broker是无状态的,它不需要标记哪些消息被消费者处理过,也不需要保证一条消息只会被一个消费者处理。而且,不同的消费者可以按照自己最大的处理能力来拉取数据,即使有时候某个消费者的处理速度稍微落后,它也不会影响其他的消费者,并且在这个消费者恢复处理速度后,仍然可以追赶之前落后的数据。

消费者组
Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,这个 ID 被称为 Group ID。组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)。当然,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。

传统的消息引擎模型是点对点模型和分布/订阅模型,这种模型的伸缩性很差,因为下游的多个consumer都要抢这个共享消息队列的消息。发布/订阅模型倒是允许消息被多个 Consumer 消费,但它的问题也是伸缩性不高,因为每个订阅者都必须要订阅主题的所有分区。这种全量订阅的方式既不灵活,也会影响消息的真实投递效果。

如果有这么一种机制,既可以避开这两种模型的缺陷,又兼具它们的优点,那就太好了。幸运的是,Kafka 的 Consumer Group 就是这样的机制。当 Consumer Group 订阅了多个主题后,组内的每个实例不要求一定要订阅主题的所有分区,它只会消费部分分区中的消息。Consumer Group 之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。再加上 Broker 端的消息留存机制,Kafka 的 Consumer Group 完美地规避了上面提到的伸缩性差的问题。可以这么说,Kafka 仅仅使用 Consumer Group 这一种机制,却同时实现了传统消息引擎系统的两大模型:如果所有实例都属于同一个 Group,那么它实现的就是消息队列模型;如果所有实例分别属于不同的 Group,那么它实现的就是发布/订阅模型。

分区是以消费者级别被消费的,但分区的消费进度要保存成消费者组级别的。一个分区只能属于一个消费者线程,但分区分配给消费者有以下几种场景,线程数量多于分区的数量,线程数量少于分区的数量,线程数量等于分区的数量。理想情况下,Consumer的实例数量应该等于Group订阅主题的分区总数。

针对 Consumer Group,Kafka 是怎么管理位移的呢?你还记得吧,消费者在消费的过程中需要记录自己消费了多少数据,即消费位置信息。在 Kafka 中,这个位置信息有个专门的术语:位移(Offset)。老版本的 Consumer Group 把位移保存在 ZooKeeper 中。Apache ZooKeeper 是一个分布式的协调服务框架,Kafka 重度依赖它实现各种各样的协调管理。将位移保存在 ZooKeeper 外部系统的做法,最显而易见的好处就是减少了 Kafka Broker 端的状态保存开销。现在比较流行的提法是将服务器节点做成无状态的,这样可以自由地扩缩容,实现超强的伸缩性。Kafka 最开始也是基于这样的考虑,才将 Consumer Group 位移保存在独立于 Kafka 集群之外的框架中。

但是,ZooKeeper 这类元框架其实并不适合进行频繁的写更新,而 Consumer Group 的位移更新却是一个非常频繁的操作。这种大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能,因此 Kafka 社区渐渐有了这样的共识:将 Consumer 位移保存在 ZooKeeper 中是不合适的做法。于是,在新版本的 Consumer Group 中,Kafka 社区重新设计了 Consumer Group 的位移管理方式,采用了将位移保存在 Kafka 内部主题的方法。这个内部主题就是__consumer_offsets,现在新版本的 Consumer Group 将位移保存在 Broker 端的内部主题中。

重平衡

Rebalance 就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程。在 Rebalance 过程中,所有 Consumer 实例共同参与,在协调者组件(Coordinator)的帮助下,完成订阅主题分区的分配。那么 Consumer Group 何时进行 Rebalance 呢?Rebalance 的触发条件有 3 个:

  • 组成员数发生变更。比如有新的Consumer实例加入组或者离开组,抑或是有Consumer实例崩溃被“踢出”组。
  • 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如consumer.subscribe(Pattern.compile("t.*c")) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
  • 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启Rebalance。

Coordinator 会在什么情况下认为某个 Consumer 实例已挂从而要被“踢出”组呢?

当 Consumer Group 完成 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经“死”了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。Rebalance 发生时,Group 下所有的 Consumer 实例都会协调在一起共同参与。你可能会问,每个 Consumer 实例怎么知道应该消费订阅主题的哪些分区呢?这就需要上面提到的分配策略的协助了。

另外,Rebalance有些“缺点”需要我们特别关注,思考更好的设计应该是什么样子的。

首先,Rebalance 过程对 Consumer Group 消费过程有极大的影响。如果你了解 JVM 的垃圾回收机制,你一定听过万物静止的收集方式,即著名的 stop the world,简称 STW。在 STW 期间,所有应用线程都会停止工作,表现为整个应用程序僵在那边一动不动。Rebalance 过程也和这个类似,在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。这是 Rebalance 为人诟病的一个方面。

其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。例如实例 A 之前负责消费分区 1、2、3,那么 Rebalance 之后,如果可能的话,最好还是让实例 A 继续消费分区 1、2、3,而不是被重新分配其他的分区。这样的话,实例 A 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。

最后,Rebalance 实在是太慢了。

所以,我们尽量避免一些非必要的Rebalance。第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。因此,需要仔细地设置 session.timeout.ms(决定了 Consumer 存活性的时间间隔)和 heartbeat.interval.ms(控制发送心跳请求频率的参数) 的值。第二类非必要 Rebalance 是 Consumer 消费时间过长导致的,Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。

消费组重平衡流程

消费者组的重平衡流程,它的作用是让组内所有的消费者实例就消费哪些主题分区达成一致。重平衡需要借助 Kafka Broker 端的 Coordinator 组件,在 Coordinator 的帮助下完成整个消费者组的分区重分配。

触发与通知
a)重平衡过程通过消息者端的心跳线程(Heartbeat Thread)通知到其他消费者实例。
b)Kafka Java消费者需要定期地发送心跳请求到Broker端的协调者,以表明它还存活着。
在kafka 0.10.1.0版本之前,发送心跳请求是在消费者主线程完成的,也就是代码中调用KafkaConsumer.poll方法的那个线程。这样做,消息处理逻辑也是在这个线程中完成的 ,因此,一旦消息处理消耗了过长的时间,心跳请求将无法及时发到协调者那里,导致协调者错判消费者已死。在此版本后,kafka社区引入了单独的心跳线程来专门执行心跳请求发送,避免这个问题。
c) 消费者端的参数heartbeat.interval.ms,从字面上看,它就是设置了心跳的间隔时间,但这个参数的真正作用是控制重平衡通知的频率。

消费者组状态机
重平衡一旦开启,Broker 端的协调者组件就要开始忙了,主要涉及到控制消费者组的状态流转。Kafka设计了一套消费者组状态机(State Machine),帮助协调者完成整个重平衡流程。
a) Kafka消费者组状态
(1)Empty:组内没有任何成员,但消费者组可能存在已提交的位移数据,而且这些位移尚未过期。
(2)Dead:组内没有任何成员,但组的元数据信息已经在协调者端被移除。协调者保存着当前向它注册过的所有组信息,所谓元数据就是类似于这些注册信息。
(3)PreparingRebalance:消费者组准备开启重平衡,此时所有成员都要重新请求加消费者组。
(4)CompletingRebalance:消费者组下所有成员已经加入,各个成员正在等待分配方案。
(5)stable:消费者组的稳定状态。该状态表明重平衡已经完成,组内成员能够正常消费数据了。
b) 状态机的各个状态流转图如下:
消费者组重平衡流程.jpg
一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于 PreparingRebalance 状态等待成员加入,之后变更到 CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到 PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。当所有成员都退出组后,消费者组状态变更为 Empty。Kafka定期自动删除过期位移的条件就是,组要处于Empty状态。如果消费者组停了很长时间(超过7天),那么Kafka很可能就把该组的位移数据删除了。

消费者端重平衡流程
重平衡的完整流程需要消费者端和协调者组件共同参与才能完成。在消费者端,重平衡分为以下两个步骤:
1) 加入组:JoinGroup请求。
2) 等待领导者消费者分配方案:SyncGroup请求。
当组内成员加入组时,他会向协调者发送JoinGroup请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的JoinGroup请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。通常情况下,第一个发送JoinGroup 请求的成员自动成为领导者。注意区分这里的领导者和之前介绍的领导者副本,不是一个概念。这里的领导者是具体的消费者实例,它既不是副本,也不是协调者。领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。

选出领导者之后,协调者会把消费者组订阅信息封装进JoinGroup请求的响应中,然后发给领导者,由领导者统一做出分配方案后,进入下一步:发送SyncGroup请求。在这一步中,领导者向协调者发送SyncGroup请求,将刚刚做出的分配方案发给协调者。值得注意的是,其他成员也会向协调者发送SyncGroup请求,只是请求体中并没有实际内容。这一步的目的是让协调者接收分配方案,然后统一以SyncGroup 响应的方式发给所有成员,这样组内成员就都知道自己该消费哪些分区了。
选出领导.jpg
SyncGroup 请求的主要目的,就是让协调者把领导者制定的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。

Broker端(协调者端)重平衡场景剖析
分以下几个场景来讨论,这几个场景分别是新成员加入组、组成员主动离组、组成员崩溃离组、组成员提交位移。

  1. 新成员入组
    新成员入组是指组处于 Stable 状态后,有新成员加入。当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。具体的过程和之前的客户端重平衡流程是一样的。
  2. 组成员主动离组
    主动离组就是指消费者实例所在线程或进程调用 close() 方法主动通知协调者它要退出。这个场景就涉及到了第三类请求:LeaveGroup 请求。协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员。
  3. 组成员崩溃离组
    崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数session.timeout.ms控制的。也就是说,Kafka 一般不会超过session.timeout.ms就能感知到这个崩溃。当然,后面处理崩溃离组的流程与之前是一样的。
  4. 重平衡时协调者对组内成员提交位移的处理
    正常情况下,每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后在开启正常JoinGroup/SyncGroup请求发送。

位移主题

__consumer_offsets 是Kafka里的内部主题,也被称为位移主题,即 Offsets Topic。和你创建的其他主题一样,位移主题就是普通的 Kafka 主题。你可以手动地创建它、修改它,甚至是删除它。为什么要引入位移主题这个概念呢?我们知道,版本 Consumer 的位移管理是依托于 Apache ZooKeeper 的,它会自动或手动地将位移数据提交到 ZooKeeper 中保存。当 Consumer 重启后,它能自动从 ZooKeeper 中读取位移数据,从而在上次消费截止的地方继续消费。这种设计使得 Kafka Broker 不需要保存位移数据,减少了 Broker 端需要持有的状态空间,因而有利于实现高伸缩性。但是,ZooKeeper 其实并不适用于这种高频的写操作。

新版本 Consumer 的位移管理机制其实也很简单,就是将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中。可以这么说,__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。它要求这个提交过程不仅要实现高持久性,还要支持高频的写操作。显然,Kafka 的主题设计天然就满足这两个条件,因此,使用 Kafka 主题来保存位移这件事情,实际上就是一个水到渠成的想法了。

虽说位移主题是一个普通的 Kafka 主题,但它的消息格式却是 Kafka 自己定义的,用户不能修改,也就是说你不能随意地向这个主题写消息,因为一旦你写入的消息不满足 Kafka 规定的格式,那么 Kafka 内部无法成功解析,就会造成 Broker 的崩溃。事实上,Kafka Consumer 有 API 帮你提交位移,也就是向位移主题写消息。那么位移主题的消息格式是什么样子的呢?所谓的消息格式,可以简单地理解为是一个KV对。Key和Value分别表示消息的键值和消息体。

首先从 Key 说起。一个 Kafka 集群中的 Consumer 数量会有很多,既然这个主题保存的是 Consumer 的位移数据,那么消息格式中必须要有字段来标识这个位移数据是哪个 Consumer 的。这种数据放在哪个字段比较合适呢?显然放在 Key 中比较合适。现在我们知道该主题消息的 Key 中应该保存标识 Consumer 的字段,那么,当前 Kafka 中什么字段能够标识 Consumer 呢?还记得之前我们说 Consumer Group 时提到的 Group ID 吗?没错,就是这个字段,它能够标识唯一的 Consumer Group。我们现在知道 Key 中保存了 Group ID,但是只保存 Group ID 就可以了吗?别忘了,Consumer 提交位移是在分区层面上进行的,即它提交的是某个或某些分区的位移,那么很显然,Key 中还应该保存 Consumer 要提交位移的分区。总结一下,位移主题的 Key 中应该保存 3 部分内容:

接下来看看消息体的设计。也许你会觉得消息体应该很简单,保存一个位移值就可以了。实际上,社区的方案要复杂得多,比如消息体还保存了位移提交的一些其他元数据,诸如时间戳和用户自定义的数据等。保存这些元数据是为了帮助 Kafka 执行各种各样后续的操作,比如删除过期位移消息等。但总体来说,我们还是可以简单地认为消息体就是保存了位移值。当然了,位移主题的消息格式可不是只有这一种。事实上,它有 3 种消息格式。除了刚刚我们说的这种格式,还有 2 种格式:

  • 用于保存 Consumer Group 信息的消息。
  • 用于删除 Group 过期位移甚至是删除 Group 的消息。
    第 1 种格式非常神秘,以至于你几乎无法在搜索引擎中搜到它的身影。不过,你只需要记住它是用来注册 Consumer Group 的就可以了。第 2 种格式相对更加有名一些。它有个专属的名字:tombstone 消息,即墓碑消息,也称 delete mark。这些消息只出现在源码中而不暴露给你。它的主要特点是它的消息体是 null,即空消息体。那么,何时会写入这类消息呢?一旦某个 Consumer Group 下的所有 Consumer 实例都停止了,而且它们的位移数据都已被删除时,Kafka 会向位移主题的对应分区写入 tombstone 消息,表明要彻底删除这个 Group 的信息。

好了,消息格式就说这么多,下面我们来说说位移主题是怎么被创建的。通常来说,当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题。什么地方会用到位移主题呢?我们前面一直在说 Kafka Consumer 提交位移时会写入该主题,那 Consumer 是怎么提交位移的呢?目前 Kafka Consumer 提交位移的方式有两种:自动提交位移和手动提交位移。Consumer 端有个参数叫 enable.auto.commit,如果值是 true,则 Consumer 在后台默默地为你定期提交位移,提交间隔由一个专属的参数auto.commit.interval.ms来控制。自动提交位移有一个显著的优点,就是省事,你不用操心位移提交的事情,但这一点同时也是缺点。因为它太省事了,以至于丧失了很大的灵活性和可控性,你完全没法把控 Consumer 端的位移管理。如果你选择的是自动提交位移,那么就可能存在一个问题:只要 Consumer 一直启动着,它就会无限期地向位移主题写入消息。

我们来举个极端一点的例子。假设 Consumer 当前消费到了某个主题的最新一条消息,位移是 10,之后该主题没有任何新消息产生,故 Consumer 无消息可消费了,所以位移永远保持在 10。由于是自动提交位移,位移主题中会不停地写入位移 =10 的消息。显然 Kafka 只需要保留这类消息中的最新一条就可以了,之前的消息都是可以删除的。这就要求 Kafka 必须要有针对位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘。Kafka 是怎么删除位移主题中的过期消息的呢?答案就是 Compaction。

kafka 使用 Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。那么应该如何定义 Compact 策略中的过期呢?对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。我在这里贴一张来自官网的图片,来说明 Compact 过程。
位移compaction.png

kafka的服务端

kafka的副本机制
所谓的副本机制(Replication),也可以称之为备份机制,通常是指分布式系统在多台网络互联的机器上保存有相同的数据拷贝。副本机制有什么好处呢?

  • 提供数据冗余。即使系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性。
  • 提供高伸缩性。支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高读操作吞吐量。
  • 改善数据局部性。允许将数据放入与用户地理位置相近的地方,从而降低系统延时。
    对于 Apache Kafka 而言,目前只能享受到副本机制带来的第 1 个好处,也就是提供数据冗余实现高可用性和高持久性。至于为什么没有提供第2个和第3个好处,会在接下来的内容里阐释。

Kafka 是有主题概念的,而每个主题又进一步划分成若干个分区。副本的概念实际上是在分区层级下定义的,每个分区配置有若干个副本。所谓副本(Replica),本质就是一个只能追加写消息的提交日志。根据 Kafka 副本机制的定义,同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的 Broker 上,从而能够对抗部分 Broker 宕机带来的数据不可用。
kafka分区架构图.jpeg

副本角色
既然分区下能够配置多个副本,而且这些副本的内容还要一致,那么很自然的一个问题就是:我们该如何确保副本中所有的数据都是一致的呢?特别是对 Kafka 而言,当生产者发送消息到某个主题后,消息是如何同步到对应的所有副本中的呢?针对这个问题,Apache Kafka采用基于领导者(Leader-based)的副本机制。

  • 在 Kafka 中,副本分成两类:领导者副本(Leader Replica)和追随者副本(Follower
    Replica)。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。
  • 在 Kafka中,追随者副本是不对外提供服务的。这就是说,追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
  • 领导者副本挂掉了,或者说领导者副本所在的 Broker 宕机时,Kafka 依托于 ZooKeeper
    提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。

对于客户端用户而言,Kafka 的追随者副本没有任何作用,它既不能像 MySQL 那样帮助领导者副本“抗读”,也不能实现将某些副本放到离客户端近的地方来改善数据局部性。Kafka 为什么要这样设计呢?其实这种副本机制有两个方面的好处:

  1. 方便实现"Read-your-writes"
    所谓 Read-your-writes,顾名思义就是,当你使用生产者 API 向 Kafka 成功写入消息后,马上使用消费者 API 去读取刚才生产的消息。举个例子,比如你平时发微博时,你发完一条微博,肯定是希望能立即看到的,这就是典型的 Read-your-writes 场景。如果允许追随者副本对外提供服务,由于副本同步是异步的,因此有可能出现追随者副本还没有从领导者副本那里拉取到最新的消息,从而使得客户端看不到最新写入的消息。
  2. 方便实现单调读(Monotonic Reads)
    什么是单调读呢?就是对于一个消费者用户而言,在多次消费消息时,它不会看到某条消息一会儿存在一会儿不存在。如果允许追随者副本提供读服务,那么假设当前有 2 个追随者副本 F1 和 F2,它们异步地拉取领导者副本数据。倘若 F1 拉取了 Leader 的最新消息而 F2 还未及时拉取,那么,此时如果有一个消费者先从 F1 读取消息之后又从 F2 拉取消息,它可能会看到这样的现象:第一次消费时看到的最新消息在第二次消费时不见了,这就不是单调读一致性。但是,如果所有的读请求都是由 Leader 来处理,那么 Kafka 就很容易实现单调读一致性。

In-sync Replicas(ISR)

追随者副本不提供服务,只是定期地异步拉取领导者副本中的数据。既然是异步的,就存在着不可能与 Leader 实时同步的风险。在探讨如何正确应对这种风险之前,我们必须要精确地知道同步的含义是什么。或者说,Kafka 要明确地告诉我们,追随者副本到底在什么条件下才算与 Leader 同步。基于这个想法,Kafka 引入了 In-sync Replicas,也就是所谓的 ISR 副本集合。ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的。那么,到底什么副本能够进入到 ISR 中呢?我们首先要明确的是,Leader 副本天然就在 ISR 中。也就是说,ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本。ISR 是一个动态调整的集合,而非静态不变的。

Kafka 判断 Follower 是否与 Leader 同步的标准,不是看相差的消息数,而是看 Broker 端参数 replica.lag.time.max.ms参数值。这个参数的含义是 Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。这就是说,只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。

Unclean领导者选举(Unclean Leader Election)

既然 ISR 是可以动态调整的,那么自然就可以出现这样的情形:ISR 为空。因为 Leader 副本天然就在 ISR 中,如果 ISR 为空了,就说明 Leader 副本也“挂掉”了,Kafka 需要重新选举一个新的 Leader。可是 ISR 是空,此时该怎么选举新 Leader 呢?Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举。开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。这就是我们常说的分布式系统的CAP理论,在这个问题上,Kafka 赋予你选择 C(Consistency) 或 A (Availability)的权利。你可以根据你的实际业务场景决定是否开启 Unclean 领导者选举。不过,我建议你不要开启它,毕竟我们还可以通过其他的方式来提升高可用性。如果为了这点儿高可用性的改善,牺牲了数据一致性,那就非常不值当了。

kafka如何处理请求

无论是 Kafka 客户端还是 Broker 端,它们之间的交互都是通过“请求/响应”的方式完成的。比如,客户端会通过网络发送消息生产请求给 Broker,而 Broker 处理完成后,会发送对应的响应给到客户端。

这里我们详细讨论一下 Kafka Broker 端处理请求的全流程。关于如何处理请求,我们很容易想到的方案有两个:

  1. 顺序处理请求
    这个方法实现简单,但是有个致命的缺陷,那就是吞吐量太差。由于只能顺序处理每个请求,因此,每个请求都必须等待前一个请求处理完毕才能得到处理。这种方式只适用于请求发送非常不频繁的系统。
  2. 每个请求使用单独线程处理
    完全采用异步的方式。系统会为每个入站请求都创建单独的线程来处理。这个方法的好处是,它是完全异步的,每个请求的处理都不会阻塞下一个请求。但缺陷也同样明显。为每个请求都创建线程的做法开销极大,在某些场景下甚至会压垮整个服务。还是那句话,这个方法只适用于请求发送频率很低的业务场景。

既然这两种方案都不好,那么,Kafka 是如何处理请求的呢?用一句话概括就是,Kafka 使用的是 Reactor 模式(不熟悉的可以参考一下Scalable IO in Java)。Reactor 模式是事件驱动架构的一种实现方式,特别适合应用于处理多个客户端并发向服务器端发送请求的场景。Reactor模式应用在Kafka Broker上,流程图如下:
请求处理框架.jpg
Kafka 的 Broker 端有个 SocketServer 组件,类似于 Reactor 模式中的 Dispatcher,它也有对应的 Acceptor 线程和一个工作线程池,只不过在 Kafka 中,这个工作线程池有个专属的名字,叫网络线程池。Kafka 提供了 Broker 端参数 num.network.threads,用于调整该网络线程池的线程数。其默认值是 3,表示每台 Broker 启动时会创建 3 个网络线程,专门处理客户端发送的请求。

Acceptor 线程采用轮询的方式将入站请求公平地发到所有网络线程中,因此,在实际使用过程中,这些线程通常都有相同的几率被分配到待处理请求。这种轮询策略编写简单,同时也避免了请求处理的倾斜,有利于实现较为公平的请求处理调度。现在我们了解了客户端发来的请求会被 Broker 端的 Acceptor 线程分发到任意一个网络线程中,由它们来进行处理。那么,当网络线程接收到请求后,它是怎么处理的呢?你可能会认为,它顺序处理不就好了吗?实际上,Kafka 在这个环节又做了一层异步线程池的处理,我们一起来看一看下面这张图。
请求的详细流转图.jpg
当网络线程拿到请求后,它不是自己处理,而是将请求放入到一个共享请求队列中。Broker 端还有个 IO 线程池,负责从该队列中取出请求,执行真正的处理。如果是 PRODUCE 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息。IO 线程池处中的线程才是执行请求逻辑的线程。Broker 端参数 num.io.threads 控制了这个线程池中的线程数。目前该参数默认值是 8,表示每台 Broker 启动后自动创建 8 个 IO 线程处理请求。你可以根据实际硬件条件设置此线程池的个数。

请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的。这么设计的原因就在于,Dispatcher 只是用于请求分发而不负责响应回传,因此只能让每个网络线程自己发送 Response 给客户端,所以这些 Response 也就没必要放在一个公共的地方。

刚刚那张图中有一个叫 Purgatory 的组件,它是用来缓存延时请求(Delayed Request)的。所谓延时请求,就是那些一时未满足条件不能立刻处理的请求。比如设置了 acks=all 的 PRODUCE 请求,一旦设置了 acks=all,那么该请求就必须等待 ISR 中所有副本都接收了消息后才能返回,此时处理该请求的 IO 线程就必须等待其他 Broker 的写入结果。当请求不能立刻处理时,它就会暂存在 Purgatory 中。稍后一旦满足了完成条件,IO 线程会继续处理该请求,并将 Response 放入对应网络线程的响应队列中。

到目前为止,提及的请求处理流程对于所有请求都是适用的,也就是说,Kafka Broker 对所有请求是一视同仁的。但是,在 Kafka 内部,除了客户端发送的 PRODUCE 请求和 FETCH 请求之外,还有很多执行其他操作的请求类型,比如负责更新 Leader 副本、Follower 副本以及 ISR 集合的 LeaderAndIsr 请求,负责勒令副本下线的 StopReplica 请求等。与 PRODUCE 和 FETCH 请求相比,这些请求有个明显的不同:它们不是数据类的请求,而是控制类的请求。也就是说,它们并不是操作消息数据的,而是用来执行特定的 Kafka 内部动作的。Kafka 社区把 PRODUCE 和 FETCH 这类请求称为数据类请求,把 LeaderAndIsr、StopReplica 这类请求称为控制类请求。当前这种一视同仁的处理方式对控制类请求是不合理的。为什么呢?因为控制类请求有这样一种能力:它可以直接令数据类请求失效!所以控制类请求应该有更高的优先级。举个简单的例子,假设我们删除了某个主题,那么控制器就会给该主题所有副本所在的 Broker 发送一个名为 StopReplica 的请求。如果此时 Broker 上存有大量积压的 Produce 请求,那么这个 StopReplica 请求只能排队等。如果这些 Produce 请求就是要向该主题发送消息的话,这就显得很讽刺了:主题都要被删除了,处理这些 Produce 请求还有意义吗?此时最合理的处理顺序应该是,赋予 StopReplica 请求更高的优先级,使它能够得到抢占式的处理。基于这些问题,社区于 2.3 版本正式实现了数据类请求和控制类请求的分离。那么,社区是如何解决的呢?Kafka Broker 启动后,会在后台分别创建两套网络线程池和 IO 线程池的组合,它们分别处理数据类请求和控制类请求。至于所用的 Socket 端口,自然是使用不同的端口了,你需要提供不同的 listeners 配置,显式地指定哪套端口用于处理哪类请求。

Kafka的协调者

所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。具体来讲,Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。那么,Consumer Group 如何确定为它服务的 Coordinator 在哪台 Broker 上呢?答案就在我们之前说过的 Kafka 内部位移主题 __consumer_offsets 身上。

目前,Kafka 为某个 Consumer Group 确定 Coordinator 所在的 Broker 的算法有 2 个步骤:

  1. 确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
  2. 找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。

kafka的控制器

控制器组件(Controller),是Apache Kafka的核心组件。它的主要作用是在Apache Zookeeper的帮助下管理和协调整个Kafka集群。集群中任意一台Broker都能充当控制器的角色,但在运行过程中,只能有一个Broker成为控制器,行使其管理和协调的职责。控制器是重度依赖Zookeeper的,我们需要首先简单了解一下Apache Zookeeper框架。Apache ZooKeeper 是一个提供高可靠性的分布式协调服务框架。它使用的数据模型类似于文件系统的树形结构,根目录也是以“/”开始。该结构上的每个节点被称为 znode,用来保存一些元数据协调信息。如果以 znode 持久性来划分,znode 可分为持久性 znode 和临时 znode。持久性 znode 不会因为 ZooKeeper 集群重启而消失,而临时 znode 则与创建该 znode 的 ZooKeeper 会话绑定,一旦会话结束,该节点会被自动删除。ZooKeeper 赋予客户端监控 znode 变更的能力,即所谓的 Watch 通知功能。一旦 znode 节点被创建、删除,子节点数量发生变化,抑或是 znode 所存的数据本身变更,ZooKeeper 会通过节点变更监听器 (ChangeHandler) 的方式显式通知客户端。依托于这些功能,ZooKeeper 常被用来实现集群成员管理、分布式锁、领导者选举等功能。Kafka 控制器大量使用 Watch 功能实现对集群的协调管理。我们一起来看一张图片,它展示的是 Kafka 在 ZooKeeper 中创建的 znode 分布。你不用了解每个 znode 的作用,但你可以大致体会下 Kafka 对 ZooKeeper 的依赖。
kafka控制器.jpg
那么控制器是如何被选出来的呢?Broker在启动时,会尝试去Zookeeper中创建/controller节点。Kafka当前选举控制器的规则是:第一个成功创建/controller节点的Broker会被指定为控制器。

控制器的功能有:

  • 主题管理。
  • 分区重分配。
  • Preferred领导选举。
  • 集群成员管理。
  • 元数据服务。
    控制器上管理的元数据:

控制器上的元数据.jpg
控制器中保存的这些数据在Zookeeper中也保存了一份。每当控制器初始化时,它都会从Zookeeper上读取对应的元数据并填充到自己的缓存中。这里面比较重要的数据有:

  • 所有主题信息。包括具体的分区信息,比如领导者副本是谁,ISR 集合中有哪些副本等。
  • 所有 Broker 信息。包括当前都有哪些运行中的 Broker,哪些正在关闭中的 Broker 等。
  • 所有涉及运维任务的分区。包括当前正在进行 Preferred 领导者选举以及分区重分配的分区列表。

故障控制转移
在 Kafka 集群运行过程中,只能有一台 Broker 充当控制器的角色,那么这就存在单点失效(Single Point of Failure)的风险,Kafka 是如何应对单点失效的呢?答案就是,为控制器提供故障转移功能,也就是说所谓的 Failover。故障转移是指:当运行中的控制器突然宕机或意外终止时,Kafka能够快速地感知到,并立即启用备用控制器来替代之前失败的控制器。
(1)为每个Broker创建一个对应的Socket连接,然后在创建一个专属的线程,用于向这些Broker发送特定的请求。
(2)控制连接zookeeper,也会创建单独的线程来处理Watch机制通知回调。
(3)控制器还会为主题删除创建额外的I/O线程。
这些线程还会访问共享的控制器缓存数据,为了维护数据安全性,控制在代码中大量使用ReetrantLock同步机制,进一步拖慢了整个控制器的处理速度。
在0.11版对控制器的底层设计进了重构,最大的改进是:把多线程的方案改成了单线程加事件队列的方案:

  • 单线程+队列的实现方式:社区引入了一个事件处理线程,统一处理各种控制器事件,然后控制器将原来执行的操作全部建模成一个个独立的事件,发送到专属的事件队列中,供此线程消费。
  • 单线程不代表之前提到的所有线程都被干掉了,控制器只是把缓存状态变更方面的工作委托给了这个线程而已。

控制器重构方案.jpg
第二个改进:将之前同步操作Zookeeper全部改为异步操作。 Zookeeper本身的API提供了同步写和异步写两种方式。同步操作zk,在有大量主题分区发生变更时,Zookeeper容易成为系统的瓶颈。

kafka的定时器

Kafka 中存在大量的延时操作,比如延时生产、延时拉取和延时删除等。Kafka 并没有使用 JDK 自带的 Timer 或 DelayQueue 来实现延时的功能,而是基于时间轮的概念自定义实现了一个用于延时功能的定时器(SystemTimer)。JDK 中 Timer 和 DelayQueue 的插入和删除操作的平均时间复杂度为 O(nlogn) 并不能满足 Kafka 的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为 O(1)。

延时请求(Delayed Operation),也称延迟请求,是指因未满足条件而暂时无法被处理的 Kafka 请求。举个例子,配置了 acks=all 的生产者发送的请求可能一时无法完成,因为 Kafka 必须确保 ISR 中的所有副本都要成功响应这次写入。因此,通常情况下,这些请求没法被立即处理。只有满足了条件或发生了超时,Kafka 才会把该请求标记为完成状态。这就是所谓的延时请求。

Kafka中使用的请求被延时处理的机制是分层时间轮算法。想想我们生活中的手表。手表由时针、分针和秒针组成,它们各自有独立的刻度,但又彼此相关:秒针转动一圈,分针会向前推进一格;分针转动一圈,时针会向前推进一格。这就是典型的分层时间轮。和手表不太一样的是,Kafka 自己有专门的术语。在 Kafka 中,手表中的“一格”叫“一个桶(Bucket)”,而“推进”对应于 Kafka 中的“滴答”,也就是 tick。除此之外,每个 Bucket 下也不是白板一块,它实际上是一个双向循环链表(Doubly Linked Cyclic List),里面保存了一组延时请求。由于是双向链表结构,能够利用 next 和 prev 两个指针快速地定位元素,因此,在 Bucket 下插入和删除一个元素的时间复杂度是 O(1)。当然,双向链表要求同时保存两个指针数据,在节省时间的同时消耗了更多的空间。在算法领域,这是典型的用空间去换时间的优化思想。
双端循环链表.png
在 Kafka 中,具体是怎么应用分层时间轮实现请求队列的呢?
时间轮设计.png
如上图所示,Kafka 中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList 是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。

时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格个数是固定的,可用 wheelSize 来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式 tickMs×wheelSize 计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime 是 tickMs 的整数倍。currentTime 可以将整个时间轮划分为到期部分和未到期部分,currentTime 当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的 TimerTaskList 中的所有任务。

若时间轮的 tickMs 为 1ms 且 wheelSize 等于20,那么可以计算得出总体时间跨度 interval 为20ms。初始情况下表盘指针 currentTime 指向时间格0,此时有一个定时为2ms的任务插进来会存放到时间格为2的 TimerTaskList 中。随着时间的不断推移,指针 currentTime 不断向前推进,过了2ms之后,当到达时间格2时,就需要将时间格2对应的 TimeTaskList 中的任务进行相应的到期操作。此时若又有一个定时为8ms的任务插进来,则会存放到时间格10中,currentTime 再过8ms后会指向时间格10。

如果此时有一个定时为 350ms 的任务该如何处理?直接扩充 wheelSize 的大小?Kafka 中不乏几万甚至几十万毫秒的定时任务,这个 wheelSize 的扩充没有底线,就算将所有的定时任务的到期时间都设定一个上限,比如100万毫秒,那么这个 wheelSize 为100万毫秒的时间轮不仅占用很大的内存空间,而且也会拉低效率。Kafka 为此引入了层级时间轮的概念,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。
层级时间轮设计.png

如上图所示,复用之前的案例,第一层的时间轮 tickMs=1ms、wheelSize=20、interval=20ms。第二层的时间轮的 tickMs 为第一层时间轮的 interval,即20ms。每一层时间轮的 wheelSize 是固定的,都是20,那么第二层的时间轮的总体时间跨度 interval 为400ms。以此类推,这个400ms也是第三层的 tickMs 的大小,第三层的时间轮的总体时间跨度为8000ms。

对于之前所说的 350ms 的定时任务,显然第一层时间轮不能满足条件,所以就升级到第二层时间轮中,最终被插入第二层时间轮中时间格17所对应的 TimerTaskList。如果此时又有一个定时为 450ms 的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入第三层时间轮中时间格1的 TimerTaskList。注意到在到期时间为 [400ms,8000ms) 区间内的多个任务(比如 446ms、455ms 和 473ms 的定时任务)都会被放入第三层时间轮的时间格1,时间格1对应的 TimerTaskList 的超时时间为 400ms。

随着时间的流逝,当此 TimerTaskList 到期之时,原本定时为 450ms 的任务还剩下 50ms 的时间,还不能执行这个任务的到期操作。这里就有一个时间轮降级的操作,会将这个剩余时间为 50ms 的定时任务重新提交到层级时间轮中,此时第一层时间轮的总体时间跨度不够,而第二层足够,所以该任务被放到第二层时间轮到期时间为 [40ms,60ms) 的时间格中。再经历 40ms 之后,此时这个任务又被“察觉”,不过还剩余 10ms,还是不能立即执行到期操作。所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮到期时间为 [10ms,11ms) 的时间格中,之后再经历 10ms 后,此任务真正到期,最终执行相应的到期操作。

设计源于生活。我们常见的钟表就是一种具有三层结构的时间轮,第一层时间轮 tickMs=1s、wheelSize=60、interval=1min,此为秒钟;第二层 tickMs=1min、wheelSize=60、interval=1hour,此为分钟;第三层 tickMs=1hour、wheelSize=12、interval=12hours,此为时钟。

Kafka 中的定时器借了 JDK 中的 DelayQueue 来协助推进时间轮。具体做法是对于每个使用到的 TimerTaskList 都加入 DelayQueue,DelayQueue 会根据 TimerTaskList 对应的超时时间 expiration 来排序,最短 expiration 的 TimerTaskList 会被排在 DelayQueue 的队头。

Kafka 中会有一个线程来获取 DelayQueue 中到期的任务列表,这个线程所对应的名称叫作“ExpiredOperationReaper”,可以直译为“过期操作收割机”。当“收割机”线程获取 DelayQueue 中超时的任务列表 TimerTaskList 之后,既可以根据 TimerTaskList 的 expiration 来推进时间轮的时间,也可以就获取的 TimerTaskList 执行相应的操作,对里面的 TimerTaskEntry 该执行过期操作的就执行过期操作,该降级时间轮的就降级时间轮。

我们开头明确指明的 DelayQueue 不适合 Kafka 这种高性能要求的定时任务,为何这里还要引入 DelayQueue 呢?注意对定时任务项 TimerTaskEntry 的插入和删除操作而言,TimingWheel时间复杂度为 O(1),性能高出 DelayQueue 很多,如果直接将 TimerTaskEntry 插入 DelayQueue,那么性能显然难以支撑。

分析到这里可以发现,Kafka 中的 TimingWheel 专门用来执行插入和删除 TimerTaskEntry 的操作,而 DelayQueue 专门负责时间推进的任务。试想一下,DelayQueue 中的第一个超时任务列表的 expiration 为 200ms,第二个超时任务为 840ms,这里获取 DelayQueue 的队头只需要 O(1) 的时间复杂度(获取之后 DelayQueue 内部才会再次切换出新的队头)。如果采用每秒定时推进,那么获取第一个超时的任务列表时执行的200次推进中有199次属于“空推进”,而获取第二个超时任务时又需要执行639次“空推进”,这样会无故空耗机器的性能资源,这里采用 DelayQueue 来辅助以少量空间换时间,从而做到了“精准推进”。Kafka 中的定时器真可谓“知人善用”,用 TimingWheel 做最擅长的任务添加和删除操作,而用 DelayQueue 做最擅长的时间推进工作,两者相辅相成。

总结

至此,这篇笔记中涉及了kafka整体架构,关键核心概念以及核心组件功能和设计的记录,但是存储层、kafka故障转移时的选举算法以及一些问题以及架构的思考还未深入,下篇笔记我继续写。

背景介绍

Redis在工作中特别常见,在很多业务架构的分享,Redis常常是作为单纯的缓存使用,目的是缓解持久层(比如MySQL)的大流量的访问,最终起到的作用是防止持久层因为海量访问而挂掉。Redis通过将数据保存在内存中,Redis得以拥有极高的读写性能。一旦服务进程退出,Redis的数据就会全部丢失。所以,很多情况下,业务上并不会使用Redis作为数据存储层。
但是,为了解决这个问题,稍微了解些Redis的同学,至少应该听说过Redis的两个持久化方案,分别是RDB、AOF两种持久化方案,这两个方案目标是一样的,就是将内存中的数据保存到磁盘中,避免数据丢失。很多人和我应该类似,听过但从没在实践中使用Redis持久化数据,在我看来,没用过光听过,肯定不是真正的了解,咱至少也得深入了解(暂时用不上的情况下),不能光停留在“听过”。
我会带着这几个问题,去深入了解Redis的持久化机制?
问题一、Redis的持久化机制RDB如何实现?
问题二、Redis的持久化机制AOF如何实现?
问题三、Redis的持久化机制怎么保证数据高可用?
问题四、Redis的持久化机制在哪些业务场景适用?

Redis的持久化机制-RDB

Redis的持久化机制怎么实现,大白话说法,Redis持久化也是把内存数据保存到磁盘上。RDB(Redis Database)是保存内存数据库的快照(SanpShot),而AOF(Append Only File)是保存执行的写操作列表。但这里面的门道是很多的,我们先来了解下RDB的持久化。

Redis数据库状态.png

举个例子,上图展示了一个包含三个非空数据库的Redis服务器,这三个数据库以及数据库中的键值对就是该服务器的数据库状态。RDB持久化就是生成一个RDB文件,当然是经过压缩的二进制文件,通过该文件可以还原生成RDB文件时对应的数据库状态。

RDB文件持久化数据库状态.png

RDB文件是通过SAVE或者BGSAVE命令创建的,代码是rdb.c/rdbSave,有兴趣可以读一下,我这就不展开分析了,SAVE和BGSAVE最终都会调用rdbSave的函数,区别在于调用的方法不同,看名字也能猜出个大概,SAVE是阻塞运行的,而BGSAVE是非阻塞运行的,
SAVE命令调用的方式:

void saveCommand(redisClient *c) {

    // BGSAVE 已经在执行中,不能再执行 SAVE
    // 否则将产生竞争条件
    if (server.rdb_child_pid != -1) {
        addReplyError(c,"Background save already in progress");
        return;
    }

    // 执行 
    if (rdbSave(server.rdb_filename) == REDIS_OK) {
        addReply(c,shared.ok);
    } else {
        addReply(c,shared.err);
    }
}

BGSAVE

int rdbSaveBackground(char *filename) {
    pid_t childpid;
    long long start;

    // 如果 BGSAVE 已经在执行,那么出错
    if (server.rdb_child_pid != -1) return REDIS_ERR;

    ……

    if ((childpid = fork()) == 0) {
        ……
        // 执行保存操作
        retval = rdbSave(filename);
        ……

        // 向父进程发送信号
        exitFromChild((retval == REDIS_OK) ? 0 : 1);

    } else {

        /* Parent */

        // 计算 fork() 执行的时间
        server.stat_fork_time = ustime()-start;

        // 如果 fork() 出错,那么报告错误
        if (childpid == -1) {
            server.lastbgsave_status = REDIS_ERR;
            redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }

        ……

        // 记录数据库开始 BGSAVE 的时间
        server.rdb_save_time_start = time(NULL);

        // 记录负责执行 BGSAVE 的子进程 ID
        server.rdb_child_pid = childpid;

        ……

        return REDIS_OK;
    }

    return REDIS_OK; /* unreached */
}

代码很明显能看到,不管是SAVE还是BGSAVE,在执行命令期间(生成RDB文件时),如果再发送SAVE或者BGSAVE都是拒绝的,原因是生成RDB文件时,保存的是全量内存数据,所以极可能产生不小的磁盘I/O和CPU算力。
再来讲讲生成RDB文件的触发方式,有两种,一种是发送SAVE和BGSAVE命令。另外一种是自动间隔性保存,举个例子:
save 1000 1
save 300 20
save 60 20000
那么只要满足以下三个条件中的任意一个,BGSAVE命令就会被执行:
服务器在1000秒之内,对数据库进行了至少1次修改。
服务器在300秒之内,对数据库进行了至少20次修改。
服务器在60秒之内,对数据库进行了至少20000次修改。
那么代码实现上,关键的数据结构是struct saveparam,如下:

struct redisServer {
    ……
    // 记录保存条件的数组
    struct saveparam * saveparams;
    ……
}

那么上面的自动保存间隔参数,在内存中就是这样保存的:
serverparams.png

OK,Redis服务器会有一个定时任务的入口redis.c/serverCron,当中就会遍历以上条件是否满足,满足的话,触犯BGSAVE操作。

// 遍历所有保存条件,看是否需要执行 BGSAVE 命令
 for (j = 0; j < server.saveparamslen; j++) {
    struct saveparam *sp = server.saveparams+j;

    /* Save if we reached the given amount of changes,
     * the given amount of seconds, and if the latest bgsave was
     * successful or if, in case of an error, at least
     * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */
    // 检查是否有某个保存条件已经满足了
    if (server.dirty >= sp->changes &&
        server.unixtime-server.lastsave > sp->seconds &&
        (server.unixtime-server.lastbgsave_try >
         REDIS_BGSAVE_RETRY_DELAY ||
         server.lastbgsave_status == REDIS_OK))
    {
        redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
            sp->changes, (int)sp->seconds);
        // 执行 BGSAVE
        rdbSaveBackground(server.rdb_filename);
        break;
    }
 }

这里面还是不少实现细节的,比如生成RDB文件的子进程如何通知主进程,已经完成了生成文件的操作(通过信号),还有异常情况的处理,主进程收到了Kill信号,这时候也需要调用RDB生成文件,优雅退出程序等等。
关于RDB二进制文件的格式,这里就先略过了,不详细记录在学习笔记上了。

Redis的持久化机制-AOF

再来是AOF持久化机制,AOF( append only file )持久化以独立日志文件的方式记录每条写命令,并在 Redis 启动时回放 AOF 文件中的命令以达到恢复数据的目的。由于AOF会以追加的方式记录每一条redis的写命令,因此随着Redis处理的写命令增多,AOF文件也会变得越来越大,命令回放的时间也会增多,为了解决这个问题,为了解决AOF文件体积膨胀的问题,Redis提供了AOF文件重写(rewrite)功能。通过该功能,Redis服务器可以创建一个新的AOF文件来替代现有的AOF文件,新旧两个AOF文件所保存的数据库状态相同,但新AOF文件不会包含任何浪费空间的冗余命令,所以新AOF文件的体积通常会比旧AOF文件的体积要小得多。在同步到AOF文件前,Redis服务程序会先把命令写入到AOF缓冲区。

AOF机制.png

在介绍“AOF文件重写”之前,先简单说下,AOF这里的触发机制,这个和RDB不同,首先没有命令手动触发的方式,完全是通过服务器的设置,appendonly和appendfsync,appendonly打开情况下,appendfsync有3个可选值,always、everysecond和no。always服务器在每个事件循环都要将aof_buf缓冲区中的所有内容写入到AOF文件,并且同步AOF文件,服务器在每个事件循环都要将aof_buf缓冲区中的所有内容写入到AOF文件,并且同步AOF文件,服务器在每个事件循环都要将aof_buf缓冲区中的所有内容写入到AOF文件,至于何时对AOF文件进行同步,则由操作系统控制。
比较妙的是,虽然Redis将生成新AOF文件替换旧AOF文件的功能命名为“AOF文件重写”,但实际上,AOF文件重写并不需要对现有的AOF文件进行任何读取、分析或者写入操作,这个功能是通过读取服务器当前的数据库状态来实现的。直接上代码不啰嗦:

// 遍历所有数据库
for (j = 0; j < server.dbnum; j++) {

     ……
    /* Iterate this DB writing every entry 
     *
     * 遍历数据库所有键,并通过命令将它们的当前状态(值)记录到新 AOF 文件中
     */
    while((de = dictNext(di)) != NULL) {
        sds keystr;
        robj key, *o;
        long long expiretime;

        // 取出键
        keystr = dictGetKey(de);

        // 取出值
        o = dictGetVal(de);
        initStaticStringObject(key,keystr);

        // 取出过期时间
        expiretime = getExpire(db,&key);

        /* If this key is already expired skip it 
         *
         * 如果键已经过期,那么跳过它,不保存
         */
        if (expiretime != -1 && expiretime < now) continue;

        /* Save the key and associated value 
         *
         * 根据值的类型,选择适当的命令来保存值
         */
        if (o->type == REDIS_STRING) {
            /* Emit a SET command */
            char cmd[]="*3\r\n$3\r\nSET\r\n";
            if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
            /* Key and value */
            if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
            if (rioWriteBulkObject(&aof,o) == 0) goto werr;
        } else if (o->type == REDIS_LIST) {
            if (rewriteListObject(&aof,&key,o) == 0) goto werr;
        } else if (o->type == REDIS_SET) {
            if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
        } else if (o->type == REDIS_ZSET) {
            if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
        } else if (o->type == REDIS_HASH) {
            if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
        } else {
            redisPanic("Unknown object type");
        }

        /* Save the expire time 
         *
         * 保存键的过期时间
         */
        if (expiretime != -1) {
            char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";

            // 写入 PEXPIREAT expiretime 命令
            if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
            if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
            if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
        }
    }

    // 释放迭代器
    dictReleaseIterator(di);
}

是不是很简单明了,但是AOF这里有一个重写一致性的问题,举例子:

AOF重写数据不一致.png

上图所示,当子进程开始进行文件重写时,数据库中只有k1一个键,但是当子进程完成AOF文件重写之后,服务器进程的数据库中已经新设置了k2、k3、k4三个键,因此,重写后的AOF文件和服务器当前的数据库状态并不一致,新的AOF文件只保存了k1一个键的数据,而服务器数据库现在却有k1、k2、k3、k4四个键。为了解决这种数据不一致问题,Redis服务器设置了一个AOF重写缓冲区,这个缓冲区在服务器创建子进程之后开始使用,当Redis服务器执行完一个写命令之后,它会同时将这个写命令发送给AOF缓冲区和AOF重写缓冲区。如下图所示:

AOF重写解决数据一致性问题.png

最右侧的虚线表示重写完之后,替换AOF文件。更详细的流程,可以参考下图:

详细AOF重写解决数据一致性问题.png

注:AOFRW表示AOF文件重写

Redis的持久化机制怎么保证数据高可用

关于这个问题,在了解上述持久化机制的实现方案后,我的理解是,不管是RDB还是AOF机制,都存在丢失数据的可能性。RDB机制下,数据丢失的概率比AOF机制更大些,而且不管是RDB还是AOF都仅仅是单机上的数据持久化,如果单机的存储磁盘挂了,是没有多备份的,关于这点,Redis也是有解决方案的,Redis的哨兵模式或者Redis集群模式就能提供多机的高可用,所以在实际业务场景中,我认为完全有可能,在清楚地评估业务丢失数据容忍度的情况下,去使用Redis的数据持久化方案。

Redis的持久化机制在哪些业务场景适用

对持久化有要求,又特别适合使用Redis的场景主要有这么两个,特点数据量不大,写入和查询比例差不多。

  • 排行榜相关问题

关系型数据库在排行榜方面写入和查询速度较慢,可能不太适合使用关系型数据库,太重。
比如在线上PK类型的活动中,需要实时展示参与作品的点赞排行榜, 点赞数随着活动进行会不断变化,可能还涉及关联点赞用户的基础信息,那么可以使用到redis中的数据结构,SortedSet和hashmap,来组合使用,满足业务场景。

  • 好友关系、黑白名单等的存储

在微博应用中,每个用户关注的人存在一个集合中,就很容易实现求两个人的共同好友功能,或者黑白名单的存储,都可以使用Redis作为持久存储,写入和查询速度较快,也不会比关系型数据库重,而且可以快速响应需求。

RDB和AOF的优劣对比

RDB和AOF的对比.png