Skip to content

Kafka 相关的基本概念和常见问题

4106字约14分钟

分布式大数据中间件

2024-11-12

Kafka 是目前最主流的分布式流式处理平台, 它以高吞吐、可持久化、可水平扩展、支持流式数据处理等多种特性而被广泛使用.

主要的运用场景有三种:

  • 消息系统: 在传统的消息系统的功能, 如: 系统解耦, 流量削峰, 缓冲, 异步通信等基础上, Kafka 还提供了大多数消息系统无法提供的消息顺序保障以及回溯消费的功能.
  • 存储系统: Kafka 会把消息持久化到磁盘, 并且拥有多副本机制.
  • 流式处理平台: Kafka 不仅仅为每个流行的流式处理平台提供了可靠的数据来源, 还提供了一个完整的流式处理类库.

本文只是我个人对于 Kafka 一些基础概念的笔记以及对工作中常见问题的记录, 并非一个严格的新手教学, 对里面的概念不会做过多的解释. 比较适合对 Kafka 已经有一定概念的人进行复习和查找阅读.

你可以根据感兴趣的内容进行查看:

基本概念

  1. Producer

    生产者, 发消息的一方. 生产者负责创建消息, 然后将其发送到 Kafka 中.

  2. Consumer

    消费者, 接受消息的一方. 消费者连接到 Kafka 并接收消息, 然后进行相关的业务逻辑处理.

  3. Broker

    服务代理节点. 可以看走一台 Kafka 服务器.

  4. Topic

    主题. 逻辑上的概念, 方便将消息分类.

    一个 Topic 可以横跨多个 Broker.

  5. Partition

    分区. 一个 Topic 下会有许多 Partition, 但是一个 Partition 只会属于一个 Topic. 同一个 Topic 下, 不同 Partition 内存储的消息是不同的.

    Partition 内的消息有序, 而 Topic 内的消息不保证有序.

  6. Replica

    多副本机制. 副本之间保持 "一主多从" 关系. leader-Partition 负责读写, follower-Partition 只负责同步数据.

  7. AR

    Partition 中所有副本的集合, 包括 leader 本身.

  8. ISR

    与 leader 保持一定程度上同步的副本集合.

  9. OSR

    与 leader 同步关系滞后过多的副本集合.

    AR = ISR + OSR, 而正常情况下不会有副本同步状态滞后过多, 即 AR = ISR.

    ISR 伸缩

    leader 将滞后太多的 follower 踢出 ISR 的过程也被成为 ISR 伸缩.

  10. HW

    HighWatermark 的缩写, 即 "高水位". 标识一个特定的 offset, Consumer 只能读到 HW 之前的消息.

  11. LEO

    Log End Offset 的缩写. 标识下一条即将写入的消息的 offset.

    Partition 中各种偏移量的说明
    Partition 中各种偏移量的说明

    ISR 中最小的 LEO 即为 Partition 的 HW.

    HW 不可以被 Consumer 消费, 只能消费 HW 之前的消息. LEO 是下一条被写入消息的 offset, 不是最后一条消息的 offset.

    当一条消息被写入, LEO 立刻 +1, 但是 HW 则与 ISR 中 follower 的同步情况相关. leader 的 HW = ISR 中 follower 最小的 LEO.

  12. zookeeper

    是安装 Kafka 集群的必要组件.

    • Kafka 通过 zookeeper 来进行元数据的管理.
    • 负责 leader 的选举和管理.

Producer And Consumer

Producer

Kafka 中 Producer 是线程安全的.

发送消息到 Kafka 主要经历以下几个步骤:

  1. 配置参数创建实例.

    • bootstrap.servers
    • key.serializer
    • value.serializer
  2. 构建待发送的消息.

  3. 发送消息.

    send() -> 拦截器 -> 序列化器 -> 分区器 -> Broker.

  4. 关闭实例.

Producer 发送消息的三种模式

  • fire-and-forget: 发后既忘. 不期待任何返回, 不关心消息是否真正到达.
  • sync: 同步. 阻塞等待 Kafka 的响应.
  • async: 异步.

