
SparkSQL Code Review (A Slight Chill: Three Ways to Implement WordCount in Spark)

Thinking of snow falling on Jiangnan, a generation of elites thinly clad, mulling wine around the stove in a slight chill; a beauty's casual smile instantly turns literature into heritage. (Lin Dongwei, poet at Peking University)

Coding workflow template


The RDD approach

RDD (Resilient Distributed Dataset) is the fundamental data structure and primary data abstraction of Apache Spark and Spark Core. RDDs are fault-tolerant, immutable, distributed collections of objects: once you create an RDD, you cannot change it. Each dataset in an RDD is divided into logical partitions, which can be computed on different nodes of the cluster.
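As a minimal sketch of those two properties (the inline data and variable names here are illustrative assumptions; `sc` is the SparkContext created in the program below), partitioning and immutability look like this:

    // Minimal sketch of RDD partitioning and immutability (hypothetical inline data).
    val rdd = sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 4) // 4 logical partitions
    println(rdd.getNumPartitions)      // prints 4; each partition can be computed on a different node
    val upper = rdd.map(_.toUpperCase) // transformations return a new RDD; `rdd` itself never changes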

package com.example

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * RDD version of WordCount
 */
object WordCount_RDD {
  def main(args: Array[String]): Unit = {
    // Initialize the environment
    val spark = SparkSession.builder()
      .appName("WordCount-RDD")
      .master("local[*]") // local development mode
      .getOrCreate()
    val sc = spark.sparkContext

    // Read the data
    val lineRDD: RDD[String] = sc.textFile("d:/input/words.txt")

    // Transform the data
    val wordRDD: RDD[String] = lineRDD.flatMap(_.split(" "))
    val wordAndCountRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
    val resultRDD: RDD[(String, Int)] = wordAndCountRDD.reduceByKey(_ + _)

    // Write the result
    resultRDD.saveAsTextFile("d:/output/out1")

    // Release resources
    sc.stop()
    spark.stop()
  }
}
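saveAsTextFile writes one part file per partition, unsorted. If a sorted listing is wanted, one optional variant (not part of the original code; the output path is hypothetical) is to sort by count before saving:

    // Optional variant: sort by count, descending, before writing out.
    val sortedRDD = resultRDD.sortBy(_._2, ascending = false)
    sortedRDD.saveAsTextFile("d:/output/out1_sorted")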

The DataFrame approach

DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources, such as structured data files, tables in Hive, external databases, or existing RDDs.
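To illustrate the "existing RDDs" source in particular, here is a minimal sketch (the inline data is hypothetical, and `spark` and `sc` are assumed to be initialized as in the programs on this page) of turning an RDD of tuples into a DataFrame via spark.implicits._:

    // Sketch: building a DataFrame from an existing RDD of (word, count) tuples.
    import spark.implicits._
    val pairRDD = sc.parallelize(Seq(("spark", 3), ("rdd", 1)))
    val pairDF = pairRDD.toDF("word", "count") // name the columns explicitly
    pairDF.printSchema()                       // word: string, count: integer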

package com.example

import org.apache.spark.sql.{SaveMode, SparkSession}

/**
 * DataFrame version of WordCount
 */
object WordCount_DataFrame {
  def main(args: Array[String]): Unit = {
    // Initialize the environment
    val spark = SparkSession.builder()
      .appName("WordCount-DataFrame")
      .master("local[*]") // local development mode
      .getOrCreate()
    import spark.implicits._

    // Read the data
    val lineDF = spark.read.textFile("d:/input/words.txt")

    // Transform the data
    val wordDF = lineDF.flatMap(_.split(" ")).toDF("word")
    val resultDF = wordDF.groupBy("word").count()

    // Write the result
    resultDF.write.mode(SaveMode.Overwrite).csv("d:/output/out2")

    // Release resources
    spark.stop()
  }
}
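The "richer optimizations" mentioned above come from Spark's Catalyst optimizer. One optional line (not in the original program) makes them visible by printing the query plans for the aggregation:

    // Optional: print the parsed, analyzed, optimized and physical plans.
    resultDF.explain(true)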

The SQL approach

Spark SQL brings native SQL support to Spark and streamlines querying data stored both in RDDs (Spark's distributed datasets) and in external sources. Spark SQL conveniently blurs the lines between RDDs and relational tables.

package com.example

import org.apache.spark.sql.{SaveMode, SparkSession}

/**
 * SQL version of WordCount
 */
object WordCount_SQL {
  def main(args: Array[String]): Unit = {
    // Initialize the environment
    val spark = SparkSession.builder()
      .appName("WordCount-SQL")
      .master("local[*]") // local development mode
      .getOrCreate()
    import spark.implicits._

    // Read the data
    val lineDF = spark.read.textFile("d:/input/words.txt")

    // Transform the data
    val wordDF = lineDF.flatMap(_.split(" ")).toDF("word")

    // Register a temporary view
    wordDF.createOrReplaceTempView("t_word")

    // Run the SQL query
    val resultDF = spark.sql("SELECT word, count(*) AS count FROM t_word GROUP BY word")

    // Write the result
    resultDF.write.mode(SaveMode.Overwrite).csv("d:/output/out3")

    // Release resources
    spark.stop()
  }
}
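Since spark.sql returns an ordinary DataFrame, the query can be refined in SQL and the result handled with the DataFrame API. A variant of the query above (an assumption, not part of the original) that also orders the output:

    // Variant (assumption): order the counts in SQL and preview in the console.
    val sortedDF = spark.sql(
      "SELECT word, count(*) AS count FROM t_word GROUP BY word ORDER BY count DESC")
    sortedDF.show() // print the top rows instead of writing files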
