由于偏移超出范围,Confluent Kafka Connect S3 重新处理数据

由于偏移超出范围,Confluent Kafka Connect S3 重新处理数据

我遇到了一种奇怪的情况。

我的 Confluent Kafka Connect S3 每秒处理大约 8K 条消息。发生了什么随机每隔几天是:

  1. Fetcher 类将尝试获取偏移量(例如 158473047)。
  2. 它会得到一个“获取偏移量 158473047 超出分区范围”,然后记录错误并将偏移量重置为
  3. 当我在__consumer_offsets,它就在那里。因此,我的偏移量并未从 __consumer_offsets 主题中删除。
  4. 因此,我的 Kafka 连接器重新处理了几亿条消息。

Kafka Connect S3 日志:

2020-05-11 05:18:41,372] INFO WorkerSinkTask{id=datalake-m3-to-s3-TEST-bulk-0} Committing offsets asynchronously using sequence number 16703: {M3.TEST-BULK-0=OffsetAndMetadata{offset=158473001, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:345)

[2020-05-11 05:18:42,020] INFO Opening record writer for: M3BULK/M3.TEST-BULK/year=2017/month=05/day=15/hour=18/M3.TEST-BULK+0+0158473001.snappy.parquet (io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider:69)

[2020-05-11 05:18:42,033] INFO [Consumer clientId=connector-consumer-datalake-m3-to-s3-TEST-bulk-0, groupId=connect-datalake-m3-to-s3-TEST-bulk] Fetch offset 158473047 is out of range for partition M3.TEST-BULK-0, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:1256)

[2020-05-11 05:18:42,035] INFO [Consumer clientId=connector-consumer-datalake-m3-to-s3-TEST-bulk-0, groupId=connect-datalake-m3-to-s3-TEST-bulk] Resetting offset for partition M3.TEST-BULK-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState:385)

[2020-05-11 05:18:42,174] INFO Opening record writer for: M3BULK/M3.TEST-BULK/year=1970/month=01/day=01/hour=00/M3.TEST-BULK+0+0000000000.snappy.parquet (io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider:69)

**集群组件:**

Kafka Brokerserver.properties内容:

auto.create.topics.enable=true
broker.id=1
compression.type=producer
confluent.support.metrics.enable=false
controlled.shutdown.enable=true
delete.topic.enable=true
group.initial.rebalance.delay.ms=3000
default.replication.factor=2
group.max.session.timeout.ms=900000
log.retention.check.interval.ms=60000
log.retention.hours=24
log.segment.bytes=67108864
log.segment.delete.delay.ms=1000
num.io.threads=8
num.network.threads=6
num.partitions=1
num.recovery.threads.per.data.dir=4
offsets.topic.replication.factor=3
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
transaction.state.log.min.isr=2
transaction.state.log.replication.factor=3
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=6000
log.dirs=/data/kafka-data
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://KAFKA01.CONFLUENT.LOCAL:9092
zookeeper.connect=KAFKA01.CONFLUENT.LOCAL:2181,KAFKA02.CONFLUENT.LOCAL:2181,KAFKA03.CONFLUENT.LOCAL:2181

重点:

  1. 集群并非空闲。
  2. 服务器时间正确。
  3. 偏移量.保留.分钟属性设置为其默认值,即 10080 分钟。

我非常感谢您就此事提供的帮助。谢谢!

相关内容