导语

这篇笔记继续上次《Kafka的学习笔记一》,这篇笔记主要内容有:Kafka的存储设计、消息的QoS、HW原理和控制器选主这几个部分。下面一一来看下内容。


Kafka的存储设计

Kafka Broker 是如何持久化数据的。总的来说,Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故避免了缓慢的随机 I/O 操作,用性能较好的顺序 I/O 写操作,这也是实现 Kafka 高吞吐量特性的一个重要手段。不过如果你不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此 Kafka 必然要定期地删除消息以回收磁盘。怎么删除呢?简单来说就是通过日志段(Log Segment)机制。在 Kafka 底层,一个日志又进一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。

向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是 5 秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于 Kafka 在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。

现代的操作系统针对磁盘的读写已经做了一些优化方案来加快磁盘的访问速度。比如,预读( read-ahead )会提前将一个比较大的磁盘块读入内存。后写 (write-behind) 会将很多小的逻辑写操作合并起来组合成一个大的物理写操作。并且,操作系统还会将主内存剩余的所有空闲内存空间都用作磁盘缓存 (disk cache/page cache),所有的磁盘读写操作都会经过统一的磁盘缓存(除了直接I/O会绕过磁盘缓存)。

应用程序写入数据到文件系统的一般做法是:在内存中保存尽可能多的数据,并在需要时将这些数据刷新到文件系统。但这里我们要做完全相反的事情,右图中所有的数据都写入文件系统的持久化日志文件,但不进行刷新数据的任何调用。数据会首先被传输到磁盘缓存,操作系统随后会将这些数据定期自动刷新到物理磁盘。
kafka的核心磁盘写入原理.png

消息系统内的消息从生产者保存到服务端,消费者再从服务端读取出来,数据的传输效率决定了生产者和消费者的性能。生产者如果每发送一条消息都直接通过网络发送到服务端,势必会造成过多的网络请求。如果我们能够将多条消息按照分区进行分组,并采用批量的方式一次发送一个消息集,并且对消息集进行压缩,就可以减少网络传输的带宽,进一步提高数据的传输效率。

消费者要读取服务端的数据,需要将服务端的磁盘文件通过网络发送到消费者进程,而网络发送通常涉及不同的网络节点。 如下图(左)所示,传统读取磁盘文件的数据在每次发送到网络时,都需要将页面缓存先保存到用户缓存,然后在读取消息时再将其复制到内核空间,具体步骤如下:
1) 操作系统将数据从磁盘中读取文件到内核空间里的页面缓存
2)应用程序将数据从内核空间读入用户空间的缓冲区
3)应用程序将读到的数据写回内核空间并放入socket缓冲区
4)操作系统将数据从socket缓冲区复制到网卡接口,此时数据才能通过网络发送归去

结合Kafka 的消息有多个订阅者的使用场景,生产者发布的消息一般会被不同的消费者消费多次。如下图(右)所示,使用零拷贝技术( zero-copy )只需将磁盘文件的数据复制到页面缓存中一次,然后将数据从页面缓存直接发送到网络中(发送给不同的使用者时,都可以重复使用同一个页面缓存),避免了重复的复制操作。这样,消息使用的速度基本上等同于网络连接的速度了。
kafka内存零copy技术.png

上面说了,Kafka broker会把消息存放到本地磁盘上,也就是日志文件,那么日志结构是什么样的?如下图:
截屏2021-08-06 上午12.52.46.png
日志是 Kafka 服务器端代码的重要组件之一,很多其他的核心组件都是以日志为基础的,比如状态管理机和副本管理器等。总的来说,Kafka 日志对象由多个日志段对象组成,而每个日志段对象会在磁盘上创建一组文件,包括消息日志文件(.log)、位移索引文件(.index)、时间戳索引文件(.timeindex)以及已中止(Aborted)事务的索引文件(.txnindex)。当然,如果你没有使用 Kafka 事务,已中止事务的索引文件是不会被创建出来的。图中的一串数字 0 是该日志段的起始位移值(Base Offset),也就是该日志段中所存的第一条消息的位移值。一般情况下,一个 Kafka 主题有很多分区,每个分区就对应一个 Log 对象,在物理磁盘上则对应于一个子目录。比如你创建了一个双分区的主题 test-topic,那么,Kafka 在磁盘上会创建两个子目录:test-topic-0 和 test-topic-1。而在服务器端,这就是两个 Log 对象。每个子目录下存在多组日志段,也就是多组.log、.index、.timeindex 文件组合,只不过文件名不同,因为每个日志段的起始位移不同。

