快捷搜索:  汽车  科技

spark单机处理数据量(疯狂Spark之Spark资源调度和任务调度)

spark单机处理数据量(疯狂Spark之Spark资源调度和任务调度)./spark-submit --master spark://node01:7077 --executor-cores 1 --executor-memory 2g --total-executor-cores 3 --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 10000任务调度源码分析Action算子开始分析任务调度可以从一个Action类算子开始。因为Action类算子会触发一个job的执行。注意:一个进程不能让集群多个节点共同启动。./spark-submit --master spark://node01:7077 --executor-cores 1 --class org.apache.spark.examples.SparkPi

资源调度源码分析

资源请求简单图

spark单机处理数据量(疯狂Spark之Spark资源调度和任务调度)(1)

资源调度Master路径:

spark单机处理数据量(疯狂Spark之Spark资源调度和任务调度)(2)

路径:spark-1.6.0/core/src/main/scala/org.apache.spark/deploy/Master/Master.scala

提交应用程序,submit的路径:

spark单机处理数据量(疯狂Spark之Spark资源调度和任务调度)(3)

路径:spark-1.6.0/core/src/main/scala/org.apache.spark/ deploy/SparkSubmit.scala

总结:

  1. Executor在集群中分散启动,有利于task计算的数据本地化。
  2. 默认情况下(提交任务的时候没有设置--executor-cores选项),每一个Worker为当前的Application启动一个Executor 这个Executor会使用这个Worker的所有的cores和1G内存。
  3. 如果想在Worker上启动多个Executor,提交Application的时候要加--executor-cores这个选项。
  4. 默认情况下没有设置--total-executor-cores 一个Application会使用Spark集群中所有的cores。

结论演示

使用Spark-submit提交任务演示。也可以使用spark-shell

  1. 默认情况每个worker为当前的Application启动一个Executor,这个Executor使用集群中所有的cores和1G内存。

./spark-submit --master spark://node01:7077 --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 10000

运行结果

spark单机处理数据量(疯狂Spark之Spark资源调度和任务调度)(4)

2.在workr上启动多个Executor 设置--executor-cores参数指定每个executor使用的core数量。

./spark-submit --master spark://node01:7077 --executor-cores 1 --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 10000

运行结果

spark单机处理数据量(疯狂Spark之Spark资源调度和任务调度)(5)

3.内存不足的情况下启动core的情况。Spark启动是不仅看core配置参数,也要看配置的core的内存是否够用。

./spark-submit --master spark://node01:7077 --executor-cores 1 --executor-memory 3g --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 10000

4.--total-executor-cores集群中共使用多少cores

注意:一个进程不能让集群多个节点共同启动。

./spark-submit --master spark://node01:7077 --executor-cores 1 --executor-memory 2g --total-executor-cores 3 --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 10000

任务调度源码分析

Action算子开始分析

任务调度可以从一个Action类算子开始。因为Action类算子会触发一个job的执行。

划分stage 以taskSet形式提交任务

DAGScheduler 类中getMessingParentStages()方法是切割job划分stage。可以结合以下这张图来分析:

spark单机处理数据量(疯狂Spark之Spark资源调度和任务调度)(6)

二次排序

在项目中添加一个SecondSort.txt文件

排序前文件中内容

spark单机处理数据量(疯狂Spark之Spark资源调度和任务调度)(7)

编写代码

package com.gw.scala import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions object SparkSecondSort { def main(args: Array[String]): Unit = { val sconf=new SparkConf().setAppName("SecondSort").setMaster("local") val sc=new SparkContext(sconf) val lines=sc.textFile("secondSort.txt") val pairs= lines.map { x=>(new SecondSortKey(x.split(" ")(0).toInt x.split(" ")(1).toInt) x) } val sortedPairs= pairs.sortByKey(false) // val sortedPairs = pairs.sortBy(_._1 false) sortedPairs.map(_._2).foreach {println } sc.stop() } } class SecondSortKey(val first:Int val second:Int) extends Ordered[SecondSortKey] with Serializable { def compare(that: SecondSortKey): Int = { if(this.first-that.first==0) this.second- that.second else this.first-that.first } }

运行效果

spark单机处理数据量(疯狂Spark之Spark资源调度和任务调度)(8)

topN和分组取topN

topN

需求:获取成绩单中,成绩排在前五的学生信息

在项目中添加一个top.txt文件,在文件中以K V(K表示成绩,V表示姓名)对的形式添加数据,如下图:

spark单机处理数据量(疯狂Spark之Spark资源调度和任务调度)(9)

编写代码

package com.gw.scala import org.apache.spark.SparkConf import org.apache.spark.SparkContext object SparkTopN { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("TopN").setMaster("local") val sc = new SparkContext(conf) val lines=sc.textFile("top.txt") val lineList=lines.map(x=>(x.split(" ")(0) x)) val sortRdd = lineList.sortByKey(false) val resultRDD = sortRdd.map(x=>x._2) for(a <-resultRDD.take(5)){ println(a) } sc.stop() } }

运行结果

spark单机处理数据量(疯狂Spark之Spark资源调度和任务调度)(10)

分组取topN

需求:给每个班级的学生成绩排序

在项目中添加一个文件scores.txt,在文件中编写K V(K表示班级,V表示成绩)格式的数据,如下图

spark单机处理数据量(疯狂Spark之Spark资源调度和任务调度)(11)

编写代码

package com.gw.scala import org.apache.spark.{SparkContext SparkConf} object SparkGroupTopN { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("GroupTopN").setMaster("local") val sc = new SparkContext(conf) val lines=sc.textFile("scores.txt") val lineList=lines.map(x=>(x.split("\t")(0) x.split("\t")(1))).groupByKey() val topList=lineList.map(x=>{ var t = List[Int]() for(a<-x._2){ t = t.::(a.toInt) } println(x._1) t.sortBy { x => -x }.take(3) }) topList.foreach { println } } }

运行结果

spark单机处理数据量(疯狂Spark之Spark资源调度和任务调度)(12)

猜您喜欢: