带有马拉松的 Mesos 集群上的 Kafka Docker 容器

带有马拉松的 Mesos 集群上的 Kafka Docker 容器

我正在尝试在 mesos 集群上的 docker 容器上部署 kafka 代理。

具体来说,我有一个 mesos 集群,我使用 marathon 作为初始化系统在其中部署了各种 docker 容器。所有容器都有服务端口,可通过代理 (HAproxy) 访问。

问题

当我使用 marathon 部署 kafka 容器时,我可以创建一个主题,列出所有主题,但无法运行 produce/consume 命令。produce 命令给出以下错误

[2016-01-18 11:10:09,926] WARN Failed to send producer request with correlation id 11 to broker 0 with data for partitions [test,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

我使用的 docker 镜像是 spotify/kafka,它预装了 zookeeper 和 kafka。当我使用 docker run 命令运行该镜像时,它运行良好。

我正在使用以下 marathon json 文件来部署容器:

{
    "id": "spotify-kafka.marathon", 
    "cmd": "export ADVERTISED_HOST=$HOST; export ADVERTISED_PORT=9092; env; supervisord -n",
    "container": {
    "type": "DOCKER",
    "docker": {
            "image": "spotify/kafka",
            "network": "BRIDGE",
            "portMappings": [
                    {"containerPort": 2181, "hostPort": 0, "servicePort": 20000},
                    {"containerPort": 9092, "hostPort": 0, "servicePort": 20500}
          ]
    }
},
    "cpus": 0.5,
    "mem": 1024.0,
    "instances": 1
}

cmd 导出一些设置内部主机 IP 和端口的环境变量。外部端口是随机的,会被 HAproxy 捕获并路由到静态端口。

我用来与 kafka 通信的命令来自文档:

https://kafka.apache.org/documentation.html#quickstart

我还使用了其他图像,例如 ches/kafka、wurstmeister/kafka 以及我自己创建的图像。我还发现https://github.com/mesos/kafka构建完成后,您可以向端口 7000 发送命令并将代理部署到集群,但对我来说失败了。理想情况下,我想要一个已经具有 zookeeper 和 kafka 的映像,就像 spotify 映像一样。

更新 1
因此我修改了 marathon JSON 文件并导出了一些似乎需要的变量。最终的 JSON 如下所示

{
    "id": "spotify-kafka.marathon", 
    "cmd": "export ADVERTISED_HOST=$HOST; export ADVERTISED_PORT=9092; export PORT_9092=9092; export PORT=2181; export PORT0=2181; export PORT1=9092; export PORT_2181=2181 ; env; supervisord -n",
    "container": {
    "type": "DOCKER",
    "docker": {
            "image": "192.168.1.235:5000/spotify-kafka",
            "network": "BRIDGE",
            "portMappings": [
                    {"containerPort": 2181, "hostPort": 0, "servicePort": 20000},
                    {"containerPort": 9092, "hostPort": 0, "servicePort": 20500}
          ]
    }
},
    "cpus": 0.5,
    "mem": 1024.0,
    "instances": 1
}

当我尝试生成消息时,这个更改给了我不同的结果。

[2016-01-19 11:02:09,297] WARN Error while fetching metadata     [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,309] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,310] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,416] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,422] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,422] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,528] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,533] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,533] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,639] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,644] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,644] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,750] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,750] ERROR Failed to send requests for topics test with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,751] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

更新 2 - 解决方案
于是在网上搜索,我找到了这个存储库https://github.com/tobilg/docker-kafka-marathon/ 这个人创建了一个 shell 脚本,可以自动为您创建属性文件。您还可以扩展此容器并拥有多个 kafka 代理实例。对我来说唯一的缺点是它依赖于外部 zookeeper 服务器,但我认为在镜像中安装它不会解决这个问题。

因此我将其标记为已解决。

相关内容