消息的QoS机制

QoS全称是Quality of Service,服务质量的承诺,就是Kafka的消息交付可靠性保障,Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种:

  • 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  • 至少一次(at least once):消息不会丢失,但有可能被重复发送。
  • 精确一次(exactly once):消息不会丢失,也不会被重复发送。

目前,Kafka 默认提供的交付可靠性保障是第二种,即至少一次。之前我们说过消息“已提交”的含义,即只有 Broker 成功“提交”消息且 Producer 接到 Broker 的应答才会认为该消息成功发送。如果消息成功“提交”,但 Broker 的应答没有成功发送回 Producer 端(比如网络出现瞬时抖动),那么 Producer 就无法确定消息是否真的提交成功了。因此,它只能选择重试,这就是 Kafka 默认提供至少一次可靠性保障的原因,不过这会导致消息重复发送。Kafka 也可以提供最多一次交付保障,只需要让 Producer 禁止重试即可。这样一来,消息要么写入成功,要么写入失败,但绝不会重复发送。无论是至少一次还是最多一次,都不如精确一次来得有吸引力。大部分用户还是希望消息只会被交付一次,这样的话,消息既不会丢失,也不会被重复处理。或者说,即使 Producer 端重复发送了相同的消息,Broker 端也能做到自动去重。在下游 Consumer 看来,消息依然只有一条。那么问题来了,Kafka 是怎么做到精确一次的呢?简单来说,这是通过两种机制:幂等性(Idempotence)和事务(Transaction)。

要承诺消息交付可靠性保障,Kafka必然需要与Producer、Consumer有一些约定,首先就是怎么保证无消息丢失,Kafka对节点的存活定义有两个条件:

  • 节点必须和ZK保持会话;
  • 如果这个节点是某个分区的备份副本,它必须对分区主副本的写操作进行复制,并且复制的进度不能落后太多。

满足这两个条件,叫作“正在同步中”( in-sync)。 每个分区的主副本会跟踪正在同步中的备份副本节点( In Sync Replicas ,即ISR)。如果一个备份副本挂掉、没有响应或者落后太多,主副本就会将其从同步副本集合中移除。反之,如果备份副本重新赶上主副本,它就会加入到主副本的同步集合中。Kafka 中, 一条消息只有被ISR集合的所有副本都运用到本地的日志文件,才会认为消息被成功提交了。任何时刻,只要ISR至少有一个副本是存活的,Kafka就可以保证“一条消息一旦被提交,就不会丢失“。只有已经提交的消息才能被消费者消费,因此消费者不用担心会看到因为主副本失败而丢失的消息,下面我们举例分析Kafka的消息提交机制如何保证消费者看到的数据是一致的。

1)生产者发布了10条消息,但都还没有提交(没有完全复制到ISR中的所有副本) 如果没有提交机制,消息写到主副本的节点就对消费者立即可见,即消费者可以立即看到这10条消息。但之后主副本挂掉了,这10条消息实际上就丢失了,而消费者之前能看到这 10 条丢失的数据,在主副本挂掉后就看不到了,导致消费者看到的数据出现了不一致。

2)如果有提交机制的保证,并且生产者发布的 10条消息还没有提交,则对消费者不可见。即使10条消息都已经写入主副本,但是它们在还没有来得及复制到其他备份副本之前,主副本就挂掉了。那么,这 10条消息就不算写入成功,生产者会重新发送这 10条消息。当这 10条消息成功地复制到ISR 的所有副本后,它们才会认为是提交的,即对消费者才是可见的。在这之后,即使主副本挂掉了也没有关系,因为原先消费者能看到主副本的10条消息,在新的主副本上也能看到这10条消息,不会出现不一致的情况。

