Loading...

《Apache Kafka实战》阅读笔记

–>

《Apache Kafka实战》阅读笔记

文章目录

  • 《Apache Kafka实战》阅读笔记
    • 认识Apache Kafka
      • Kafka的核心功能
      • Kafka概要设计
      • 基本概念与术语
    • Kafka线上环境部署
      • 磁盘容量规划
      • 内存规划
      • CPU规划
    • Producer开发
      • producer的主要参数
      • 多线程处理
    • Consumer开发
      • 消费者组
      • 位移
      • 主要参数
      • 位移管理
      • 重平衡rebalance
    • kafka设计原理
      • 集群管理
      • Producer端设计
      • Consumer端设计
      • 实现精确一次处理语义
    • 管理Kafka集群
      • 客户端API管理Topic与位移
      • Kafka安全

认识Apache Kafka

Kafka的核心功能

高性能的消息发送与高性能的消息消费

Kafka概要设计

吞吐量/延时

Kafka依靠以下4点达到了高吞吐量、低延时的设计目标:

  • 大量使用操作系统页缓存,内存操作速度快且命中率高
  • Kafka不直接参与物理IO操作,而是交由操作系统来完成
  • 采用追加写入方式,摒弃了缓慢的磁盘随机读写操作
  • 使用以sendfile为代表的零拷贝技术加强网络间的数据传输效率

消息持久化

Kafka要持久化消息,这样做的好处是:

  • 解耦消息发送与消息消费
  • 实现灵活的消息处理:可以很方便地实现消息重演(处理已经处理过地消息)

负载均衡与故障转移

  • 负载均衡:Kafka的每台服务器都有均等的机会提供服务
  • 故障转移:服务器意外中止时,集群可以快速地检测到该失效,并将该服务器上地应用或服务转移到其他服务器上。

Kafka的故障转移的方式是使用会话机制。每台服务器启动后会以会话的形式把自己注册到ZooKeeper服务器上,一旦该服务器出现问题,与ZooKeeper的会话便会超时失效,此时Kafka集群会选举出另一台服务器来完全代替这台服务器继续提供服务。

伸缩性

每台Kafka服务器(Broker)的状态交由ZooKeeper保管,可以很轻松地对集群进行扩容

基本概念与术语

消息

Key:消息键,决定消息被保存在某个topic下的哪个partition(分区)

Value:消息体,保存实际的消息数据

TimeStamp:消息发送时间戳,用于流式处理及其他依赖时间的处理语义,不指定则取当前时间

topic与partition

topic代表了一类消息,它由多个partition组成,每个partition都有自己的***,通常从0开始。partition的引入没有太多业务含义,单纯地为了提高吞吐量

offset

topic partition下地每条消息都被分配一个位移值。通过topic、partition、offset即可找到唯一定位的那条消息。

replica

Kafka实现高可靠性的途径是依靠冗余机制,多个副本(replica)备份了多份日志。副本分为领导者(leader replica)和追随者(follower replica)。

leader和follower

  • follower不提供服务给客户端,仅仅被动地向leader获取数据。当leader所在的broker宕机时,Kafka会从剩余的replica中选举出新的leader
  • Kafka保证同一个分区的多个副本一定不会分配在同一台broker上。如果同一个broker上有同一个分区上的多个副本,将无法实现备份冗余效果

in-sync replica

  • 与leader replica保持同步的replica集合
  • Kafka动态地维护了一个replica集合,集合中的所有replica保存的日志消息都与leader replica保持同步

Kafka线上环境部署

磁盘容量规划

  • 假设每天产生1亿条消息,平均每条消息大小为1KB,每条消息保存两份并保留一周,那么每天产生的消息将占用:1亿 X 2 X 1KB /1024 / 1024 = 200GB的磁盘空间,最好再预留10%的磁盘空间,因为还要保存一周的数据,所以磁盘的容量规划是200 X 1.1 X 7 = 1.5TB。如果启用了消息压缩,则可以节约平均50%的容量,那么整体的磁盘容量就是0.75TB。
  • 所以磁盘容量规划主要与每天新增的消息数、消息留存时间、平均消息大小、副本数、消息留存时间、是否启用消息压缩有关

内存规划

  • 尽量分配更多的内存给操作系统的页缓存
  • 不要为broker设置过大的堆内存,最好不超过6GB
  • 页缓存大小至少要大于一个日志段的大小

CPU规划

追求多核而非高频率

Producer开发

