I've run into a strange situation.
My Confluent Kafka Connect S3 sink processes roughly 8K messages per second. Every few days, at a seemingly random time, the following happens:
- The Fetcher class tries to fetch an offset (e.g. 158473047).
- It gets a "Fetch offset 158473047 is out of range for partition" response, logs the error, and resets the offset to zero.
- When I look in __consumer_offsets, the offset is still there, so it was never deleted from the __consumer_offsets topic (see the sketch after this list for how I compare it against the partition's range).
- As a result, my Kafka connector ends up reprocessing a few hundred million messages.
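For reference, here is a minimal sketch of how I compare the committed offset against the partition's current range (the group and topic names are taken from the logs below; the bootstrap address is an assumption based on my `advertised.listeners`):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetRangeCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed bootstrap address; the group id matches the connector's consumer group.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "KAFKA01.CONFLUENT.LOCAL:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-datalake-m3-to-s3-TEST-bulk");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        TopicPartition tp = new TopicPartition("M3.TEST-BULK", 0);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Offset last committed by the group, as stored in __consumer_offsets.
            OffsetAndMetadata committed = consumer.committed(tp);
            // Earliest and latest offsets the brokers currently hold for the partition.
            long earliest = consumer.beginningOffsets(Collections.singleton(tp)).get(tp);
            long latest = consumer.endOffsets(Collections.singleton(tp)).get(tp);
            System.out.printf("committed=%s valid-range=[%d..%d]%n",
                    committed == null ? "none" : String.valueOf(committed.offset()),
                    earliest, latest);
            // "Out of range" means committed < earliest (records expired by retention)
            // or committed > latest (log truncated, e.g. after a leader change).
        }
    }
}
```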
Kafka Connect S3 logs:

```
[2020-05-11 05:18:41,372] INFO WorkerSinkTask{id=datalake-m3-to-s3-TEST-bulk-0} Committing offsets asynchronously using sequence number 16703: {M3.TEST-BULK-0=OffsetAndMetadata{offset=158473001, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:345)
[2020-05-11 05:18:42,020] INFO Opening record writer for: M3BULK/M3.TEST-BULK/year=2017/month=05/day=15/hour=18/M3.TEST-BULK+0+0158473001.snappy.parquet (io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider:69)
[2020-05-11 05:18:42,033] INFO [Consumer clientId=connector-consumer-datalake-m3-to-s3-TEST-bulk-0, groupId=connect-datalake-m3-to-s3-TEST-bulk] Fetch offset 158473047 is out of range for partition M3.TEST-BULK-0, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:1256)
[2020-05-11 05:18:42,035] INFO [Consumer clientId=connector-consumer-datalake-m3-to-s3-TEST-bulk-0, groupId=connect-datalake-m3-to-s3-TEST-bulk] Resetting offset for partition M3.TEST-BULK-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState:385)
[2020-05-11 05:18:42,174] INFO Opening record writer for: M3BULK/M3.TEST-BULK/year=1970/month=01/day=01/hour=00/M3.TEST-BULK+0+0000000000.snappy.parquet (io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider:69)
```
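The jump to offset 0 itself is just the consumer's `auto.offset.reset` policy firing: Kafka Connect sink consumers default to `earliest`, and offset 0 is the start of this log. Below is a minimal sketch that makes the same failure loud instead of silent, assuming the offset, topic, and bootstrap address from my setup (the group id is a throwaway):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;

public class ResetPolicyDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "KAFKA01.CONFLUENT.LOCAL:9092"); // assumed
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-reset-demo"); // throwaway group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // "earliest" (the Connect sink default) silently rewinds to the log start;
        // "none" makes poll() throw instead, so the failure is visible.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

        TopicPartition tp = new TopicPartition("M3.TEST-BULK", 0);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singleton(tp));
            consumer.seek(tp, 158_473_047L); // the offset reported as out of range
            try {
                consumer.poll(Duration.ofSeconds(5));
                System.out.println("Offset is within range.");
            } catch (OffsetOutOfRangeException e) {
                System.out.println("Out of range: " + e.offsetOutOfRangePartitions());
            }
        }
    }
}
```

If I understand KIP-458 correctly, the same policy can also be tightened per connector with `consumer.override.auto.offset.reset=none`, provided the worker allows it via `connector.client.config.override.policy=All`.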
**Cluster components:**
- Kafka build: confluent-community-2.12
- Installation guide: https://docs.confluent.io/5.4.2/installation/installing_cp/rhel-centos.html
- Kafka nodes: 3 (CentOS 7, 4 cores, 16 GB RAM, 250 GB SSD)
Kafka broker `server.properties` contents:

```properties
auto.create.topics.enable=true
broker.id=1
compression.type=producer
confluent.support.metrics.enable=false
controlled.shutdown.enable=true
delete.topic.enable=true
group.initial.rebalance.delay.ms=3000
default.replication.factor=2
group.max.session.timeout.ms=900000
log.retention.check.interval.ms=60000
log.retention.hours=24
log.segment.bytes=67108864
log.segment.delete.delay.ms=1000
num.io.threads=8
num.network.threads=6
num.partitions=1
num.recovery.threads.per.data.dir=4
offsets.topic.replication.factor=3
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
transaction.state.log.min.isr=2
transaction.state.log.replication.factor=3
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=6000
log.dirs=/data/kafka-data
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://KAFKA01.CONFLUENT.LOCAL:9092
zookeeper.connect=KAFKA01.CONFLUENT.LOCAL:2181,KAFKA02.CONFLUENT.LOCAL:2181,KAFKA03.CONFLUENT.LOCAL:2181
```
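One interplay in these settings that may matter: `log.retention.hours=24` expires the records themselves after a day, while (see the last point below) committed offsets are kept for 10080 minutes, so a committed offset can outlive the data it points to. A minimal AdminClient sketch for reading both effective values back from the live cluster (bootstrap address assumed; broker id 1 taken from this file):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class RetentionCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "KAFKA01.CONFLUENT.LOCAL:9092"); // assumed

        try (AdminClient admin = AdminClient.create(props)) {
            // Effective data retention for the topic (inherits the broker default,
            // i.e. log.retention.hours=24, unless overridden per topic).
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "M3.TEST-BULK");
            Config topicConfig = admin.describeConfigs(Collections.singleton(topic))
                    .all().get().get(topic);
            System.out.println("topic retention.ms = " + topicConfig.get("retention.ms").value());

            // Retention of committed offsets, read from broker 1.
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "1");
            Config brokerConfig = admin.describeConfigs(Collections.singleton(broker))
                    .all().get().get(broker);
            System.out.println("offsets.retention.minutes = "
                    + brokerConfig.get("offsets.retention.minutes").value());
        }
    }
}
```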
**Key points:**
- The cluster is not idle.
- The server clocks are correct.
- The offsets.retention.minutes property is set to its default value of 10080 minutes (i.e. 7 days).
I'd really appreciate any help with this. Thanks!