引出一个重要结论,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。

第一个核心要素是“已提交的消息”。什么是已提交的消息?当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。那为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要有一个 Broker 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。不论哪种情况,Kafka 只对已提交的消息做持久化保证这件事情是不变的。

第二个核心要素就是“有限度的持久化保证”,也就是说 Kafka 不可能保证在任何情况下都做到不丢失消息。Kafka 不丢消息是有前提条件的。假如你的消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中至少有 1 个存活。只要这个条件成立,Kafka 就能保证你的这条消息永远不会丢失。

一些常简的消息丢失情况如下:

目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。如果用这个方式,可能会有哪些因素导致消息没有发送成功呢?其实原因有很多,例如网络抖动,导致消息压根就没有发送到 Broker 端;或者消息本身不合格导致 Broker 拒绝接收(比如消息太大了,超过了 Broker 的承受能力)等。这种情况下,Kafka 不认为消息是已提交的,因此也就没有 Kafka 丢失消息这一说了。

解决此问题的方法非常简单:Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback),它能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。举例来说,如果是因为那些瞬时错误,那么仅仅让 Producer 重试就可以了;如果是消息不合格造成的,那么可以调整消息格式后再次发送。总之,处理发送失败的责任在 Producer 端而非 Broker 端。你可能会问,发送失败真的没可能是由 Broker 端的问题造成的吗?当然可能!如果你所有的 Broker 都宕机了,那么无论 Producer 端怎么重试都会失败的,此时你要做的是赶快处理 Broker 端的问题。但之前说的核心论据在这里依然是成立的:Kafka 依然不认为这条消息属于已提交消息,故对它不做任何持久化保证。

Consumer 端丢失数据主要体现在 Consumer 端要消费的消息不见了。Consumer 有个“位移”的概念,表示的是这个 Consumer 当前消费到的 Topic 分区的位置。下面这张图来自于官网,它清晰地展示了 Consumer 端的位移数据。比如对于 Consumer A 而言,它当前的位移值就是 9;Consumer B 的位移值是 11。
consumer丢失消息.png

这里的“位移”类似于我们看书时使用的书签,它会标记我们当前阅读了多少页,下次翻书的时候我们能直接跳到书签页继续阅读。正确使用书签有两个步骤:第一步是读书,第二步是更新书签页。如果这两步的顺序颠倒了,就可能出现这样的场景:当前的书签页是第 90 页,我先将书签放到第 100 页上,之后开始读书。当阅读到第 95 页时,我临时有事中止了阅读。那么问题来了,当我下次直接跳到书签页阅读时,我就丢失了第 96~99 页的内容,即这些消息就丢失了。

同理,Kafka 中 Consumer 端的消息丢失就是这么一回事。要对抗这种消息丢失,办法很简单:维持先消费消息(阅读),再更新位移(书签)的顺序即可。这样就能最大限度地保证消息不丢失。当然,这种处理方式可能带来的问题是消息的重复处理,这个问题后续继续探讨。

还有一类消息丢失场景是,Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移。假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。这里的关键在于 Consumer 自动提交位移,没有真正地确认消息是否真的被消费就盲目地更新了位移。这个问题的解决方案也很简单:如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移。

还有问题,就是消息堆积,可能原因:1. 生产速度大于消费速度,这样可以适当增加分区,增加consumer数量,提升消费TPS;2. consumer消费性能低,查一下是否有很重的消费逻辑(比如拿到消息后写HDFS或HBASE这种逻辑就挺重的),看看是否可以优化consumer TPS;3. 确保consumer端没有因为异常而导致消费hang住; 4. 如果你使用的是消费者组,确保没有频繁地发生rebalance。真正业务场景下,要结合实际情况去做分析。