其实这也对应着 Kafka 的三种 ACK.

  • 1: 默认的成功返回. 表示 leader 成功接受消息.
  • 0: 生产者发送消息后不等待任何返回. 该情况下传输效率最高, 但是消息可靠性最低.
  • -1: Producer 需要等待 ISR 中所有 follower 都确认接收到消息后, 向 leader 发送 ACK, leader 才进行 commit. 消息可靠性最高.

Consumer

Kafka 中 Consumer 是 非线程安全的, 因为有状态.

Consumer Group

Consumer 负责订阅 Kafka 中的 Topic, 并从订阅的 Topic 中拉取消息. Consumer 有一个逻辑的分组概念: Consumer Group, 每个消费者属于一个消费者组. 当消息被发布到主题后, 只会被投递给订阅它的每个消费者组中的一个消费者.

以下几种消费方式 Kafka 都是支持的:

A single consumer in a consumer group
A single consumer in a consumer group
Multiple consumers in one consumer group
Multiple consumers in one consumer group
Multiple consumers reading the same records from the Topic
Multiple consumers reading the same records from the Topic

当 Consumer 的个数超过 Partition 个数的时候就会有 Consumer 分配不到任何分区从而不进行任何消费.

Additional consumers in a group sit idly
Additional consumers in a group sit idly

消费逻辑

  1. 配置参数创建实例.

    • bootstrap.servers
    • group.id
    • key.serializer
    • value.serializer
  2. 订阅 Topic.

  3. 拉取消息并消费.

  4. 提交 offset.

    提交 x + 1 而不是 x.

  5. 关闭实例.

Rebalance

触发场景

  1. 有新的 Consumer 加入 Consumer Group.
  2. 有 Consumer 宕机下线.
  3. 有 Consumer 主动退出 Consumer Group.
  4. Consumer Group 订阅的任何 Topic 的分区数量产生变化.

第一阶段 FIND_COORDINATOR

Consumer 需要确定其 Consumer Group 对应的 Group Coordinator 的 Broker 并与其建立连接.

如果 Consumer 保存了 Group Coordinator 节点信息且连接正常则进入下一个阶段. 如果没有则向集群负载最小的节点发送 FindCoordinatorRequest.

第二阶段 JOIN_GROUP

在成功找到 Consumer Group 对应的 Group Coordinator 后就进入该阶段.

此阶段 Consumer 会向 Group Coordinator 发送 JoinGroupRequest 并处理响应.

选举 Consumer Group 的 leader

如果组内没有 leader, 则加入 Consumer Group 的第一个 Consumer 成为 leader.

如果 leader 退出了 Consumer Group, 则取 Group Coordinator 的 HashMap 中第一对 (key, value) 做为 leader. 这种方式几乎等于随机.

选举分区分配策略

// TODO

第三阶段 SYNC_GROUP

leader 根据第二阶段选举出来的分区分配策略来实施具体的分配. 在此之前要将分配方案同步给各个 Consumer, 但是不是 leader 自己直接发布, 而是通过 Group Coordinator 发布.

此阶段 Consumer 会向 Group Coordinator 发送 SyncGroupRequest 来同步分配方案.

第四阶段 HEARTBEAT

进入该阶段后, Consumer Group 中所有的 Consumer 都可以正常运作. Consumer 和 Group Coordinator 之间会维持心跳. 如果停跳时间过长则会再次触发 Rebalance.

Zero-Copy

零拷贝(Zero-Copy)是指将数据直接从磁盘文件复制到网卡设备中, 而无需让设备进入用户态.

假设我们现在需要将磁盘里面的文件展示给用户. 这意味着我们需要将静态文件从磁盘中拷贝到一个内存 buf 中, 然后再将这个给 buf 通过 Socket 传递给用户. 但是在传统的拷贝中, 这个过程文件经历了 4 次复制.

传统拷贝技术
传统拷贝技术

我们可以清晰地看到, 其中第 2, 3 两次拷贝是完全没有必要的. 其实零拷贝技术就是避免了那两次拷贝.

