kafka基础知识
1 Kafka作用
- 解耦:模块之间的交互,假设生产数据模块有5个,消费数据模块有5个,那一共有25种交互可能,考虑情况很多,用mq解耦
- 异步:假设订单模块逻辑需要10ms,下单后需要更新库存模块(100ms),积分模块(100ms),物流模块(100ms),同步逻辑一共需要310ms。采用异步方式,下单后将通知写入mq(5ms),返回结果,一共需要15ms即可。至于库存、积分、物流模块将通过消费者的形式读取通知,进行更新操作
- 削峰:mysql一般2000请求/秒,如果用户在促销时间段大量请求,大概10000请求/秒,自然数据库承受不住压力,所以将用户请求到mq中,由mq承受请求压力,系统从mq中拉取请求,以2000请求/秒的速度拉取,保证数据库能够正常处理信息,其余请求积压在mq中。系统自然不会一直承受大量访问,高峰期一过,积压在mq中的请求也会慢慢被处理
2 Kafka缺点
- 可用性:如果系统只是A模块调用B模块的简单逻辑,自然不需要用mq,强上mq需要增加考虑mq使用不当挂掉的风险
- 复杂性:mq需要考虑的情况也很多,重复消费、消息丢失、消息是否要求顺序、积压太多等
- 一致性:例如异步中举的例子,订单模块返回成功,积分和物流模块正常消费消息更新,如果库存模块的mq没有正常处理,数据显然已经不一致,也是很严重的问题
3 kafka架构
- kafka消息持久化在磁盘
- broker:kafka节点(实例),一个broker就是一台kafka服务器
- producer:生产者,发送消息到broker
- consumer:消费者,从broker消费消息
- group:消费者组,由若干consumer组成,group之间互不影响,一个分区只能由一个组内的消费者消费
- topic:主题,逻辑上的概念,包含多个partition,可以分布到多个broker上
- partition:分区,一个有序的队列
- replica:副本,一个分区可以有多个副本,副本内是相同信息,包含一个leader和若干follower
- leader:主副本,生产者和消费者只与主副本交互
- follower:从副本,与leader数据保持同步,leader挂掉后,从follower中选举新leader并对外提供服务
了解kafka架构后,对副本replica还是不太清楚 👇
4 replicas管理
- AR:Assigned Replicas 所有副本
- ISR:In-Sync Replicas 副本同步队列
- OSR:Outof-Sync Replicas 副本未同步队列
- AR = ISR + OSR
- leader维护ISR中的follower,当ISR内follower同步滞后延迟超过一定阈值(延迟时间和延迟条数)时,就会将follower放入到OSR中,等进度追上了leader时,重新放入ISR,新加入的follower也会放入OSR
对于副本有了解后,那follower和leader又怎么工作呢 👇
5 follower如何与leader同步数据
- 既不是同步复制,也不是异步复制
- 同步复制的风险:等待响应时间加长,影响吞吐率
- 异步复制的风险:leader挂掉,如果还未完全复制,丢失数据
- 所以利用ISR伸缩性均衡数据不丢失以及吞吐率,内部批量写磁盘减少时间差,磁盘顺序读和Zero-copy机制提高复制性能
了解了leader和follower的工作,也应该知道这和kafka的高可用有关吧 👇
6 如何高可用
- 读数据:读数据时直接读取leader的数据即可,kafka会均匀的将一个partition的所有replica分布在不同的机器上,提高容错性
- 写数据:写数据时生产者将数据写到leader上,leader将数据放在磁盘上,follower主动pull数据。(其中一种模式是:一旦所有follower写入数据后,发送ack给leader,leader发送成功的消息给生产者);写数据时消费者只从leader处读取数据。(其中一种模式是:所有follower都返回ack时,消息才会被消费者拿到)
- 总之,没有副本机制是做不到高可用,高可用必然实现,其中某一台机器宕机后,数据不能丢
高可用从读写的角度分析,那么读写为什么不分离呢? 👇
7 为什么不支持读写分离
- kafka读写都是和leader交互,自然不是读写分离的
- 如果支持读写分离,主写从读的模式下,延时窗口导致数据不一致(在写入leader后,follower主动pull数据时,如果读取了follower的信息,显示是旧信息)
- 网络延时自然耗时,对于时间敏感应用更不适用
消费数据时,如何保证不会重复消费呢? 👇
8 如何保证不会重复消费(消息幂等性)
- 首先要确认重复消费带来的影响,比如消息要进数据库,redis,文件等
- 结合业务需求来考虑,基于不会重复的唯一键,或者利用redis的set天然幂等性
- 对于数据的增删改操作采用合理的逻辑处理方式
能保证不会重复消费,那如何保证不会少消费呢?👇
9 如何保证消息可靠传输(消息丢失)?
- 情况一:消费者拿到数据后还未完全处理完成,偏移量就提交了,自然就丢数
- 情况二:写数据到leader后,follower还未同步,leader直接宕机,重新选举leader,自然就丢数
- 情况三:broker存储数据,通过缓存写入磁盘,还未写入磁盘时宕机,数据直接丢失
对于情况一
- 可以改自动提交为手动提交
- 可以用zookeeper记录kafka分区消费情况,消费者从指定偏移量处seek
对于情况二
- topic设置replication.factor必须大于1,保证每个分区都至少有2个副本
- 生产者设置retries=MAX(无限次重试),保证一旦写入失败,卡在重试这里不动
- 生产者设置acks=-1或all,保证必须写入所有replica,才算是写入成功
- kafka服务器设置min.insync.replicas必须大于1,保证leader至少有一个follower保持联系
设置acks参数可以更换模式,那么都有什么模式呢?👇
10 acks参数设置
- acks用来指定分区中有多少个副本收到这条消息,生产者才认为这条消息是写入成功
- acks=1(默认),leader写入成功,就代表成功
- acks=0,发送消息即算写入成功,容易导致消息丢失
- acks=-1、acks=all,ISR中的所有副本都成功写入消息后生产者才认为是写入成功
如果消息内容是对数据库某行记录update,如何保证顺序性呢?👇
11 如何保证顺序性
- 情况一:一个topic下有多个partition,发送消息是分散在不同partition上的
- 情况二:如果1个topic下只有1个partition,那么自然保证此topic下的顺序性
- 对于情况二:如果必须要多个partition呢,可以用partition-key,相同的key发送到同一个partition上,key可以作为某种有强制顺序性的标识,比如id,流水号等
- 注意:到了消费者一端,如果是多线程模型消费,也无法保证消费顺序性,所以才去单线程模型,考虑到吞吐量,在单线程模型消费下,将消费到数据通过强制性标识比如id,流水号分发到不同的内存队列上,在这里创建多线程,由每个线程负责一个内存队列
如果考虑到顺序性,也采取了合适的模型,那么如果先序数据发送失败,后序数据发送成功呢?👇
12 生产者发送消息模式
- 发后忘记(fire-and-forget):不关心消息是否正确到达
- 同步(sync):调用get()方法,返回Future对象,同步等待消息是否发送成功
- 异步(async):有回调函数,当发送失败时调用此函数,不会阻塞其他消息发送
- 这里异步请求发送失败的消息,可以打上失败标记,存在失败队列中,单独用某个定时器重新发送
某个分区一边生产,一边消费,如何确定能消费到哪些呢?👇
13 如何确定拿到某条分区消息
分区相当于日志文件
日志起始偏移量(LogStartOffset)、即将写入消息偏移量(LogEndOffset,简称LEO)、可以消费区间(High Watermark,简称HW)
举例:LSO为0,LEO为100,HW为70,则代表该分区offset可以从0开始消费,能消费的区间是0-69,如果该分区接受消息,将放在offset是100的位置上
ISR中replicas中都维护自己的LEO,并且最小的LEO作为所有副本的HW(包括它自己)
HW更新过程
- 生产者将消息发送到leader
- leader将消息保存到日志中,更新自己的偏移量
- follower向leader请求同步,携带自己的LEO
- leader通过LEO得到自己的HW,并将消息和HW返回给follower
- follower拿到消息同步到日志中,并更新HW
leader epoch:其实这就是一个版本号,在 follower 同步请求中不仅仅传递自己的 LEO,还会带上当前的 LE,当 leader 变更一次,这个值就会增 1
partition-key可以保证局部有序性,那么partition还有什么策略呢?👇
14 分区策略有哪些
- 轮询RoundRobin:key为null时,依次将消息发送到topic的各个partition
- key:根据key进行hash,能保证局部有序性,但是如果partition的数量发生变化,很难保证key和分区的映射关系
- 自定义策略和指定partition发送
- Range:消费者的总数和分区总数进行整除运算来分配,按照主题粒度的,所以可能会不均匀
- Sticky:分区的分配尽可能均匀,以及分配要尽可能和上次保持一致
又了解到了分区再分配,那么分区再分配解决了什么问题呢?👇
15 分区再分配解决了什么问题
- 如果没有分区再分配,那会发生什么呢?
- 情况一:集群中broker下线了,如果节点上的分区是单副本的,那么这些分区就不可用了
- 情况二:集群中加了一个broker,那么新的分区会分配在这个broker上,旧的分区却不会分配在这个broker上,导致老节点和新节点之间负载不均衡
- 分区再分配就是通过控制器给分区增加新的副本,通过网络把旧副本上的数据复制到新副本上,复制完成后,再将旧副本清除(通过复制限流不影响集群正常功能)
提到了控制器对分区再分配的作用,那么控制器是什么呢?👇
16 控制器是什么
- 集群中的broker其中一个会被选举成控制器
- 负责管理集群中的所有分区和副本状态
- 控制器只能有一个
- 情况一:leader副本出现故障 -> 为其选举出新的leader副本
- 情况二:ISR集合发生变化 - > 通知所有broker更新其元数据信息
- 情况三:某个topic增加分区数量 -> 为其重新分配分区
17 日志文件
- kafka 采用了追加日志的格式将数据存储在磁盘上,追加日志的格式可以带来写性能的提升
- kafka 中的 log 文件对应着多个日志分段 LogSegment
- 日志删除(Log Retention)
- 按照一定策略直接删除日志分段
- 过期时间和日志大小。默认保留时间是 7 天,默认大小是 1GB
- 日志压缩(Log Compaction)
- 对每个消息的 key 进行整合,只保留同一个 key 下最新的 value
- 日志压缩会产生小文件,为了避免小文件过多,kafka 在清理的时候还会对其进行合并
- 日志索引--为了提高读的性能,需要在写的时候维护一个索引,有偏移量索引和日志索引
- 偏移量索引:维护一张映射表,key是消息的偏移量,value是日志文件的偏移量,Kafka维护的是稀疏索引,如果没找到对应的key,那么就先找到比offset小一点的key,再找到value,然后在日志中查找
- 时间戳索引:时间戳索引是一个二级索引,现根据时间戳找到偏移量,然后就可以使用偏移量索引找到对应的消息位置
- Kafka零拷贝:在内核态直接将文件内容复制到网卡设备上,减少了内核态与用户态之间的切换
18 生产者如何写入数据
在生产端主要有两个线程:main 和 sender,两者通过共享内存 RecordAccumulator 通信
main线程
- KafkaProducer:创建消息
- Partitioner:分区器计算该消息的目标分区,然后数据会存储在 RecordAccumulator 中
- Serializer:序列化器将消息转换成字节数组的形式
- ProducerInterceptors:生产者拦截器在消息发送之前做一些准备工作,比如过滤不符合要求的消息、修改消息的内容
sender线程
- 创建具体的请求,如果请求过多,会将部分请求缓存起来,将准备好的请求发送到 kafka 集群
- 每个请求的大小通过 max.reqeust.size 控制,默认 1MB
- 创建好请求后,根据发送到broker的不同重新分组,每个节点可以存储的请求由max.in.flight.requests.per.connection控制,默认5
RecordAccumulator消息累加器
- buffer.memory:消息累加器缓存大小,默认是32MB
- main线程发送消息太多导致缓存快满时,max.block.ms控制阻塞等待时间,默认60s
- ProducerBatch没装满时不会一直等待,linger.ms控制等待时间,默认0
- 消息被组装成ProducerBatch的形式,batch.size控制大小,默认是1MB,包含一个或多个消息
19 生产者参数
main线程
- acks:1 消息写入方式
- compression.type:none 消息压缩方式
- retries:0 重试次数
- retry.backoff.ms:100ms 重试间隔
- max.block.ms:60s 发送消息阻塞时间
RecordAccumulator消息累加器
- batch.size:1MB ProducerBatch大小
- linger.ms:0 ProducerBatch控制等待时间
sender线程
- metadata.max.age.ms:5分钟 元数据更新间隔时间
- max.in.flight.requests.per.connection:5 每个连接最大未响应请求数
- max.reqeust.size:1MB 每个请求大小限制
- connection.max.idle.ms:9分钟 连接空闲时间
- receive.buffer.bytes:32KB 接收缓存区大小
- send.buffer.bytes:32KB 发送缓存区大小
- request.timeout.ms:30s 等待请求响应最长时间
其他
- replica.lag.time.max.ms:follower滞后leader的最长时间间隔
- bootstrap.servers:连接集群的broker地址
20 消费者参数
- enable.auto.commit:是否自动提交
- auto.commit.interval.ms:自动提交的时间间隔
21 消费者再均衡
- 消费者之间的协调是通过消费者协调器(ConsumerCoordinator)和组协调器(GroupCoordinator)来完成的
- 有新的消费者加入,下线,主动退出
- 消费组对应的组协调器节点发生变化
- 订阅的主题或分区发生数量变化
- 再均衡会经过下面几个步骤:
- FindCoordinator:消费者查找组协调器所在的机器,然后建立连接
- JoinGroup:消费者向组协调器发起加入组的请求
- SyncGroup:组协调器将分区分配方案同步给所有的消费者
- Heartbeat:消费者进入正常状态,开始心跳
参考链接: