Kafka 协调器加载时间长且 ISR 较小

Kafka 协调器加载时间长且 ISR 较小

我正在使用 Kafka 0.8.2.1,运行一个有 200 个分区且 RF=3 的主题,日志保留设置为大约 1GB。

未知事件导致集群进入“协调器加载”或“组加载”状态。一些信号表明了这一点:基于 pykafka 的消费者在FetchOffsetRequests 期间开始出现故障,部分分区子集的错误代码为 14。COORDINATOR_LOAD_IN_PROGRESS这些错误是在使用在协调器加载之前就已存在的消费者组进行消费时触发的。在代理日志中,出现了如下消息:

[2018-05...] ERROR Controller 17 epoch 20 initiated state change for partition [my.cool.topic,144] from OnlinePartition to OnlinePartition failed (state.change.logger)
kafka.common.StateChangeFailedException: encountered error while electing leader for partition [my.cool.topic,144] due to: Preferred replica 11 for partition [my.cool.topic,144] is either not alive or not in the isr. Current leader and ISR: [{"leader":12,"leader_epoch":7,"isr":[12,13]}].

出于某种原因,Kafka 决定将副本 11 视为“首选”副本,尽管它不在 ISR 中。据我所知,在 11 重新同步时,副本 12 或 13 的消费可以不间断地继续进行 - 不清楚为什么 Kafka 选择未同步的副本作为首选领导者。

上述行为持续了大约 6 个小时,在此期间,pykafka fetch_offsets 错误导致无法消费消息。在协调器加载仍在进行中时,其他消费者组能够无错误地消费该主题。事实上,最终的修复方法是使用新的 consumer_group 名称重新启动损坏的消费者。

问题

  1. 协调器负载状态持续 6 小时是正常的还是预期的?此负载时间是否受日志保留设置、消息生成速率或其他参数的影响?
  2. 非 pykafka 客户端是否COORDINATOR_LOAD_IN_PROGRESS仅通过从无错误分区进行消费来处理?Pykafka 坚持要求所有分区都返回成功,OffsetFetchResponse这可能是消费停机的原因。
  3. 为什么 Kafka 有时会在协调器加载期间选择未同步的副本作为首选副本?如何将分区领导者重新分配给 ISR 中的副本?
  4. 因为我应该使用较新版本的 Kafka,所以所有这些问题都变得毫无意义了吗?

经纪人配置选项:

broker.id=10
port=9092
zookeeper.connect=****/kafka5

log.dirs=*****
delete.topic.enable=true
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
replica.lag.max.messages=4000
controller.socket.timeout.ms=30000
message.max.bytes=1000000
auto.create.topics.enable=false
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.hours=96
log.roll.hours=168
log.retention.check.interval.ms=300000
log.segment.bytes=1073741824
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
num.io.threads=8
socket.request.max.bytes=104857600
num.replica.fetchers=4
controller.message.queue.size=10
num.partitions=8
log.flush.interval.ms=60000
log.flush.interval.messages=60000
log.flush.scheduler.interval.ms=2000
num.network.threads=8
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=500
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100
controlled.shutdown.enable=true

答案1

我没有使用过确切的 Kafka 版本,但我会尝试回答以下问题:

  1. 你可能启用了不干净的领导者选举,这取决于分区数量与消费者数量
  2. 可以,但通常情况下,在大多数 MQ 系统中,信息完整性比正常运行时间更重要,而 Kafka 是最无忧的一个
  3. 将不干净的领导者选举设置为 false
  4. 我不知道,有些概念保持不变。

相关内容