HDFS performance on Apache Spark

I am running into several HDFS-related issues that may have different root causes. I am posting as much information as I can, hoping to get your input on at least some of them. The situation is basically as follows:

  • HDFS classes not found
  • Connections to some datanodes seem slow / get closed unexpectedly
  • Executors are lost (and cannot be relaunched due to out-of-memory errors)

What I am looking for:

- HDFS misconfigurations / tuning suggestions

- Flaws in the overall setup (e.g., the impact of a mismatch between the VMs and NUMA)

- For the last class of problems, I would like to understand why the JVM's memory is not released when an executor dies, which prevents a new executor from being launched.

My setup is as follows:

1 hypervisor with 32 cores and 50 GB of RAM, running 5 VMs. Each VM has 5 cores and 7 GB. Each node is set up with 1 worker that gets 4 cores and 6 GB (the remaining resources are left to HDFS and the OS).
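
To make those numbers concrete, this is roughly what the standalone worker configuration looks like; a sketch only, the values simply mirror the figures above:

# conf/spark-env.sh on each VM (sketch; values mirror the figures above)
export SPARK_WORKER_CORES=4     # 4 of the 5 cores go to the worker
export SPARK_WORKER_MEMORY=6g   # 6 of the 7 GB go to the worker; the rest is left to HDFS/OS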

I ran a WordCount workload with a 4 GB dataset on a Spark 1.4.0 / HDFS 2.5.2 setup. The binaries come from the official websites (no local compilation).
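
The job itself is the classic WordCount; roughly the following sketch, run from spark-shell where sc is predefined (the HDFS paths are placeholders, not my actual ones):

// WordCount sketch on the Spark 1.4 RDD API; paths are placeholders
val lines  = sc.textFile("hdfs:///user/test/input-4gb")
val counts = lines.flatMap(_.split(" "))
                  .map(word => (word, 1))
                  .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs:///user/test/wordcount-out")  // the TextOutputFormat write that shows up in the stack trace under 2)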

Please let me know if there is any other relevant information I can provide.

(Issues 1) and 2) are logged in the workers' work/app-id/exec-id/stderr files.)

1) Hadoop class-related issues

15:34:32: DEBUG HadoopRDD: SplitLocationInfo and other new Hadoop classes are unavailable. Using the older Hadoop location info code.
java.lang.ClassNotFoundException: org.apache.hadoop.mapred.InputSplitWithLocationInfo


15:40:46: DEBUG SparkHadoopUtil: Couldn't find method for retrieving thread-level FileSystem input data
java.lang.NoSuchMethodException: org.apache.hadoop.fs.FileSystem$Statistics.getThreadStatistics()
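
For 1), a quick way to check from spark-shell whether those classes are visible on the driver classpath at all (the class names are taken from the messages above; this is only a diagnostic sketch, not a fix):

// Probe for the Hadoop classes mentioned in the DEBUG messages above
def hasClass(name: String): Boolean =
  try { Class.forName(name); true }
  catch { case _: ClassNotFoundException => false }

println(hasClass("org.apache.hadoop.mapred.InputSplitWithLocationInfo"))  // reported missing in 1)
println(hasClass("org.apache.hadoop.fs.FileSystem$Statistics"))           // getThreadStatistics() should live here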

2) HDFS performance-related issues

Errors like the following show up:

 15:43:16: ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=284992323013, chunkIndex=2}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-b17f3299-99f3-4147-929f-1f236c812d0e/executor-d4ceae23-b9d9-4562-91c2-2855baeb8664/blockmgr-10da9c53-c20a-45f7-a430-2e36d799c7e1/2f/shuffle_0_14_0.data, offset=15464702, length=998530}} to /192.168.122.168:59299; closing connection
java.io.IOException: Broken pipe

15:43:16 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=284992323013, chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-b17f3299-99f3-4147-929f-1f236c812d0e/executor-d4ceae23-b9d9-4562-91c2-2855baeb8664/blockmgr-10da9c53-c20a-45f7-a430-2e36d799c7e1/31/shuffle_0_12_0.data, offset=15238441, length=980944}} to /192.168.122.168:59299; closing connection
java.io.IOException: Broken pipe


15:44:28 : WARN TransportChannelHandler: Exception in connection from /192.168.122.15:50995
java.io.IOException: Connection reset by peer (note that it's on another executor)

Some time later:

15:44:52 DEBUG DFSClient: DFSClient seqno: -2 status: SUCCESS status: ERROR downstreamAckTimeNanos: 0
15:44:52 WARN DFSClient: DFSOutputStream ResponseProcessor exception  for block BP-845049430-155.99.144.31-1435598542277:blk_1073742427_1758
java.io.IOException: Bad response ERROR for block BP-845049430-155.99.144.31-1435598542277:blk_1073742427_1758 from datanode x.x.x.x:50010
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:819)

The following two errors appear multiple times:

15:51:05 ERROR Executor: Exception in task 19.0 in stage 1.0 (TID 51)
java.nio.channels.ClosedChannelException
        at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1528)
        at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:98)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.hadoop.mapred.TextOutputFormat$LineRecordWriter.writeObject(TextOutputFormat.java:81)
        at org.apache.hadoop.mapred.TextOutputFormat$LineRecordWriter.write(TextOutputFormat.java:102)
        at org.apache.spark.SparkHadoopWriter.write(SparkHadoopWriter.scala:95)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1110)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


15:51:19 DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received message AssociationError [akka.tcp://[email protected]:38277] -> [akka.tcp://[email protected]:34732]: Error [Invalid address: akka.tcp://[email protected]:34732] [
akka.remote.InvalidAssociation: Invalid address: akka.tcp://[email protected]:34732
Caused by: akka.remote.transport.Transport$InvalidAssociationException: Connection refused: /x.x.x.x:34732
] from Actor[akka://sparkExecutor/deadLetters]

In the datanodes' logs:

ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: localhost.localdomain:50010:DataXceiver error processing WRITE_BLOCK operation  src: /192.168.122.15:56468 dst: /192.168.122.229:50010
java.net.SocketTimeoutException: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/192.168.122.229:50010 remote=/192.168.122.15:56468]

I also find the following warnings:

2015-07-13 15:46:57,927 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Slow BlockReceiver write data to disk cost:718ms (threshold=300ms)
2015-07-13 15:46:59,933 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Slow BlockReceiver write packet to mirror took 1298ms (threshold=300ms)
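
For what it's worth, the 60000 ms in the SocketTimeoutException above appears to match the default HDFS socket read timeout (dfs.client.socket-timeout, 60000 ms in hdfs-default.xml). Below is a sketch of how the client-side values could be raised from the Spark job; the datanodes would need the same properties in their own hdfs-site.xml, and the raised values are purely illustrative, not a recommendation:

// Sketch: override the HDFS socket timeouts on the client side (illustrative values only)
sc.hadoopConfiguration.setInt("dfs.client.socket-timeout", 120000)          // default 60000 ms
sc.hadoopConfiguration.setInt("dfs.datanode.socket.write.timeout", 960000)  // default 480000 ms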

3) Executor loss

Early in the job, the master's log shows the following messages:

15/07/13 13:46:50 INFO Master: Removing executor app-20150713133347-0000/5 because it is EXITED
15/07/13 13:46:50 INFO Master: Launching executor app-20150713133347-0000/9 on worker worker-20150713153302-192.168.122.229-59013
15/07/13 13:46:50 DEBUG Master: [actor] handled message (2.247517 ms) ExecutorStateChanged(app-20150713133347-0000,5,EXITED,Some(Command exited with code 1),Some(1)) from Actor[akka.tcp://[email protected]:59013/user/Worker#-83763597]

This keeps happening until the job either completes or fails (depending on how many executors actually fail).

Here are the Java logs available for each attempt to launch an executor (under work/app-id/exec-id on the worker): http://pastebin.com/B4FbXvHR