零拷贝技术
零拷贝技术

在Kafka中, Zero Copy 使用场景的地方有两处:

  • 基于 mmap 的索引.
  • 日志文件读写所用的 TransportLayer.

基于 mmap 的索引. 索引都是基于 MappedByteBuffer 的, 也就是让用户态和内核态共享内核态的数据缓冲区. 此时, 数据不需要复制到用户态空间. 不过, mmap 虽然避免了不必要的拷贝, 但不一定就能保证很高的性能. 在不同的操作系统下, mmap 的创建和销毁成本可能是不一样的. 很高的创建和销毁开销会抵消 Zero Copy 带来的性能优势. 由于这种不确定性, 在Kafka中, 只有索引应用了 mmap, 最核心的日志并未使用 mmap 机制.

TransportLayer. TransportLayer 是 Kafka 传输层的接口. 它的某个实现类使用了 FileChannel 的 transferTo 方法. 该方法底层使用 sendfile 实现了 Zero Copy. 对 Kafka 而言, 如果 I/O 通道使用普通的 PLAINTEXT, 那么 Kafka 就可以利用 Zero Copy 特性, 直接将页缓存中的数据发送到网卡的 Buffer 中, 避免中间的多次拷贝. 相反, 如果 I/O 通道启用了 SSL, 那么 Kafka 便无法利用 Zero Copy 特性了.

常见问题

消息丢失

1. Consumer 丢失数据

消费者可能导致数据丢失的情况是: 消费者获取到了这条消息后, 还未处理, Kafka 就自动提交了 offset. 这时 Kafka 就认为消费者已经处理完这条消息, 其实消费者才刚准备处理这条消息. 这时如果消费者宕机, 那这条消息就丢失了.

消费者引起消息丢失的主要原因就是消息还未处理完 Kafka 会自动提交了 offset. 那么只要关闭自动提交 offset, 消费者在处理完之后手动提交 offset, 就可以保证消息不会丢失. 但是此时需要注意重复消费问题. 比如消费者刚处理完, 还没提交 offset, 这时自己宕机了, 此时这条消息肯定会被重复消费一次, 这就需要消费者根据实际情况保证幂等性.

2. Producer 丢失数据

对于生产者数据传输导致的数据丢失主常见情况是生产者发送消息给 Kafka, 由于网络等原因导致消息丢失. 对于这种情况也是通过在 Producer 端设置 acks=all 来处理, 这个参数是要求 leader 接收到消息后, 需要等到所有的 follower 都同步到了消息之后, 才认为本次写成功了. 如果没满足这个条件, 生产者会自动不断的重试.

3. Kafka 丢失数据

比较常见的一个场景, 就是 Kafka 某个 Broker 宕机, 然后重新选举 Partition 的 leader. 如果此时其他的 follower 刚好还有些数据没有同步, 结果此时 leader 挂了, 然后选举某个 follower 成 leader 之后, 就会造成数据丢失.

所以总结下来, 为了避免消息的丢失, 一般要求设置如下 4 个参数:

  • 给 Topic 设置 replication.factor 参数:这个值必须大于 1, 要求每个 Partition 必须有至少 2 个副本.
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1, 这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系, 没掉队, 这样才能确保 leader 挂了还有一个 follower 可以竞选成为 leader.
  • 在 Producer 端设置 acks=all:这个是要求每条数据, 必须是写入所有 replica 之后, 才能认为是写成功了.
  • 在 Producer 端设置 retries=MAX:这个是要求一旦写入失败, 就无限重试, 卡在这里了.

这样配置之后, 至少在 Kafka Broker 端就可以保证在 leader 所在 Broker 发生故障, 进行 leader 切换时, 数据不会丢失.

保证消息顺序性

主要需要考虑以下两点:

  • 如何保证消息在 Kafka 中的顺序.
  • 如何保证 Consumer 处理的顺序.

如何保证消息在 Kafka 中的顺序

