学spark必须学scala吗(二Spark入门程序WordCount详解)
学spark必须学scala吗(二Spark入门程序WordCount详解)
Scala版本WordCount- 项目目录结构如下:
- 在项目目录data下创建要统计词频的文件words.txt
- 新建Scala版的WordCount程序
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
/**
* SparkConf创建Spark应用程序的配置
*
* setAppName:用来设置应用程序的名称 如果些程序运行在Spark的集群上,可以在资源管理器的UI上看到,如standalone yarn mesos
*
* setMaster:用来设置应用程序的本地运行模式
* 1) local: Spark应用程序本地运行模式,如果local后不不指定参数,就默认使用一个core来运行Spark应用程序
* local[n]: 指定使用n个cores来运行Spark应用程序
* local[*]: 使用本地机器所有的cores来运行Spark应用程序
* 这里的core是对应计算机线程,假如你的计算机是4核8线程,那么你的计算机总共可以提供8个cores来运行Spark应用程序
* 如果不设置setMaster 将来程序要打包发布到集群上了运行,集群主要有standalone yarn mesos
* 2) standalone: Spark自带的资源管理器,主节点叫Master 从节点叫Worker
* 3) yarn: 基于Hadoop yarn资源管理器运行Spark程序,主节点叫ResourceManager 从节点叫NodeManager
* 4) mesos: 一般国外用的比较多,这里就不详细讲解
*
*/
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
/**
* SparkContext: 它是Spark应用程序通往集群的唯一通道 创建了SparkContext
* SparkContext底层默认会创建两个对象DAGScheduler和TaskScheduler
* DAGScheduler: 依赖Spark应用程序中的RDD的宽窄依赖切割Job 划分Stage 每个Stage又会封装成TaskSet 提交给TaskScheduler
*
* TaskScheduler: 负责从TaskSet中遍历一个个的并行的Task 发送给工作节点中的Executor中的ThreadPool执行 监控Task执行,回收结果。
*/
val sc = new SparkContext(conf)
/**
* 设置Spark应用程序的运行日志级别
*/
sc.setLogLevel("WARN")
/**
* 通过调用Hadoop底层的方法来切分文件,创建RDD
*/
val lines: RDD[String] = sc.textFile("./data/words.txt")
/**
* flatMap: 功能是将RDD中的partition中的一行行的数据依照空格切分压平
* 它是一个lazy的Transformation算子,懒执行,需要由action算子触发执行
* flatMap算子是1对多,进去一行行的文本,出来一个个的单词
* flatMap内部的逻辑是在Executor中执行
*/
val words: RDD[String] = lines.flatMap(line => line.split(" "))
/**
* map: 功能是将上步生成的RDD的每个元素创建成K V格式的RDD
* map也是懒执行算子,需要有action类算子触发执行
* map的操作是1对1,对于每一个元素,调用一次方法
* 对每一个单词计数1,形成单词与1的键值对,如(金庸 1) (金庸 1) (天龙八步 1)...
*/
val pairs: RDD[(String Int)] = words.map(word => (word 1))
/**
* reduceByKey: 对上步RDD的partition中的K V格式的RDD先分组再聚合 是懒执行算子
*
* reduceByKey是一个shuffle类的算子,如果数据分布在多台节点上,会进行局部分组聚合,再全局聚合 对于跨节点的数据会落地磁盘小文件
* 基处理也要经过mapper和reducer两个阶段
*
* 假如数据有两个分区,分别在node1节点和node2节点上,那么:
*
* ===================>mapper=========================>shuffle=================>reducer=========================
* node1节点:
* (金庸 1)
* (金庸 1)
* (金庸 1)
* (金庸 1) 局部分组聚合=> (金庸 4) => 写入磁盘落地 (金庸 6)
* (天龙八部 1) (天龙八部 4)
* (天龙八部 1)
* (天龙八部 1)
* (天龙八部 1) => 全局聚合
* node2节点: (天龙八部 6)
* (金庸 1) (段誉 2)
* (金庸 1)
* (天龙八部 1) 局部分组聚合=> (金庸 2) => 写入磁盘落地
* (天龙八部 1) (天龙八部 2)
* (段誉 1) (段誉 2)
* (段誉 1)
*
*/
val reduced: RDD[(String Int)] = pairs.reduceByKey((v1 v2) => v1 v2)
/**
* sortBy: 用来对RDD中的数据进行排序,是懒执行算子,它需要两个参数
* 1)参数1是一个f 传递排序的规则
* 2) 参数2是排序的方式,默认是true 表示升序, 如果需要降序,需要设置成false
*/
val sorted: RDD[(String Int)] = reduced.sortBy(tup => tup._2 ascending = false)
/**
* foreach: 是一个action算子,触发执行
*
* Spark应用程序中有几个action算子就有几个job
*/
sorted.foreach(tup=>println(tup))
//退出,释放资源
sc.stop()
}
}
输出结果如下:
(金庸 25)
(天龙八部 13)
(段誉 10)
(神雕侠女 6)
(射雕英雄传 6)
(郭靖 5)
(黄蓉 5)
(杨过 4)
(乔峰 4)
(小龙女 2)
(段正淳 2)
(阿朱 2)
(阿紫 2)
(木婉清 1)
(王语嫣 1)
(黄药师 1)
(晓蕾 1)
(虚竹 1)
(段王爷 1)
(马夫人 1)
(钟灵 1)
(欧阳锋 1)
(老顽童 1)
(洪七公 1)
(梅超风 1)
(尹士平 1)
(段延庆 1)