说回到Kafka 是怎么做到消息精确一次的呢?简单来说,这是通过两种机制:幂等性(Idempotence)和事务(Transaction)。“幂等”这个词原是数学领域中的概念,指的是某些操作或函数能够被执行多次,但每次得到的结果都是不变的。幂等性有很多好处,其最大的优势在于我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态。如果是非幂等性操作,我们还需要担心某些操作执行多次对状态的影响,但对于幂等性操作而言,我们根本无需担心此事。

在 Kafka 中,Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。它其实是 0.11.0.0 版本引入的新功能。enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的重复去重。Kafka为了实现幂等性,它在底层设计架构中引入了ProducerID和SequenceNumber。 ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,用来标识本次会话。 SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。broker在内存维护(pid,seq)映射,收到消息后检查seq。producer在收到明确的的消息丢失ack,或者超时后未收到ack,要进行重试。

new_seq = old_seq+1: 正常消息;
new_seq <= old_seq : 重复消息;
new_seq > old_seq+1: 消息丢失;

另外我们需要了解幂等性Producer的作用范围。首先,它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。如果想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别。

事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务 API,如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。

高水位(HW)和Leader Epoch

在 Kafka 的世界中,水位是和位置信息绑定的,具体来说,它是用消息位移来表征的。高水位在界定 Kafka 消息对外可见性以及实现副本机制等方面起到了非常重要的作用,但其设计上的缺陷给 Kafka 留下了很多数据丢失或数据不一致的潜在风险。为此,社区引入了 Leader Epoch 机制,主要是用来判断出现 Failure 时是否执行日志截断操作(Truncation),尝试规避掉这类风险。下面将依次介绍高水位和Leader Epoch机制。

高水位的作用主要有2个:1. 定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。2. 帮助 Kafka 完成副本同步。使用下图理解高水位中的关键概念:
HW和LEO介绍.png
假设这是某个分区 Leader 副本的高水位图。首先,请你注意图中的“已提交消息”和“未提交消息”。在分区高水位以下的消息被认为是已提交消息,反之就是未提交消息。消费者只能消费已提交消息,即图中位移小于 6 的所有消息。注意,这里我们不讨论 Kafka 事务,因为事务机制会影响消费者所能看到的消息的范围,它不只是简单依赖高水位来判断。它依靠一个名为 LSO(Log Stable Offset)的位移值来判断事务型消费者的可见性。

另外,需要关注的是,位移值等于高水位的消息也属于未提交消息。也就是说,高水位上的消息是不能被消费者消费的。图中还有一个日志末端位移的概念,即 Log End Offset,简写是 LEO。它表示副本写入下一条消息的位移值。这个副本当前只有 9 条消息,位移值是从 0 到 8,下一条新消息的位移是 9。显然,介于高水位和 LEO 之间的消息就属于未提交消息。这也从侧面告诉了我们一个重要的事实,那就是:同一个副本对象,其高水位值不会大于 LEO 值。

高水位和 LEO 是副本对象的两个重要属性。Kafka 所有副本都有对应的高水位和 LEO 值,而不仅仅是 Leader 副本。只不过 Leader 副本比较特殊,Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其 Leader 副本的高水位。

现在,我们知道了每个副本对象都保存了一组高水位值和 LEO 值,但实际上,在 Leader 副本所在的 Broker 上,还保存了其他 Follower 副本的 LEO 值。我们一起来看看下面这张图。
高水位更新机制.jpg

在这张图中,我们可以看到,Broker 0 上保存了某分区的 Leader 副本和所有 Follower 副本的 LEO 值,而 Broker 1 上仅仅保存了该分区的某个 Follower 副本。Kafka 把 Broker 0 上保存的这些 Follower 副本又称为远程副本(Remote Replica)。Kafka 副本机制在运行过程中,会更新 Broker 1 上 Follower 副本的高水位和 LEO 值,同时也会更新 Broker 0 上 Leader 副本的高水位和 LEO 以及所有远程副本的 LEO,但它不会更新远程副本的高水位值,也就是我在图中标记为灰色的部分。为什么要在 Broker 0 上保存这些远程副本呢?其实,它们的主要作用是,帮助 Leader 副本确定其高水位,也就是分区高水位。
高水位和LEO更新时机.jpg

