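I am trying to deploy a Kafka broker in a Docker container on a Mesos cluster.
Specifically, I have a Mesos cluster on which I use Marathon as the init system to deploy various Docker containers. All containers have service ports and are reachable through a proxy (HAProxy).
Problem
When I deploy the Kafka container with Marathon, I can create a topic and list all topics, but I cannot run the produce/consume commands. The produce command fails with the following error: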
[2016-01-18 11:10:09,926] WARN Failed to send producer request with correlation id 11 to broker 0 with data for partitions [test,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
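A ClosedChannelException from the producer often means the client could not open a connection to the address the broker advertises, although the trace alone does not prove that here. The following is a minimal sketch of a reachability check that could be run from the client machine. The function name can_connect is hypothetical, and the host and port to test would be whatever the broker registered in ZooKeeper.

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False
```

Calling can_connect with the advertised host and port and getting False would suggest that the producer is being sent to an address it cannot reach.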
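The Docker image I am using is spotify/kafka, which comes with ZooKeeper and Kafka preinstalled. When I run the image with the docker run command, it works fine.
I am using the following Marathon JSON file to deploy the container: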
{
  "id": "spotify-kafka.marathon",
  "cmd": "export ADVERTISED_HOST=$HOST; export ADVERTISED_PORT=9092; env; supervisord -n",
  "container": {
    "type": "DOCKER",
    "docker": {
      "image": "spotify/kafka",
      "network": "BRIDGE",
      "portMappings": [
        {"containerPort": 2181, "hostPort": 0, "servicePort": 20000},
        {"containerPort": 9092, "hostPort": 0, "servicePort": 20500}
      ]
    }
  },
  "cpus": 0.5,
  "mem": 1024.0,
  "instances": 1
}
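The cmd exports a few environment variables that set the internal host IP and port. The external port is random; HAProxy picks it up and routes it to a static port.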
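Because the external port is random, the advertised port could be derived from the environment Marathon gives the task instead of being fixed at 9092. The sketch below assumes that Marathon sets HOST to the agent's hostname and PORT_9092 to the host port mapped to container port 9092. The function name advertised_endpoint and the sample values are hypothetical, and this is not something the spotify/kafka image does by itself.

```python
import os


def advertised_endpoint(env=os.environ, container_port=9092):
    """Return the (host, port) pair a client outside the container would use."""
    host = env["HOST"]
    # Assumption: Marathon exposes the assigned host port as PORT_<containerPort>.
    port = int(env[f"PORT_{container_port}"])
    return host, port


if __name__ == "__main__":
    # Hypothetical values, standing in for what Marathon would inject.
    sample = {"HOST": "10.0.0.5", "PORT_9092": "31005"}
    host, port = advertised_endpoint(sample)
    print(f"export ADVERTISED_HOST={host}; export ADVERTISED_PORT={port}")
```

Whether clients should be given this host port or the static HAProxy service port depends on how they reach the broker, so treat the choice of port here as an assumption.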
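The commands I use to talk to Kafka come from the documentation:
https://kafka.apache.org/documentation.html#quickstart
I have also tried other images, such as ches/kafka, wurstmeister/kafka, and one I built myself. I also found https://github.com/mesos/kafka. Once it is built, you can send commands to port 7000 and deploy brokers to the cluster, but that failed for me. Ideally, I would like an image that already contains both ZooKeeper and Kafka, like the spotify image.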
Update 1
I modified the Marathon JSON file and exported a few more variables that seemed to be required. The resulting JSON looks like this:
{
  "id": "spotify-kafka.marathon",
  "cmd": "export ADVERTISED_HOST=$HOST; export ADVERTISED_PORT=9092; export PORT_9092=9092; export PORT=2181; export PORT0=2181; export PORT1=9092; export PORT_2181=2181 ; env; supervisord -n",
  "container": {
    "type": "DOCKER",
    "docker": {
      "image": "192.168.1.235:5000/spotify-kafka",
      "network": "BRIDGE",
      "portMappings": [
        {"containerPort": 2181, "hostPort": 0, "servicePort": 20000},
        {"containerPort": 9092, "hostPort": 0, "servicePort": 20500}
      ]
    }
  },
  "cpus": 0.5,
  "mem": 1024.0,
  "instances": 1
}
This change gave me a different result when I tried to produce a message:
[2016-01-19 11:02:09,297] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,309] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,310] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,416] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,422] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,422] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,528] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,533] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,533] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,639] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,644] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,644] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,750] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,750] ERROR Failed to send requests for topics test with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,751] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
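Update 2 - Solution
After searching online, I found this repository: https://github.com/tobilg/docker-kafka-marathon/
Its author wrote a shell script that automatically generates the properties file for you. You can also scale this container and run multiple Kafka broker instances. The only drawback for me is that it depends on an external ZooKeeper server, but I don't think installing ZooKeeper in the image would solve that.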
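To illustrate the idea of generating the properties file at container start, here is a minimal sketch in Python. It is not the repository's actual script. The function name render_broker_properties and the sample values are hypothetical, and the keys used (broker.id, advertised.host.name, advertised.port, zookeeper.connect) are the broker settings from the Kafka 0.8/0.9 era that I would expect such a script to fill in.

```python
def render_broker_properties(broker_id, host, port, zookeeper_connect):
    """Build the text of a small Kafka broker properties file."""
    settings = {
        "broker.id": broker_id,
        # Address and port that clients are told to connect to.
        "advertised.host.name": host,
        "advertised.port": port,
        # External ZooKeeper ensemble, e.g. "zk1:2181,zk2:2181".
        "zookeeper.connect": zookeeper_connect,
    }
    return "".join(f"{key}={value}\n" for key, value in settings.items())


if __name__ == "__main__":
    # Hypothetical values; in a Marathon task they would come from the environment.
    print(render_broker_properties(0, "10.0.0.5", 31005, "zk1:2181"), end="")
```

Each broker instance needs its own broker.id, so a script that supports scaling has to derive a distinct id per task. How the repository does that is not covered here.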
So I am marking this as solved.