替换 Apache Kafka 中的键值

替换 Apache Kafka 中的键值

我希望 Kafka 只存储主题中给定键的最新值。这似乎是可能的,如下所示这里

但是,到目前为止,我唯一想到的是,所有键/值对都被删除或什么都没有。我在 Java 程序中用上面链接的代码示例替换给定的键/值对。这导致主题中出现两次相同的键,一次是值,一次是 NULL。

这是我的 server.properties

listeners=kafka0://SERVERIP:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=5
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion due to age
#168
log.retention.minutes=1
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
log.retention.bytes=200000

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824
log.segment.bytes=200000

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=2000

log.cleaner.enable=true
#Ensure compaction runs continuously
log.cleaner.min.cleanable.ratio = 0.00001
#Set a limit on compaction so there is bandwidth for regular activities
log.cleaner.io.max.bytes.per.second=1000000
#
log.roll.ms=60000

############################# Group Coordinator Settings #############################

delete.topic.enable = true
auto.create.topics.enable = false

以下是主题的设置: 主题设置

相关内容