producer的主要参数

bootstrap.servers

用于创建向Kafka broker的连接

key.serializer

为消息的key做序列化

value.serializer

为消息的value做序列化

acks

  • 用于控制producer生产消息的持久性
  • 指定了在给producer发送响应前,leader broker必须确保已成功写入该消息的副本数,当前acks有三个取值:0、1和all
  • 0表示producer完全不理睬leader broker端的处理结果,用户无法通过回调机制感知任何发送过程中的失败,吞吐量最高;all或者-1,leader broker不仅会将消息写入本地日志,同时还会等待ISR中所有其他副本都写入它们的本地日志后,才会发送响应结果给producer,所以消息肯定不会丢失,但是吞吐量最低;1,仅将该消息写入本地日志,便发送响应结果给producer,无需等待ISR中的其他消息写入该消息

buffer.memory

  • producer端用于缓存消息的缓冲区大小,单位是字节,默认是33554432,即32M
  • 几乎可以认为是producer程序使用的内存大小

compression.type

设置producer端是否压缩消息,默认为none,即不压缩

retries

处理写入请求失败时,重试的次数

batch.size

  • producer会将发送到同一分区的多条消息封装进一个batch中,当batch满了会发送batch中的所有消息
  • 默认参数为16384,即16KB
  • 再实际使用过程中可以合理增加该值,会提高producer的吞吐量

linger.ms

控制消息发送时的延时。默认值为0,表示消息需要被立即发送,无需关心batch是否已满

max.request.size

producer能够发送的最大请求的大小(可以理解为最大的消息大小),默认1048576字节

request.timeout.ms

当producer发送请求给broker后,broker需要在规定时间内将处理结果返还给producer,默认为30秒

多线程处理

多线程单KafkaProducer实例

  • 实现简单,性能好
  • 所有线程共享一个内存缓冲区,需要较多内存
  • 一旦producer某个线程崩溃导致KafkaProducer实例被破坏,则用户的所有线程都无法工作
  • 对于分区数不多的Kafka集群,推荐这种

多线程多KafkaProducer实例

  • 每个用户线程拥有专属的KafkaProducer实例,缓冲空间及对应的配置参数
  • 单个KafkaProducer崩溃不会影响其他producer线程
  • 需要较大的内存分配开销
  • 推荐拥有超多分区的集群使用

Consumer开发

消费者组

  • 消费者使用一个消费者组名(group.id)来标记自己,topic的每条消息只会被发送到每个订阅它的消费者组的一个消费者实例上
  • 所有的实例都属于相同的group时,Kafka实现了基于队列的消息引擎模型
  • 实例属于不同的group时,Kafka实现 了基于发布/订阅的消息引擎模型

位移

每个consumer实例都会为它消费的分区维护属于自己的位置信息来记录当前消费了多少条消息

主要参数

bootstrap.servers

用于创建与Kafka broker服务器之间的连接

group.id

消费者组id,通常设置为一个有业务意义的名字

key.deserializer

消息的key反序列化

value.deserializer

消息的value反序列化

session.timeout.ms

消费者组检测组内成员发送崩溃的时间,默认为10秒

max.poll.interval.ms

consumer处理逻辑最大时间

auto.offset.reset

指定了无位移信息或位移越界时Kafka的应对策略,且只在这两种情况下,此参数才有效

可取值:earliest:从最早的位移开始消费,latest:从最新的位移开始消费,none:抛出异常

enable.auto.commit

是否自动提交位移

fetch.max.bytes

consumer端单次获取数据的最大字节数

max.poll.records

控制单次poll调用返回的最大消息数

connection.max.idle.ms

Kafka关闭空闲连接的时间,默认为9分钟

位移管理

每次提交的位移值是下一次要读的消息的位置

3种消息交付语义保证:

  • 最多一次:消息可能丢失,但是不会被重复处理
  • 最少一次:消息不会丢失,但是可能被处理多次
  • 精确一次:消息一定会被处理且只会被处理一次

很显然如果consumer在消息消费之前就提交位移,那么可以实现最多一次;如果在消息消费完成之后提交,则是最少一次