下面用一个单分区并且有两个副本的主题作为例子展示一下Kafka副本同步的全流程。
1.首先是初始状态。下面这张图中的 remote LEO 就是刚才的远程副本的 LEO 值。在初始状态时,所有值都是 0。
高水位更新流程1.jpg

  1. 当生产者给主题分区发送一条消息后,状态变更为:
    高水位更新流程2.jpg

此时,Leader 副本成功将消息写入了本地磁盘,故 LEO 值被更新为 1。

  1. Follower 再次尝试从 Leader 拉取消息。和之前不同的是,这次有消息可以拉取了,因此状态进一步变更为:
    高水位更新流程3.jpg

这时,Follower 副本也成功地更新 LEO 为 1。

  1. 此时,Leader 和 Follower 副本的 LEO 都是 1,但各自的高水位依然是 0,还没有被更新。它们需要在下一轮的拉取中被更新,如下图所示:
    高水位更新流程4.jpg

在新一轮的拉取请求中,由于位移值是 0 的消息已经拉取成功,因此 Follower 副本这次请求拉取的是位移值 =1 的消息。Leader 副本接收到此请求后,更新远程副本 LEO 为 1,然后更新 Leader 高水位为 1。做完这些之后,它会将当前已更新过的高水位值 1 发送给 Follower 副本。Follower 副本接收到以后,也将自己的高水位值更新成 1。至此,一次完整的消息同步周期就结束了。事实上,Kafka 就是利用这样的机制,实现了 Leader 和 Follower 副本之间的同步。

从上面的描述,我们知道,依托于高水位,Kafka 既界定了消息的对外可见性,又实现了异步的副本同步机制。不过,我们还是要思考一下这里面存在的问题。

上面副本同步的步骤4可知,Follower 副本的高水位更新需要一轮额外的拉取请求才能实现。如果把上面那个例子扩展到多个 Follower 副本,情况可能更糟,也许需要多轮拉取请求。也就是说,Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配的。这种错配是很多“数据丢失”或“数据不一致”问题的根源。基于此,社区在 0.11 版本正式引入了 Leader Epoch 概念,来规避因高水位更新错配导致的各种不一致问题。

所谓 Leader Epoch,我们大致可以认为是 Leader 版本。它由两部分数据组成。1. Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。2. 起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。

假设现在有两个 Leader Epoch<0, 0> 和 <1, 120>,那么,第一个 Leader Epoch 表示版本号是 0,这个版本的 Leader 从位移 0 开始保存消息,一共保存了 120 条消息。之后,Leader 发生了变更,版本号增加到 1,新版本的起始位移是 120。

Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到一个 checkpoint 文件中。当 Leader 副本写入消息到磁盘时,Broker 会尝试更新这部分缓存。如果该 Leader 是首次写入消息,那么 Broker 会向缓存中增加一个 Leader Epoch 条目,否则就不做更新。这样,每次有 Leader 变更时,新的 Leader 副本会查询这部分缓存,取出对应的 Leader Epoch 的起始位移,以避免数据丢失和不一致的情况。接下来,我们来看一个实际的例子,它展示的是 Leader Epoch 是如何防止数据丢失的。请先看下图,下图展示的是单纯依赖高水位,数据丢失的一种场景。
无leader epoch数据丢失.jpg

