问题

问题

问题

  1. Spark 的分区数量是如何决定的?
  2. 我是否需要在某处明确指定可用的 CPU 核心数,以便分区数相同(例如 parallelize 方法的 numPartition 参数,但每当核心数发生变化时就需要更新程序)?

背景

安装 Spark 集群环境对 spark-env.sh、spark-defaults.conf 文件以及程序中的 SparkConf 对象没有任何改变。

对于 N Queen 程序,分区数为 2,并且只为一个节点分配任务。对于字数统计程序,分区数为 22,并且将任务分配给所有节点。两个程序都使用了 spark-submit。

程式

皇后区

val sparkConf = new SparkConf().setAppName("NQueen").set("spark.files.overwrite", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
def isSafe(column: Int, placement: List[Int]): Boolean = { ... }
def placeQueensAt(row: Int, placements: Set[List[Int]]): Set[List[Int]] = { ... }

val initial = sc.parallelize(queensAtFirst)
//val initial = sc.parallelize(queensAtFirst, 12)
println("Partitions = %d".format(initial.partitions.size))

val result = initial.flatMap(x => placeQueensAt(1, Set(x))).collect()

字数

val sparkConf = new SparkConf().setAppName("WordCount").set("spark.files.overwrite", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val lines = sc.textFile("hdfs:/user/wynadmin/sfpd.csv")
println("Patitions = %d".format(lines.partitions.size))

val words = for (line <- lines; word <- line.split(",") if word.toLowerCase.matches("[a-z]+")) yield (word, 1)
val counts = words.reduceByKey(_ + _)

环境

Ubuntu 14.04 上的 Spark 2.0.1(3 个节点,每个节点有 4 个 CPU)。
独立部署(不是 YARN 也不是 Mesos)

答案1

找到以下信息如何调整你的 Apache Spark 作业(第 2 部分)

这个数字是如何确定的?上一篇文章描述了 Spark 将 RDD 分组为阶段的方式。(快速提醒一下,像 repartition 和 reduceByKey 这样的转换会导致阶段边界。)阶段中的任务数与阶段中最后一个 RDD 中的分区数相同。RDD 中的分区数与其所依赖的 RDD 中的分区数相同,但有几个例外:coalesce 转换允许创建一个分区数少于其父 RDD 的 RDD,union 转换创建一个分区数为其父级分区数之和的 RDD,cartesian 创建一个分区数为其乘积的 RDD。

没有父节点的 RDD 怎么办?textFile 或 hadoopFile 生成的 RDD 的分区由所使用的底层 MapReduce 输入格式决定。通常,每个正在读取的 HDFS 块都会有一个分区。parallelize 生成的 RDD 的分区来自用户提供的参数,如果没有提供,则来自 spark.default.parallelism。

spark.default.parallelism 选项修复了该症状。

--conf spark.default.parallelism=24

设置为 12(与核心数相同)会导致节点使用不均匀。

相关内容