Big Data Aggregation and Statistics with Spark (Big Data: Spark Word Count)
Preparation
# Enter the Spark directory
cd /usr/local/spark
# Create a directory for the example code
mkdir demo_code
cd demo_code
# Create a directory for the word count files
mkdir wordcount
cd wordcount
# Create a text file containing some sentences (e.g. a passage copied from the web), then save and exit
vim demo_word.txt
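For example, demo_word.txt might contain a couple of lines like the following (any English passage works; this content is only an illustration):
Apache Spark is a fast and general engine for large-scale data processing
Spark runs on Hadoop on Mesos standalone or in the cloud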
Running word count in spark-shell
# Enter the Spark directory
cd /usr/local/spark
# Start spark-shell
./bin/spark-shell
# Load the text file in Scala
val wordFile = sc.textFile("file:///usr/local/spark/demo_code/wordcount/demo_word.txt")
# View the first line of the file
wordFile.first()
# Compute the word counts
val wordCount = wordFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
# View the word count results
wordCount.collect()
(Screenshot: running the word count in spark-shell)
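Optionally, the pair RDD can be sorted by count so the most frequent words appear first; this is a small extra step on top of the walkthrough above, using the standard RDD sortBy and take methods:
# Show the ten most frequent words (optional)
wordCount.sortBy(_._2, false).take(10).foreach(println)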
Word count as a standalone Scala program
# Enter the word count directory
cd /usr/local/spark/demo_code/wordcount/
# Under the word count directory, create the Scala source directory
mkdir -p src/main/scala
# Enter the Scala source directory
cd /usr/local/spark/demo_code/wordcount/src/main/scala
# Create the Scala source file, write the program, then save and exit
vim demo_word.scala
The program code is as follows:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object WordCount {
  def main(args: Array[String]): Unit = {
    val inputFile = "file:///usr/local/spark/demo_code/wordcount/demo_word.txt"
    // Run locally with 2 worker threads
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val wordFile = sc.textFile(inputFile)
    // Split lines into words, map each word to (word, 1), then sum the counts per word
    val wordCount = wordFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
    wordCount.foreach(println)
  }
}
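If you would rather keep the results than print them to the console, the final line can instead write the pairs out with the RDD's saveAsTextFile method; the output path below is only an example, and the target directory must not already exist:
wordCount.saveAsTextFile("file:///usr/local/spark/demo_code/wordcount/output")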
Compiling and packaging with sbt
# Enter the word count directory
cd /usr/local/spark/demo_code/wordcount/
# Create an sbt build file for the program, then save and exit
vim demo_word.sbt
Edit it with the following content:
name := "demoWord Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
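Before compiling, the wordcount directory should look roughly like this (as implied by the steps above):
wordcount/
├── demo_word.sbt
├── demo_word.txt
└── src/main/scala/demo_word.scala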
# Compile and package the program
/usr/local/sbt/sbt package
(Screenshot: the build succeeds)
# Run the program
/usr/local/spark/bin/spark-submit --class "WordCount" /usr/local/spark/demo_code/wordcount/target/scala-2.11/demoword-project_2.11-1.0.jar
(Screenshot: the word count output)
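Because spark-submit prints the application's output mixed with Spark's own log messages, it can help to filter out the log lines, for example by piping through grep:
/usr/local/spark/bin/spark-submit --class "WordCount" /usr/local/spark/demo_code/wordcount/target/scala-2.11/demoword-project_2.11-1.0.jar 2>&1 | grep -v "INFO"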
That concludes this Spark word count demo.