consumer位移管理

  • consumer会在所有的broker集群中选择一个broker作为group的coordinator,用于实现组成员管理、消费分配方案定制以及位移提交
  • 内部topic _consumer_offsets配置有多个分区和多个副本,用于保存consumer提交的位移
  • 消费者组首次启动时,coordinator根据参数auto.offset.reset为consumer指定开始读取的位移
  • 每个consumer运行一段时间后必须提交自己的位移值,如果consumer崩溃或被关闭,则它负责的分区会被分配给其他consumer
  • consumer提交位移的机制主要是向coordinator发送位移提交请求来实现的,每个位移提交请求都会往_consumer_offsets对应的分区上追加一条消息

自动提交

  • consumer默认是自动提交位移的,间隔为5秒,可通过auto.commit.interval.ms参数设置自动提交位移的间隔
  • 开发成本低,简单易用,但无法实现精确控制,位移提交失败后不易处理
  • 适用于对消息交付无需求,容忍一定的消息丢失的场景

手动提交

  • 需设置enable.auto.commit参数为false,然后调用commitSync或commitAsync方法
  • 可精确控制位移提交行为,开发成本增大
  • 适用于不允许消息丢失,需要“最少一次”处理语义的场景

重平衡rebalance

假设某个消费者组有20个consumer实例,该组订阅了100个分区的topic,正常情况下,Kafka会为每个consumer分配5个分区,这个分配的过程就是rebalance

触发条件

  • 组成员变更
  • 组订阅topic数发生变更
  • 组订阅topic的分区数发生变更

kafka设计原理

集群管理

成员管理

Kafka依赖ZooKeeper实现自动化服务发生与成员管理

broker向ZooKeeper中注册的信息以JSON格式保存,其中包括的信息如下:

  • listener_security_protocol_map:该broker与外界通信所用的安全协议类型
  • endpoints:指定了broker的服务endpoint列表
  • rack:broker机架信息
  • jmx_port:broker的JMX监控端口
  • host:broker主机名或IP地址
  • port:broker服务的端口号
  • timestamp:broker启动时间
  • version:broker当前版本号

ZooKeeper路径

/brokers:保存了Kafka集群的所有信息,包括每台broker的注册信息,集群上所有的topic的信息

/controller:保存了Kafka controller组件(负责集群的领导者选取)的注册信息,同时也负责controller的动态选举

/admin:保存管理脚本的输出结果

/isr_change_notification:保存ISR列表发生变化的分区列表

/config:保存Kafka集群下各种资源的定制化配置信息

/cluster:保存了Kafka集群的简要信息,包括ID信息和集群版本号

/controller_epoch:保存了controller组件的版本号,使用它来隔离无效的controller请求

通信协议

Kafka的通信协议是基于TCP之上的二进制协议

Producer端设计

ProducerRecord

封装了一条待发送的消息

  • topic:该消息所属的topic
  • partition:该消息所属的分区
  • key:消息的key
  • value:消息体
  • timestamp:消息时间戳

RecordMetadata

表示Kafka服务端返回给客户端的消息的元数据信息

  • offset:消息在分区日志中的位移信息
  • timestamp:消息时间戳
  • topic/partition:所属topic的分区
  • checksum:消息CRC32码
  • serializedKeySize:序列化后的消息key字节数
  • serializedValueSize:序列化后的消息value字节数

工作流程

Consumer端设计

消费者组状态机

consumer group的5个状态:

  • Empty:没有任何active consumer,但可能包含位移信息
  • PreparingRebalance:该group正准备进行rebalance
  • AwaitingSync:所有成员已经加入组并等待leader consumer发送分区分配方案
  • Stable:该group开始正常消费
  • Dead:group已经被废弃

实现精确一次处理语义

即消息被处理且只会被处理一次

幂等性

一个操作执行多次的结果与只运行一次的结果是相同的

要启用幂等性,需设置producer端的新参数enable.idempotence为true

事务

Kafka为实现事务要求应用程序必须提供一个唯一的id来表征事务

管理Kafka集群

客户端API管理Topic与位移

使用AdminClient和KafkaAdminClient管理集群

Kafka安全

目前Kafka包括的安全机制如下:

  • 连接认证机制,包含服务器端和客户端连接、服务器间连接以及服务器与工具间连接,认证机制包括SSL(TLS)或SASL
  • 服务器与ZooKeeper之间连接的认证机制
  • 基于SSL的连接通道数据传输加密
  • 客户端读/写授权
  • 支持可插拔的授权服务与外部授权服务的集成

本文来源 互联网收集,文章内容系作者个人观点,不代表 本站 对观点赞同或支持。如需转载,请注明文章来源,如您发现有涉嫌抄袭侵权的内容,请联系本站核实处理。

© 版权声明

相关文章