开始时,副本 A 和副本 B 都处于正常状态,A 是 Leader 副本。某个使用了默认 acks 设置的生产者程序向 A 发送了两条消息,A 全部写入成功,此时 Kafka 会通知生产者说两条消息全部发送成功。现在我们假设 Leader 和 Follower 都写入了这两条消息,而且 Leader 副本的高水位也已经更新了,但 Follower 副本高水位还未更新。这是可能出现的,Follower 端高水位的更新与 Leader 端有时间错配。倘若此时副本 B 所在的 Broker 宕机,当它重启回来后,副本 B 会执行日志截断操作,将 LEO 值调整为之前的高水位值,也就是 1。这就是说,位移值为 1 的那条消息被副本 B 从磁盘中删除,此时副本 B 的底层磁盘文件中只保存有 1 条消息,即位移值为 0 的那条消息。当执行完截断操作后,副本 B 开始从 A 拉取消息,执行正常的消息同步。如果就在这个节骨眼上,副本 A 所在的 Broker 宕机了,那么 Kafka 就别无选择,只能让副本 B 成为新的 Leader,此时,当 A 回来后,需要执行相同的日志截断操作,即将高水位调整为与 B 相同的值,也就是 1。这样操作之后,位移值为 1 的那条消息就从这两个副本中被永远地抹掉了。这就是这张图要展示的数据丢失场景。

严格来说,这个场景发生的前提是 Broker 端参数 min.insync.replicas 设置为 1。此时一旦消息被写入到 Leader 副本的磁盘,就会被认为是“已提交状态”,但现有的时间错配问题导致 Follower 端的高水位更新是有滞后的。如果在这个短暂的滞后时间窗口内,接连发生 Broker 宕机,那么这类数据的丢失就是不可避免的。现在,我们来看下如何利用 Leader Epoch 机制来规避这种数据丢失。我依然用图的方式来说明。
leader epoch防止数据丢失.jpg

场景和之前大致是类似的,只不过引用 Leader Epoch 机制后,Follower 副本 B 重启回来后,需要向 A 发送一个特殊的请求去获取 Leader 的 LEO 值。在这个例子中,该值为 2。当获知到 Leader LEO=2 后,B 发现该 LEO 值不比它自己的 LEO 值小,而且缓存中也没有保存任何起始位移值 > 2 的 Epoch 条目,因此 B 无需执行任何日志截断操作。这是对高水位机制的一个明显改进,即副本是否执行日志截断不再依赖于高水位进行判断。现在,副本 A 宕机了,B 成为 Leader。同样地,当 A 重启回来后,执行与 B 相同的逻辑判断,发现也不用执行日志截断,至此位移值为 1 的那条消息在两个副本中均得到保留。后面当生产者程序向 B 写入新消息时,副本 B 所在的 Broker 缓存中,会生成新的 Leader Epoch 条目:[Epoch=1, Offset=2]。之后,副本 B 会使用这个条目帮助判断后续是否执行日志截断操作。这样,通过 Leader Epoch 机制,Kafka 完美地规避了这种数据丢失场景。

控制器选举

Kafka利用了ZK的领导选举机制,每个代理节点都会参与竞选主控制器,但只有一个代理节点可以成为主控制器,其他代理节点只有在主控制器出现故障或者会话失效时参与领导选举。Kafka实现领导选举的做法是:每个代理节点都会作为ZK的客户端,向 ZK服务端尝试创建/controller临时节点,但最终只有一个代理节点可以成功创建/controller节点。由于主控制器创建的ZK节点是临时节点,因此当主控制器出现故障,或者会话失效时,临时节点会被删除。这时候所有的代理节点都会尝试重新创建/controller节点,并选举出新的主控制器。

主节点选举,首先需要面对的就是集群节点达成某种一致(共识)的问题。对于主从复制的数据库,所有节点需求就谁来充当主节点达成一致。如果由于网络故障原因出现节点之间无法通信,就很容易出现争议。此时,共识对于避免错误的故障切换十分重要,后者会导致两个节点都自认为是主节点即脑裂。如果集群中存在两个这样的节点,每个都在接受写请求,最终会导致数据产生分歧、不一致甚至数据丢失。

Zookeeper里采用的是Zab共识算法/协议。广义上说,共识算法必须满足以下的性质:
1)协商一致性,所有的节点都接受相同的协议。
2)诚实性,所有节点不能反悔,即对一项提议不能有两种决定。
3)合法性,如果决定了值v,则v一定是由某个节点所提议的。
4)可终止性,节点如果不崩溃则最终一定可以达成决议。