对于 Kafka, 如果我们创建了一个 Topic, 默认有三个 Partition. 生产者在写数据的时候, 可以指定一个 key, 比如在订单 Topic 中我们可以指定订单 id 作为 key. 那么相同订单 id 的数据, 一定会被分发到同一个 Partition 中去, 而且这个 Partition 中的数据一定是有顺序的. 消费者从 Partition 中取出来数据的时候, 也一定是有顺序的. 通过制定 key 的方式首先可以保证在 Kafka 内部消息是有序的.

如何保证 Consumer 处理的顺序

对于某个 Topic 的一个 Partition, 只能被同组内部的一个 consumer 消费. 如果这个 consumer 内部还是单线程处理, 那么只要保证消息在 MQ 内部是有顺序的就可以保证消费也是有顺序的. 但是单线程吞吐量太低, 在处理大量 MQ 消息时, 我们一般会开启多线程消费机制.

那么如何保证消息在多个线程之间是被顺序处理的呢?对于多线程消费我们可以预先设置 N 个内存 Queue, 具有相同 key 的数据都放到同一个内存 Queue 中. 然后开启 N 个线程, 每个线程分别消费一个内存 Queue 的数据即可. 当然, 消息放到内存 Queue 中, 有可能还未被处理, consumer 发生宕机, 内存 Queue 中的数据会全部丢失, 这就转变为上面提到的如何保证消息的可靠传输的问题了.

不重复消费

此问题其实等价于消费幂等.

让生产者发送每条数据的时候, 里面加一个全局唯一的 id. 我们可以借助数据库来保证不重复消费:

  • 基于Redis. 消费之前去 Redis 里面查一下, 之前消费过吗?如果没有消费过, 就处理, 然后这个 id 写 Redis. 如果消费过了, 则无视.
  • 基于数据库的唯一键. 由于唯一键的约束, 重复数据插入只会报错, 不会导致数据库中出现脏数.

为什么 Kafka 不支持读写分离?

在 Kafka 中, 生产者写入消息和消费者读取消息的操作都是与 leader 副本进行交互的, 从而实现的是一种主写主读的生产消费模型.

Kafka 并不支持主写从读, 因为主写从读有 2 个很明显的缺点:

  • 数据一致性问题. 数据从主节点转到从节点必然会有一个延时的时间窗口, 这个时间 窗口会导致主从节点之间的数据不一致. 某一时刻, 在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y, 那么在这个变更通知到从节点之前, 应用读取从节点中的 A 数据的值并不为最新的 Y, 由此便产生了数据不一致的问题.
  • 延时问题. 类似 Redis 这种组件, 数据从写入主节点到同步至从节点中的过程需要经历 "网络->主节点内存->网络->从节点内存" 这几个阶段, 整个过程会耗费一定的时间. 而在 Kafka 中, 主从同步会比 Redis 更加耗时, 它需要经历 "网络->主节点内存->主节点磁盘->网络->从节点内存->从节点磁盘" 这几个阶段. 对延时敏感的应用而言, 主写从读的功能并不太适用.

总结

本文目前为止介绍了 Kafka 相关的基础知识以及对于常见问题的处理.

  • Producer 发送消息到 Kafka, 线程安全.
  • Consumer 接收消息, 非线程安全.
  • 当需要重新分配 Consumer 和 Topic 之间的关系的时候会触发 Rebalance.
  • Rebalance 有 4 个阶段.
  • Kafka 使用了零拷贝技术, 减少了 2 次用户态和内核态之间的转换.
  • 可以通过设置 4 个参数来保证 Kafka 的消息不丢失.
  • 可以通过设置 id 和 Queue 来保证消息的顺序性.
  • 借助 Redis 和数据库的唯一键约束可以避免重复消费.
  • Kafka 不支持读写分离.

随着后续的学习, 本文可能会持续更新.




变更历史

最后更新于: 查看全部变更历史
  • docs: update docs

    于 2024/11/21
  • docs: update docs

    于 2024/11/19
  • docs: update docs

    于 2024/11/18
  • docs: add the common problems section to kafka

    于 2024/11/14
  • docs: add new article

    于 2024/11/12