协商一致性和诚实性属性定义了共识的核心思想:决定一致的结果,一旦决定,就不能改变。如果不关心容错,那么满足前三个属性很容易:可以强行指定某个节点为“独裁者”,由它做出所有的决定。但是,如果该节点失败,系统就无法继续做出任何决定。可终止性则引入了容错的思想,它重点强调一个共识算法不能原地空转,永远不做事情。换句话说,它必须取得实质性进展,即使某些节点出现了故障,其他节点也必须做出最终决定。可终止性属于一种活性,另外三种则属于安全性方面的属性。

当然,如果所有的节点都崩溃了,那么无论何种算法都不可能继续做出决定。算法所能容忍的失败次数和规模都有一定的限制。事实上,可以证明任何共识算法都需要至少大部分节点正确运行才能确保终止性,而这个大多数就可以安全地构成quorum。因此,可终止性的前提是,发生崩溃或者不可用的节点必须小于半数节点。这里,我们暂时假定系统不存在拜占庭式错误。

最著名的容错式共识算法有Paxos,Raft和Zab。这些算法大部分其实并不是直接使用上述的形式化模型(提议并决定某个值,同时满足上面4个属性)。相反,他们是决定了一系列值,然后采用全序关系广播算法。全序关系广播的要点是,消息按照相同的顺序发送到所有的节点,有且只有一次。这其实相当于进行了多轮的共识过程:在每一轮,节点提出他们接下来想要发送的消息,然后决定下一个消息的全局顺序。所以,全序关系广播相当于持续的多轮共识(每一轮共识的决定对应于一条消息):

  • 由于协商一致性,所有节点决定以相同的顺序发送相同的消息。
  • 由于诚实性,消息不能重复。
  • 由于合法性,消息不回被破坏。也不是凭空捏造的。
  • 由于可终止性,消息不会丢失。

Raft和Zab都直接采取了全序关系广播,这比重复性的一轮共识只解决一个提议更加高效。ZooKeeper主要针对保存少量、完全可以放在内存中的数据(虽然最终仍然会写入磁盘以保证持久性),所以不要用它保存大量的数据。这些少量数据会通过容错的全序广播算法复制到所有节点上从而实现高可靠。ZooKeeper模仿了Google的Chubby锁服务,不仅实现了全序广播(因此也实现了共识),而且还构建了一组有趣的其他特性,这些特性在构建分布式系统时格外重要:
1)线性一致性的原子操作,使用原子CAS操作可以实现锁:如果多个节点同时尝试执行相同的操作,只有一个节点会成功。共识协议保证了操作的原子性和线性一致性,即使节点发生故障或网络在任意时刻中断。分布式锁通常以租约(lease)的形式实现,租约有一个到期时间,以便在客户端失效的情况下最终能被释放。
2)操作的全序排序,当某个资源受到锁或租约的保护时,你需要一个fencing令牌来防止客户端在进程暂停的情况下彼此冲突。fencing令牌是每次锁被获取时单调增加的数字。ZooKeeper通过全局排序操作来提供这个功能,它为每个操作提供一个单调递增的事务ID( zxid )和版本号( cversion )。
3)故障检测,客户端在ZooKeeper服务器上维护一个长期会话,客户端和服务器周期性地交换心跳包来检查节点是否存活。即使连接暂时中断,或者某个ZooKeeper节点发生失效,会话仍保持在活跃状态。但如果心跳停止的持续时间超出会话超时,ZooKeeper会声明会话失败。此时,所有该会话持有的锁资源可以配置为自动全部释放(ZooKeeper称之为ephemeral nodes即临时节点)。
4)变更通知,客户端不仅可以读取其他客户端创建的锁和键值,还可以监听它们的变更。因此,客户端可以知道其他客户端何时加入集群(基于它写入ZooKeeper的值),以及客户端是否发生了故障(会话超时导致临时节点消失)。通过订阅通知机制,客户端不用再通过频繁轮询的方式来找出变更。

推荐阅读书籍或材料

Kafka技术内幕
Apache Kafka实战

标签: none

已有 2 条评论

  1. 1 1

    1

  2. 1 1

    555

添加新评论