柚子快報激活碼778899分享:大數(shù)據(jù) Spark學(xué)習(xí)
柚子快報激活碼778899分享:大數(shù)據(jù) Spark學(xué)習(xí)
Spark學(xué)習(xí)
什么是spark?
Apache Spark是一個開源的集群計算系統(tǒng),旨在使數(shù)據(jù)分析變得快速
既運行得快,又寫得快
spark5大模塊:
回顧:MR的執(zhí)行流程
hadoop為什么慢???額外的復(fù)制,序列化,磁盤IO開銷
spark為什么快???因為內(nèi)存計算,當(dāng)然還有DAG(有向無環(huán)圖)
支持3種語言的API :Scala(很好)Python(不錯)Java(…)
有4種模式可以運行
Local 多用于測試
Standalone 節(jié)點運行
Mesos
YARN 最具前景
本地部署spark:
添加依賴
WordCount:
數(shù)據(jù)展示:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
// 創(chuàng)建spark配置文件對象
val conf = new SparkConf()
// 設(shè)置運行模式
// local模式運行,需要設(shè)置setMaster
// 若要是集群運行,注釋這句話即可
conf.setMaster("local")
// 設(shè)置spark作業(yè)的名字
conf.setAppName("wordcount")
// 創(chuàng)建spark上下文環(huán)境對象
val sc = new SparkContext(conf)
// 1. 讀取文件 每次讀一行
//RDD是spark core中的核心數(shù)據(jù)結(jié)構(gòu),將來運行的時候,數(shù)據(jù)會在RDD之間流動,默認(rèn)基于內(nèi)存計算
val lineRDD: RDD[String] = sc.textFile("spark/data/test.text")
// lineRDD.foreach(println)
// 2.處理數(shù)據(jù) 根據(jù)分隔符切分 扁平化處理
val wordRDD: RDD[String] = lineRDD.flatMap(_.split(" "))
// wordRDD.foreach(println)
// 3.將每一個單詞組成(word,1)
val kvRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
// 分組
val kv: RDD[(String, Iterable[(String, Int)])] = kvRDD.groupBy(_._1)
val result: RDD[(String, Int)] = kv.map(s => (s._1, s._2.size))
// 打印
result.foreach(println)
/*
* 鏈?zhǔn)秸{(diào)用
* */
sc.textFile("spark/data/test.text")
.flatMap(_.split(" "))
.map((_, 1))
.groupBy(_._1)
.map(s => (s._1, s._2.size))
.foreach(println)
}
}
wordcount 圖解:
Spark Core
spark RDD
RDD: 彈性分布式數(shù)據(jù)集
彈性:數(shù)據(jù)量可大可小 RDD類似于容器,但是本身存儲的不是數(shù)據(jù),是計算邏輯 當(dāng)遇到行動算子的時候,整個spark作業(yè)才會被觸發(fā)執(zhí)行,是從第一個RDD開始執(zhí)行,數(shù)據(jù)才開始產(chǎn)生流動 數(shù)據(jù)在RDD之間只是流動關(guān)系,不會存儲 流動的數(shù)據(jù)量可以很大,也可以很小,所以稱為彈性 分布式: spark本質(zhì)上它是需要從HDFS中讀取數(shù)據(jù)的,HDFS是分布式,數(shù)據(jù)block塊將來可能會在不同的datanode上 RDD中流動的數(shù)據(jù),可能會來自不同的datanode中的block塊數(shù)據(jù) 數(shù)據(jù)集: 計算流動過程中,可以短暫地將RDD看成一個容器,容器中有數(shù)據(jù),默認(rèn)情況下在內(nèi)存中不會進行存儲 后面會有辦法將一個RDD的數(shù)據(jù)存儲到磁盤中
RDD的五大特性(重要?。。。?/p>
1、RDD是由一系列分區(qū)構(gòu)成
注意:
? 1)讀文件時的minPartitions參數(shù)只能決定最小分區(qū)數(shù),實際讀取文件后的RDD分區(qū)數(shù),由數(shù)據(jù)內(nèi)容本身以及集群的分布來共同決定的
? 2)若設(shè)置minPartitions的大小比block塊數(shù)量還少的話,實際上以block塊數(shù)量來決定分區(qū)數(shù)
? 3)產(chǎn)生shuffle的算子調(diào)用時,可以傳入numPartitions,實際真正改變RDD的分區(qū)數(shù),設(shè)置多少,最終RDD就有多少分區(qū)
2、算子是作用在每一個分區(qū)上的
3、RDD與RDD之間存在一些依賴關(guān)系
1)窄依賴 前一個RDD中的某一個分區(qū)數(shù)據(jù)只會到后一個RDD中的某一個分區(qū) 一對一的關(guān)系2)寬依賴 前一個RDD中的某一個分區(qū)數(shù)據(jù)會進入到后一個RDD中的不同分區(qū)中 一對多的關(guān)系 也可以通過查看是否產(chǎn)生shuffle來判斷3)整個spark作業(yè)會被寬依賴的個數(shù)劃分若干個stage, Num(stage) = Num(寬依賴) + 14)當(dāng)遇到產(chǎn)生shuffle的算子的時候,涉及到從前一個RDD寫數(shù)據(jù)到磁盤中,從磁盤中讀取數(shù)據(jù)到后一個RDD的現(xiàn)象,注意:第一次觸發(fā)執(zhí)行的時候,磁盤是沒有數(shù)據(jù)的,所以會從第一個RDD產(chǎn)生開始執(zhí)行當(dāng)重復(fù)觸發(fā)相同的執(zhí)行的時候,對于同一個DAG有向無環(huán)圖而言,會直接從shuffle之后的RDD開始執(zhí)行,可以直接從磁盤讀取數(shù)據(jù)。5)一個階段中,RDD有幾個分區(qū),就會有幾個并行task任務(wù)
4、kv算子只能作用在kv的RDD上
5、spark會提供最優(yōu)的任務(wù)計算方式,只移動計算,不移動數(shù)據(jù)。
spark作業(yè)執(zhí)行的特點:
1、只有遇到行動算子的時候,整個spark作業(yè)才會被觸發(fā)執(zhí)行2、遇到幾次,執(zhí)行幾次
def main(args: Array[String]): Unit = {
// 創(chuàng)建spark配置文件對象
val conf = new SparkConf()
// 設(shè)置運行模式
// local模式運行,需要設(shè)置setMaster
// 若要是集群運行,注釋這句話即可
conf.setMaster("local")
// 設(shè)置spark作業(yè)的名字
conf.setAppName("WordCount")
// 創(chuàng)建spark上下文環(huán)境對象
val sc = new SparkContext(conf)
// 1. 讀取文件 每次讀一行
//RDD是spark core中的核心數(shù)據(jù)結(jié)構(gòu),將來運行的時候,數(shù)據(jù)會在RDD之間流動,默認(rèn)基于內(nèi)存計算
// val lineRDD: RDD[String] = sc.textFile("spark/data/test.text")
// println(lineRDD.getNumPartitions) // 查看分區(qū)數(shù) 默認(rèn)一個分區(qū)
val linesRDD: RDD[String] = sc.textFile("spark/data/wcs/*",minPartitions = 3) // 設(shè)置最小分區(qū)數(shù) 為3 不是實際分區(qū)數(shù)
println(s"lineRDD的分區(qū)數(shù):${linesRDD.getNumPartitions}") // 2個分區(qū) 說明 有幾個block塊 就有幾個分區(qū) 下面的幾個RDD都是兩個分區(qū)
// 2.處理數(shù)據(jù) 根據(jù)分隔符切分 扁平化處理
val wordRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
// wordRDD.foreach(println)
// 3.將每一個單詞組成(word,1)
val kvRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
// 分組
// 需要取消讀文件時設(shè)置的最小分區(qū)數(shù),從這之后的分區(qū)數(shù)為5,說明產(chǎn)生shuffle的算子調(diào)用時 numPartitions可以改變RDD的分區(qū)數(shù)
val kv: RDD[(String, Iterable[(String, Int)])] = kvRDD.groupBy(_._1,numPartitions = 5)
val result: RDD[(String, Int)] = kv.map(s => (s._1, s._2.size))
val resRDD2: RDD[(String, Int)] = result.map((kv: (String, Int)) => {
println("==================防偽碼=====================")
(kv._1, kv._2)
})
//打印
resRDD2.foreach(println) // 調(diào)用了算子 所以執(zhí)行了 直接使用println是不行的
// println("=" * 100)
// resRDD2.foreach(println) 調(diào)用一次 執(zhí)行一次
// 查看spark jobs 界面 查看job數(shù) stage數(shù) task任務(wù)數(shù)(取決于分區(qū)數(shù)) DAG 有向無環(huán)圖
// 打印
// result.foreach(println)
//指定的是文件夾的路徑
//spark如果是local本地運行的話,會將本地文件系統(tǒng)看作一個hdfs文件系統(tǒng) 出現(xiàn)crc校驗文件等
// result.saveAsTextFile("spark/data/outdata1")
}
RDD 算子
transformation 算子 轉(zhuǎn)換算子(RDD->RDD)
Action算子 行動算子
寬依賴和窄依賴的例子:
窄依賴中的pipeline操作:使得task’的執(zhí)行任務(wù)非???/p>
轉(zhuǎn)換算子
Map
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo1Map {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Map算子演示")
// 上下文對象
val sc = new SparkContext(conf)
val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
//map操作算子:將rdd中的數(shù)據(jù)依次取出,傳遞給后面函數(shù)邏輯,將計算后的數(shù)據(jù)返回到新的rdd中
//將rdd中的數(shù)據(jù)依次取出,處理完的數(shù)據(jù)返回下一個rdd直接繼續(xù)執(zhí)行后續(xù)的邏輯
val rdd: RDD[(String, String, String, String, String)] = lineRDD.map(s => {
println("============桀桀桀=============")
val arr1: Array[String] = s.split(",")
(arr1(0), arr1(1), arr1(2), arr1(3), arr1(4))
})
// 此時運行 沒有結(jié)果 因為沒有行動算子
// foreach就是一個行動算子
rdd.foreach(println)
// 結(jié)果: 不是先打印1000次 ============桀桀桀============= 交替進行
//...
//============桀桀桀=============
//(1500100934,隆高旻,21,男,理科五班)
//============桀桀桀=============
//(1500100935,蓬昆琦,21,男,文科六班)
//============桀桀桀=============
//(1500100936,習(xí)振銳,23,男,理科二班)
// ....
}
}
Filter
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo2Filter {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Filter算子演示")
// 上下文對象
val sc = new SparkContext(conf)
val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
// 過濾除所有的男生
//filter轉(zhuǎn)換算子:將rdd中的數(shù)據(jù)依次取出,傳遞給后面的函數(shù),跟map一樣,也是依次傳遞一條
val genderRDD: RDD[String] = lineRDD.filter(s => {
// println("============桀桀桀=============") 打印的可能是女生
// s.split(",")(3).equals("男")
// 將確定的字符串值放前面 假如為空?
// "男".equals(s.split(",")(3))
var b: Boolean = false
if ("女".equals(s.split(",")(3))) {
println("============這是女生==================")
} else {
println("============這是男生==================")
b = "男".equals(s.split(",")(3))
}
b
})
genderRDD.foreach(println)
// 結(jié)果
// ...
// 1500100968,譚晗日,24,男,文科五班
// ============桀桀桀=============
// 1500100969,毛昆鵬,24,男,文科三班
// ============桀桀桀=============
// ============桀桀桀=============
// ============桀桀桀=============
// 1500100972,王昂杰,23,男,理科二班
// ============桀桀桀=============
// ============桀桀桀=============
// 1500100974,容鴻暉,21,男,文科五班
// ============桀桀桀=============
// 1500100975,蓬曜瑞,22,男,理科三班
// ============桀桀桀=============
// ============桀桀桀=============
// ============桀桀桀=============
// 1500100978,郜昆卉,21,男,文科五班
// ...
// 結(jié)果2: 驗證打印============桀桀桀=============是因為 過濾了女生
// 1500100898,祁高旻,22,男,理科五班
// ============這是男生==================
// 1500100899,計浩言,22,男,文科四班
// ============這是女生==================
// ============這是男生==================
// 1500100901,崔海昌,21,男,理科六班
// ============這是男生==================
// 1500100902,豐昊明,23,男,文科六班
// ============這是女生==================
// ============這是女生==================
// ============這是女生==================
// ============這是女生==================
// ============這是女生==================
// ============這是男生==================
// 1500100908,那光濟,22,男,文科二班
// ============這是男生==================
// 1500100909,符景天,23,男,文科二班
}
}
FlatMap
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo3FlatMap {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("FlatMap算子演示")
// 上下文對象
val sc = new SparkContext(conf)
val lineRDD: RDD[String] = sc.textFile("spark/data/wcs/words.txt")
/**
* flatMap: 將rdd中的數(shù)據(jù)每一條數(shù)據(jù)傳遞給后面的函數(shù),最終將返回的數(shù)組或者是序列進行扁平化,返回給新的集合
*/
val rdd1: RDD[String] = lineRDD.flatMap(s=>{
println("============一條數(shù)據(jù)=============")
s.split("\\|")
})
rdd1.foreach(println)
// 結(jié)果
// ============一條數(shù)據(jù)=============
//hello
//world
//============一條數(shù)據(jù)=============
//java
//hadoop
//linux
//============一條數(shù)據(jù)=============
//java
//scala
//hadoop
// ......
}
}
Sample
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo4Sample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Sample算子演示")
// 上下文對象
val sc = new SparkContext(conf)
val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
/*
def sample(
withReplacement: Boolean, 去重
fraction: Double, 抽樣的比例
seed: Long = Utils.random.nextLong): RDD[T] = {
*/
/**
* sample抽樣,1000條數(shù)據(jù),抽0.1比例,結(jié)果的數(shù)量在100左右 不去重
* 這個函數(shù)主要在機器學(xué)習(xí)的時候會用到
*/
val rdd1: RDD[String] = lineRDD.sample(withReplacement = false, fraction = 0.1)
rdd1.foreach(println)
// 結(jié)果: 在100條數(shù)據(jù)左右 每次運行不一樣
}
}
GroupBy
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo5GroupBy {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("groupBy")
val sc: SparkContext = new SparkContext(conf)
// 求每個班的平均年齡
val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
val arr1: RDD[Array[String]] = lineRDD.map(s => {
s.split(",")
})
//像這種RDD中的元素是(key,value)類型的,我們將這種RDD稱之為鍵值對RDD(kv格式RDD)
val clazzWithAgeRDD: RDD[(String, Int)] = arr1.map {
case Array(_, _, age: String, _, clazz: String) =>
(clazz, age.toInt)
}
// groupBy算子 的使用 分組條件是我們自己指定的 spark中g(shù)roupBy之后的,所有值會被封裝到一個Iterable迭代器中存儲
val groupRDD: RDD[(String, Iterable[(String, Int)])] = clazzWithAgeRDD.groupBy(_._1)
val kvRDD: RDD[(String, Double)] = groupRDD.map(kv => {
val clazz: String = kv._1
val avgAge: Double = kv._2.map(_._2).sum.toDouble / kv._2.size
(clazz, avgAge)
})
kvRDD.foreach(println)
// 結(jié)果:
//(理科二班,22.556962025316455)
//(文科三班,22.680851063829788)
//(理科四班,22.63736263736264)
//(理科一班,22.333333333333332)
//(文科五班,22.30952380952381)
//(文科一班,22.416666666666668)
//(文科四班,22.506172839506174)
//(理科六班,22.48913043478261)
//(理科三班,22.676470588235293)
//(文科六班,22.60576923076923)
//(理科五班,22.642857142857142)
//(文科二班,22.379310344827587)
}
}
GroupByKey
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo6GroupByKey {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("groupByKey")
val sc: SparkContext = new SparkContext(conf)
val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
//求每個班級的平均年齡
val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))
val clazzWithAgeRDD: RDD[(String, Int)] = arrayRDD.map {
case Array(_, _, age: String, _, clazz: String) =>
(clazz, age.toInt)
}
/**
* GroupByKey屬于kv格式的算子,只能作用在kv格式的RDD上
* 也就說,只有kv格式的RDD才能調(diào)用kv格式的算子
*/
val gbkRDD: RDD[(String, Iterable[Int])] = clazzWithAgeRDD.groupByKey()
val resRDD: RDD[(String, Double)] = gbkRDD.map(kv => (kv._1, kv._2.sum.toDouble / kv._2.size))
resRDD.foreach(println)
// 結(jié)果:
//(理科二班,22.556962025316455)
//(文科三班,22.680851063829788)
//(理科四班,22.63736263736264)
//(理科一班,22.333333333333332)
//(文科五班,22.30952380952381)
//(文科一班,22.416666666666668)
//(文科四班,22.506172839506174)
//(理科六班,22.48913043478261)
//(理科三班,22.676470588235293)
//(文科六班,22.60576923076923)
//(理科五班,22.642857142857142)
//(文科二班,22.379310344827587)
}
}
spark core中 groupBy算子與groupByKey算子的區(qū)別?
1、代碼格式上:
groupBy的分組條件可以自己指定,并且絕大部分的RDD都可以調(diào)用該算子,返回的是鍵和元素本身組成的迭代器構(gòu)成的kv格式RDD groupByKey算子,只能由kv格式的RDD進行調(diào)用,分組的條件會自動根據(jù)鍵進行分組,不需要在自己指定,返回的是鍵和值組成的迭代器構(gòu)成的kv格式RDD
2、執(zhí)行shuffle數(shù)據(jù)量來看
groupBy產(chǎn)生的shuffle數(shù)據(jù)量在一定程度上要大于groupByKey產(chǎn)生的shuffle數(shù)據(jù)量 所以groupByKey算子的執(zhí)行效率要比groupBy算子的執(zhí)行效率要高
ReduceByKey
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo7ReduceByKey {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("ReduceByKey")
val sc: SparkContext = new SparkContext(conf)
val linesRDD: RDD[String] = sc.textFile("spark/data/score.txt")
val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))
//分別使用groupByKey和reduceByKey計算每個學(xué)生的總分
// 封裝成只有kv
val idWithScoreRDD: RDD[(String, Int)] = arrayRDD.map {
case Array(id: String, _, score: String) =>
(id, score.toInt)
}
// 先使用groupByKey
val kvRDD1: RDD[(String, Iterable[Int])] = idWithScoreRDD.groupByKey()
val resRDD1: RDD[(String, Int)] = kvRDD1.map(kv => (kv._1, kv._2.sum))
// resRDD1.foreach(println)
//結(jié)果
//(1500100724,440)
//(1500100369,376)
//(1500100378,402)
//(1500100306,505)
//(1500100578,397)
//(1500100968,320)
//(1500100690,435) ...
// 使用ReduceByKey
val resRDD2: RDD[(String, Int)] = idWithScoreRDD.reduceByKey(_ + _)
resRDD2.foreach(println)
// 結(jié)果
//(1500100883,362)
//(1500100990,422)
//(1500100346,391)
//(1500100178,388)
//(1500100894,371)
//(1500100519,334)
//(1500100905,264)
//(1500100624,317)...
}
}
groupByKey與reduceBykey的區(qū)別?
相同點:
它們都是kv格式的算子,只有kv格式的RDD才能調(diào)用
不同點:
1)groupByKey只是單純地根據(jù)鍵進行分組,分組后的邏輯可以在后續(xù)的處理中調(diào)用其他的算子實現(xiàn)2)reduceByKey 相當(dāng)于MR中的預(yù)聚合,所以shuffle產(chǎn)生的數(shù)據(jù)量要比groupByKey中shuffle產(chǎn)生的數(shù)據(jù)量少,效率高,速度要快一些3)groupByKey的靈活度要比reduceByKey靈活度要高,reduceBykey無法做一些復(fù)雜的操作,比如方差。但是groupByKey可以在分組之后的RDD進行方差操作
圖解:
Union
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo8Union {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("reduceByKey")
val sc: SparkContext = new SparkContext(conf)
//parallelize:將scala的集合變成spark中的RDD
val rdd1: RDD[(String, String)] = sc.parallelize(List(
("1001", "fy"),
("1002", "fy2"),
("1003", "fy3"),
("1004", "fy4"),
("1005", "fy5")
))
println(s"rdd1的分區(qū)數(shù):${rdd1.getNumPartitions}")
val rdd2: RDD[(String, String)] = sc.parallelize(List(
("1006", "fz6"),
("1007", "fz7"),
("1008", "fz8"),
("1003", "fy3"),
("1009", "fz9")
))
println(s"rdd2的分區(qū)數(shù):${rdd2.getNumPartitions}")
val rdd3: RDD[(String, Int)] = sc.parallelize(List(
("1006", 1),
("1007", 2),
("1008", 3),
("1003", 4),
("1009", 5)
))
//兩個RDD要想進行union合并,必須保證元素的格式和數(shù)據(jù)類型是一致的
//分區(qū)數(shù)也會進行合并,最終的分區(qū)數(shù)由兩個RDD總共的分區(qū)數(shù)決定
// rdd1.union(rdd3) 不行
val resRDD1: RDD[(String, String)] = rdd1.union(rdd2)
resRDD1.foreach(println) // 結(jié)果看不出端倪 打印分區(qū)看看
println(s"resRDD1的分區(qū)數(shù):${resRDD1.getNumPartitions}")
// 結(jié)果
//rdd1的分區(qū)數(shù):1
//rdd2的分區(qū)數(shù):1
//resRDD1的分區(qū)數(shù):2
}
}
Join
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
* join算子也要作用在kv格式的RDD上
*/
object Demo9Join {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("Join")
val sc: SparkContext = new SparkContext(conf)
val rdd1: RDD[(String, String)] = sc.parallelize(List(
("1001", "1號"),
("1002", "2號"),
("1003", "3號"),
("1004", "4號"),
("1005", "5號")
))
val rdd2: RDD[(String, String)] = sc.parallelize(List(
("1001", "看美女"),
("1002", "看綜藝"),
("1003", "看八卦"),
("1004", "打游戲"),
("1009", "學(xué)習(xí)")
))
/**
* join 內(nèi)連接
* right join 右連接
* left join 左連接
* full join 全連接
*/
// join 內(nèi)連接 兩個rdd共同擁有的鍵才會進行關(guān)聯(lián)
// val resRDD1: RDD[(String, (String, String))] = rdd1.join(rdd2)
// val resRDD2: RDD[(String, String, String)] = resRDD1.map {
// case (id: String, (name: String, like: String)) =>
// (id, name, like)
// }
// resRDD2.foreach(println)
//right join 右連接 保證右邊rdd鍵的完整性
// val resRDD2: RDD[(String, (Option[String], String))] = rdd1.rightOuterJoin(rdd2)
// val resRDD3: RDD[(String, String, String)] = resRDD2.map {
// case (id: String, (Some(name), like: String)) =>
// (id, name, like)
// case (id: String, (None, like: String)) =>
// (id, "查無此人", like)
// }
// resRDD3.foreach(println)
//TODO:自己完成左關(guān)聯(lián)
// left join 左連接 保證左邊rdd鍵的完整性
val resRDD2: RDD[(String, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)
val resRDD3: RDD[(String, String, String)] = resRDD2.map {
case (id: String, (name: String, Some(like))) =>
(id, name, like)
case (id: String, (name: String, None)) =>
(id, name, "此人無愛好")
}
resRDD3.foreach(println)
// 結(jié)果
// (1005,5號,此人無愛好)
// (1001,1號,看美女)
// (1002,2號,看綜藝)
// (1004,4號,打游戲)
// (1003,3號,看八卦)
// //全關(guān)聯(lián)
// val resRDD2: RDD[(String, (Option[String], Option[String]))] = rdd1.fullOuterJoin(rdd2)
// val resRDD3: RDD[(String, String, String)] = resRDD2.map {
// case (id: String, (Some(name), Some(like))) =>
// (id, name, like)
// case (id: String, (Some(name), None)) =>
// (id, name, "此人無愛好")
// case (id: String, (None, Some(like))) =>
// (id, "查無此人", like)
// }
// resRDD3.foreach(println)
}
}
MapValues
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo10MapValues {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("MapValues算子演示")
val sc: SparkContext = new SparkContext(conf)
val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
// 給每個人的年齡加上100
val kvRDD1: RDD[(String, Int)] = linesRDD.map(_.split(",")).map {
case Array(_, name:String, age:String, _,_) =>
(name, age.toInt)
}
/**
* mapValues函數(shù)也是作用在kv格式的算子上
* 將每個元素的值傳遞給后面的函數(shù),進行處理得到新的值,鍵不變,這個處理后的組合重新返回到新的RDD中
*/
kvRDD1.mapValues(_ + 100).foreach(println)
//(于從寒,123)
//(凌智陽,121)
//(卞樂萱,121)
//(于晗昱,122)
//(濮恨蕊,123)
//(戚昌盛,122)
//(滿慕易,121)
}
}
mapPartitions
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo11PartitionBy {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("mapPartitions算子演示")
val sc: SparkContext = new SparkContext(conf)
val linesRDD: RDD[String] = sc.textFile("spark/data/wcs/*")
/**
* mapPartitions:一次處理一個分區(qū)中的數(shù)據(jù)
* 它與map的區(qū)別在于,map是每次處理一條數(shù)據(jù)就返回一條數(shù)據(jù)到下一個rdd
* 而mapPartitions一次處理一個分區(qū)的數(shù)據(jù),處理完再返回
* 最后的處理效果和map的處理效果是一樣的
*
* mapPartition可以優(yōu)化與數(shù)據(jù)庫連接的次數(shù)
*/
// s是Iterator[String]類型
val rdd1: RDD[String] = linesRDD.mapPartitions(s => {
println("=========================") // 打印了兩次 對應(yīng)兩個分區(qū)
s.map(e => {
e
})
})
rdd1.foreach(println)
}
}
SortBy
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo12SortBy {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("SortBy算子演示")
val sc: SparkContext = new SparkContext(conf)
val rdd1: RDD[Int] = sc.parallelize(List(34, 123, 6, 1, 231, 1, 34, 56, 2))
val rdd2: RDD[Int] = rdd1.sortBy((e: Int) => e)
rdd2.foreach(println)
//1
//1
//2
//6
//34
//34
//56
//123
//231
}
}
行動算子
Foreach
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo13Foreach {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("foreach算子演示")
val sc: SparkContext = new SparkContext(conf)
val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
val rdd1: RDD[Array[String]] = linesRDD.map((e: String) => {
e.split(",")
})
val rdd2: RDD[(String, String, String, String, String)] = rdd1.map {
case Array(id: String, name: String, age: String, gender: String, clazz: String) =>
(id, name, age, gender, clazz)
}
/**
* 行動算子,就可以觸發(fā)一次作業(yè)執(zhí)行,有幾次行動算子調(diào)用,就會觸發(fā)幾次
*
* rdd是懶加載的性質(zhì)
*/
// rdd2.foreach(println)
// println("====================================")
// rdd2.foreach(println)
println("哈哈哈") // 一定會打印,不屬于spark作業(yè)中的語句
val rdd3: RDD[(String, String, String, String, String)] = rdd2.map((t5: (String, String, String, String, String)) => {
println("===============================") // 沒有行動算子時 不會打印
t5
})
println("嘿嘿嘿")// 不是Spark作業(yè)里的
rdd3.foreach(println) // 數(shù)據(jù)和"===============================" 交替打印
}
}
Collect
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo14collect {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("Collect算子演示")
val sc: SparkContext = new SparkContext(conf)
val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
val rdd1: RDD[Array[String]] = linesRDD.map((e: String) => {
e.split(",")
})
val rdd2: RDD[Student] = rdd1.map {
case Array(id: String, name: String, age: String, gender: String, clazz: String) =>
Student(id.toInt, name, age.toInt, gender, clazz)
}
//collect將rdd轉(zhuǎn)成合適的scala中的數(shù)據(jù)結(jié)構(gòu)
val stuArr: Array[Student] = rdd2.collect()
//foreach是scala中的foreach,不會產(chǎn)生作業(yè)執(zhí)行的
stuArr.foreach(println)
}
}
case class Student(id:Int,name:String,age:Int,gender:String,clazz:String)
算子應(yīng)用
// 求總分前十的學(xué)生的各科成績:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo15StudentTest1 {
def main(args: Array[String]): Unit = {
//求年級總分前10的學(xué)生各科分?jǐn)?shù)的詳細(xì)信息
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("MapValues算子演示")
val sc: SparkContext = new SparkContext(conf)
val linesRDD: RDD[String] = sc.textFile("spark/data/score.txt")
val idWithScoreRDD: RDD[(String, String, Int)] = linesRDD.map((line: String) => {
line.split(",") match {
case Array(id: String, subject_id: String, score: String) =>
(id, subject_id, score.toInt)
}
})
val array1: Array[String] = idWithScoreRDD
.map((t3: (String, String, Int)) => (t3._1, t3._3))
.reduceByKey(_ + _)
.sortBy((kv: (String, Int)) => -kv._2)
.take(10) // take 也是行動算子
.map(_._1)
idWithScoreRDD.filter((t3: (String, String, Int)) => {
val bool: Boolean = array1.contains(t3._1)
if(bool){
println("存在")
}
bool
}).foreach((t3: (String, String, Int)) => {
println("==========================")
println(t3)
})
}
}
緩存
緩存級別
cache
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
object Demo16cache {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("緩存演示")
val sc: SparkContext = new SparkContext(conf)
//===================================================================
val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
val studentsRDD: RDD[Student2] = linesRDD.map(_.split(","))
.map {
case Array(id: String, name: String, age: String, gender: String, clazz: String) =>
Student2(id, name, age.toInt, gender, clazz)
}
/**
* 緩存:
運行結(jié)束后 就消失了
* 緩存的目的是為了spark core作業(yè)執(zhí)行的時候,縮短rdd的執(zhí)行鏈,能夠更快的得到結(jié)果
* 緩存的目的是避免每一次job作業(yè)執(zhí)行的時候,都需要從第一個rdd算起
* 對重復(fù)使用RDD進行緩存
* cache 設(shè)置不了緩存級別
* persist 可以設(shè)置緩存級別
* 緩存的實現(xiàn)方式:
* 1、需要緩存的rdd調(diào)用cache函數(shù)
* 2、persist(StorageLevel.MEMORY_ONLY) 修改緩存級別
*
*/
studentsRDD.cache() //默認(rèn)將rdd緩存到內(nèi)存中,緩存級別為memory_only
// studentsRDD.persist(StorageLevel.MEMORY_AND_DISK) // 可以修改 這里改為加上磁盤(有時候內(nèi)存不夠的話)
// 需求1和2都重復(fù)使用了studentsRDD 可以放在一個地方 隨用隨拿 不用從第一個RDD開始運行
//需求1:求每個班級的人數(shù)
studentsRDD.groupBy(_.clazz).map(kv=>{
(kv._1,kv._2.size)
}).foreach(println)
//(理科二班,79)
//(文科三班,94)
//(理科四班,91)
//(理科一班,78)
//(文科五班,84)
//(文科一班,72)
//(文科四班,81)
//(理科六班,92)
//(理科三班,68)
//(文科六班,104)
//(理科五班,70)
//(文科二班,87)
//需求2:求每個年齡的人數(shù)
studentsRDD.groupBy(_.age)
.map(kv=>(kv._1,kv._2.size))
.foreach(println)
//(21,234)
//(22,271)
//(24,260)
//(23,235)
while(true){
}
}
}
case class Student2(id:String,name:String,age:Int,gender:String,clazz:String)
進入spark jobs 查看 DAG 在map階段就從cache里拿RDD了。
checkpoint
永久的保存數(shù)據(jù)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo17Checkpoint {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("緩存演示")
val sc: SparkContext = new SparkContext(conf)
//設(shè)置檢查點的存儲路徑
sc.setCheckpointDir("spark/data/checkpoint1")
//===================================================================
val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
val studentsRDD: RDD[Student2] = linesRDD.map(_.split(","))
.map {
case Array(id: String, name: String, age: String, gender: String, clazz: String) =>
Student2(id, name, age.toInt, gender, clazz)
}
/**
* 永久將執(zhí)行過程中RDD中流動的數(shù)據(jù)存儲到磁盤(hdfs)中
* checkpoint
*
* 需要設(shè)置checkpoint的路徑,統(tǒng)一設(shè)置的
*
* checkpoint也相當(dāng)于一個行動算子,觸發(fā)作業(yè)執(zhí)行
* 第二次DAG有向無環(huán)圖執(zhí)行的時候,直接從最后一個有檢查點的rdd開始向下執(zhí)行
*/
studentsRDD.checkpoint()// 必須得設(shè)置路徑 在SparkContext 設(shè)置
//需求1:求每個班級的人數(shù)
val rdd1: RDD[(String, Iterable[Student2])] = studentsRDD.groupBy(_.clazz)
val resRDD1: RDD[(String, Int)] = rdd1.map((kv: (String, Iterable[Student2])) => (kv._1, kv._2.size))
resRDD1.foreach(println)
//需求2:求每個年齡的人數(shù)
val rdd2: RDD[(Int, Iterable[Student2])] = studentsRDD.groupBy(_.age)
val resRDD2: RDD[(Int, Int)] = rdd2.map((kv: (Int, Iterable[Student2])) => (kv._1, kv._2.size))
resRDD2.foreach(println)
while (true) {
}
}
}
checkpoint和cache的區(qū)別?
cache是將一個復(fù)雜的RDD做緩存,將來執(zhí)行的時候,只是這個rdd會從緩存中取 數(shù)據(jù)量小checkpoint是永久將rdd數(shù)據(jù)持久化,將來執(zhí)行的時候,直接從檢查點的rdd往后執(zhí)行 數(shù)據(jù)量大 邏輯簡單
本地搭建Spark
下載spark-3.1.3-bin-hadoop3.2.tgz
(https://mirrors.huaweicloud.com/apache/spark/spark-3.1.3/)
上傳解壓:
tar -zxvf spark-3.1.3-bin-hadoop3.2.tgz
改名
mv spark-3.1.3-bin-hadoop3.2/ spark-3.1.3
更該所屬用戶所屬組
chown -R root:root spark-3.1.3/
添加環(huán)境變量
SPARK_HOME=/usr/local/soft/spark-3.1.3
export PATH=$SPARK_HOME/bin:$PATH
修改配置文件 conf
cp spark-env.sh.template spark-env.sh
增加配置
export SPARK_MASTER_IP=master
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=2
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=2g
export JAVA_HOME=/usr/local/soft/jdk1.8.0_171
#master相當(dāng)于RM worker相當(dāng)于NM
? 增加從節(jié)點配置
cp workers.template workers
# 增加
node1
node2
復(fù)制到其它節(jié)點
scp -r spark-3.1.3 node1:`pwd`
scp -r spark-3.1.3 node2:`pwd`
撰寫運行spark腳本
vim startspark.sh
#! /bin/bash
/usr/local/soft/spark-3.1.3/sbin/start-all.sh
給腳本賦予執(zhí)行權(quán)限
chmod +x startspark.sh
訪問spark ui
http://master:8080/
standalone
client模式
日志在本地輸出,不需要開啟hadoop一般用于上線前測試(bin/下執(zhí)行)
使用spark樣例 運行計算圓周率
/usr/local/soft/spark-3.1.3/examples/jars/spark-examples_2.12-3.1.3.jar
#提交spark任務(wù)
spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077 --executor-memory 512m --total-executor-cores 1 spark-examples_2.12-3.1.3.jar 10
# 10 是并行度 分區(qū)數(shù) 這里更大更精確
日志在本地顯示
cluster模式
上線使用,不會再本地打印日志 集群化運行
spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077 --executor-memory 512M --total-executor-cores 1 --deploy-mode cluster spark-examples_2.12-3.1.3.jar 100
spark-shell
spark 提供的一個交互式的命令行,可以直接寫代碼
編寫代碼打包上傳在standalone下運行
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo18Standalone {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setAppName("standalone集群運行")
val sc: SparkContext = new SparkContext(conf)
//統(tǒng)計單詞個數(shù)
val rdd1: RDD[String] = sc.parallelize(List("hive|java|hello|world", "hive|java|hadoop|world", "hive|spark|hello|hadoop"))
rdd1.flatMap(_.split("\\|"))
.map((_,1))
.reduceByKey(_+_)
.foreach(println)
/**
* standalone
* - client模式提交命令:
* spark-submit --class com.shujia.core.Demo18Standalone --master spark://master:7077 --executor-memory 512m --total-executor-cores 1 spark-1.0.jar 10
*
* - cluster模式提交命令:
* spark-submit --class com.shujia.core.Demo18Standalone --master spark://master:7077 --executor-memory 512M --total-executor-cores 1 --deploy-mode cluster spark-1.0.jar 10
*
*
*/
}
}
client模式運行結(jié)果
cluster模式運行:
將jar包發(fā)給node1 和node2中
scp spark-1.0.jar node1:/usr/local/soft/spark-3.1.3/jars/
scp spark-1.0.jar node2:/usr/local/soft/spark-3.1.3/jars/
mv spark-1.0.jar /usr/local/soft/spark-3.1.3/jars/
yarn
停止spark集群 在spark sbin目錄下執(zhí)行 ./stop-all.sh
spark整合yarn只需要在一個節(jié)點整合, 可以刪除node1 和node2中所有的spark 文件
增加hadoop 配置文件地址
vim spark-env.sh
#增加
export HADOOP_CONF_DIR=/usr/local/soft/hadoop-3.1.3/etc/hadoop
往yarn提交任務(wù)需要增加兩個配置 yarn-site.xml(/usr/local/soft/hadoop-2.7.6/etc/hadoop/yarn-site.xml)
hadoop/common/lib/*:/usr/local/soft/hadoop-3.1.3/share/hadoop/common/*:/usr/local/soft/hadoop-3.1.3/share/hadoop/hdfs:/usr/local/soft/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/usr/local/soft/hadoop-3.1.3/share/hadoop/hdfs/*:/usr/local/soft/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/usr/local/soft/hadoop-3.1.3/share/hadoop/mapreduce/*:/usr/local/soft/hadoop-3.1.3/share/hadoop/yarn:/usr/local/soft/hadoop-3.1.3/share/hadoop/yarn/lib/*:/usr/local/soft/hadoop-3.1.3/share/hadoop/yarn/*
同步到其他節(jié)點,重啟yarn
scp -r yarn-site.xml node1:`pwd`
scp -r yarn-site.xml node2:`pwd`
spark on yarn client模式 日志在本地輸出,一班用于上線前測試
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client spark-examples_2.12-3.1.3.jar 100
spark on yarn cluster模式 上線使用,不會再本地打印日志 減少io
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster spark-examples_2.12-3.1.3.jar 100
yarn logs -applicationId application_1720850173901_0001 # 查看日志
案例:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//讀取hdfs上的學(xué)生數(shù)據(jù),統(tǒng)計每個班級的人數(shù),寫回到hdfs上
object Demo19YarnCluster {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("spark yarn cluster")
val sc: SparkContext = new SparkContext(conf)
//如果是打包到集群的話,這里的路徑就是hdfs路徑
//如果是local的話,這個路徑就是我們windows的路徑
val linesRDD: RDD[String] = sc.textFile("/bigdata30/students.csv")
//coalesce函數(shù),repartition函數(shù)可以修改分區(qū)
// val linesRDD2: RDD[String] = linesRDD.coalesce(1)
// linesRDD.repartition(1)
println("=============================================================================================================")
println(s"========================== linesRDD的分區(qū)數(shù)是:${linesRDD.getNumPartitions} ===================================")
println("=============================================================================================================")
val clazzKVRDD: RDD[(String, Int)] = linesRDD.map((line: String) => {
line.split(",") match {
case Array(_, _, _, _, clazz: String) =>
(clazz, 1) // 班級和1構(gòu)成的鍵值對
}
})
val resultRDD: RDD[(String, Int)] = clazzKVRDD.reduceByKey(_ + _)
val resultRDD2: RDD[String] = resultRDD.map((t2: (String, Int)) => s"${t2._1},${t2._2}")
println("=============================================================================================================")
println(s"========================== resultRDD2的分區(qū)數(shù)是:${resultRDD2.getNumPartitions} ===================================")
println("=============================================================================================================")
//行動算子,觸發(fā)作業(yè)執(zhí)行
resultRDD2.saveAsTextFile("/bigdata30/sparkout1")
}
}
打包上傳 在jar包目錄下運行
spark-submit --class com.shujia.core.Demo19YarnCluster --master yarn --deploy-mode cluster spark-1.0.jar
在HDFS上可以看到結(jié)果
術(shù)語解釋
Application:基于Spark的應(yīng)用程序,包含了driver程序和 集群上的executor
DriverProgram:運行main函數(shù)并且新建SparkContext的程序
ClusterManager:在集群上獲取資源的外部服務(wù)(例如 standalone,Mesos,Yarn )
WorkerNode:集群中任何可以運行應(yīng)用用代碼的節(jié)點
Executor:是在一個workernode上為某應(yīng)用用啟動的一個進程,該進程負(fù)責(zé)運行任務(wù),并且負(fù)責(zé)將數(shù)據(jù)存在內(nèi)存或者磁盤上。每個應(yīng)用用都有各自自獨立的executors
Task:被送到某個executor上的執(zhí)行單元 線程
Job:包含很多任務(wù)的并行計算的task,可以看做和Spark的action對應(yīng),每個action都會觸發(fā)一個job任務(wù)
Stage:一個Job會被拆分很多組任務(wù),每組任務(wù)被稱為Stage(就像MapReduce分map任務(wù)和reduce任務(wù)一樣)
任務(wù)調(diào)度
包含 重試機制 推測執(zhí)行機制
DAG Scheduler:
基于Stage構(gòu)建DAG,決定每個任務(wù)的最佳位置
記錄哪個RDD或者Stage輸出被物化
將taskset傳給底層調(diào)度器TaskScheduler
重新提交shuffle輸出丟失的stage
Task Scheduler:
提交taskset(一組并行task)到集群運行并匯報結(jié)果
出現(xiàn)shuffle輸出lost要報告fetchfailed錯誤
碰到straggle任務(wù)需要放到別的節(jié)點上重試
為每一一個TaskSet維護一一個TaskSetManager(追蹤本地性及錯誤信息)
累加器
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
object Demo21Accumulator {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("累加器案例")
val sc: SparkContext = new SparkContext(conf)
val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
// var num = 0
/**
* 累加器
* 必要有行動算子觸發(fā)作業(yè)執(zhí)行
* 1.因為累加器的執(zhí)行是在RDD中執(zhí)行的,而RDD是在Executor中執(zhí)行的,而要想在Executor中執(zhí)行就得有一個action算子觸發(fā)任務(wù)調(diào)度
*
*/
// linesRDD.foreach((e: String) => {
// num += 1
// println("-----------------------")
// println(num) // 可以到1000
// })
// println(s"num的值為:$num") // 0
//使用累加器
// 創(chuàng)建累加器變量
val c1: LongAccumulator = sc.longAccumulator("c1")
linesRDD.foreach((e:String)=>{
c1.add(1)
})
println(s"累加之后的值為:${c1.value}")
//使用累加器
// 使用map時 必須加上行動算子觸發(fā)作業(yè)執(zhí)行
// val c1: LongAccumulator = sc.longAccumulator("c1")
// linesRDD.map((e: String) => {
// c1.add(1)
// }).collect()
// println(s"累加之后的值為:${c1.value}")
}
}
Spark RDD 注意事項
/**
* 寫spark core程序的注意事項
* 1、RDD中無法嵌套使用RDD
* 2、RDD中無法使用SparkContext
*/
val studentLinesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
val scoreLinesRDD: RDD[String] = sc.textFile("spark/data/score.txt")
val rdd1: RDD[RDD[(String, String)]] = studentLinesRDD.map((line1: String) => {
scoreLinesRDD.map((line2: String) => {
val s1: String = line1.split(",").mkString("|")
val s2: String = line2.split(",").mkString("|")
(s1, s2)
})
})
rdd1.foreach(println) // 報錯
// val studentLinesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
// val scoreLinesRDD: RDD[String] = sc.textFile("spark/data/score.txt")
//
// val rdd1: RDD[RDD[(String, String)]] = studentLinesRDD.map((line1: String) => {
// sc.textFile("spark/data/score.txt").map((line2: String) => {
// val s1: String = line1.split(",").mkString("|")
// val s2: String = line2.split(",").mkString("|")
// (s1, s2)
// })
// })
廣播變量
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
import scala.io.{BufferedSource, Source}
/**
* 廣播大變量
*/
object Demo22Broadcast {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("廣播變量")
val sc: SparkContext = new SparkContext(conf)
val bs: List[String] = Source.fromFile("spark/data/students.txt").getLines().toList
//map1變量在Driver端
//會隨著task任務(wù)一并發(fā)送到executor中執(zhí)行,后期隨著map1的數(shù)據(jù)量變大
//也就意味著,每次發(fā)送任務(wù),附帶的數(shù)據(jù)量就會很大,無形之中,降低的執(zhí)行速度
val map1: mutable.Map[String, String] = new mutable.HashMap[String, String]()
for (elem <- bs) {
val array1: Array[String] = elem.split(",")
val id: String = array1(0)
val name: String = array1(1)
val age: String = array1(2)
val gender: String = array1(3)
val clazz: String = array1(4)
map1.put(id, name + "," + age + "," + gender + "," + clazz)
}
/**
* 廣播變量
* 使用SparkContext中的一個功能,將Driver端的變量廣播到executor執(zhí)行的節(jié)點上的blockManager中
*/
val bc: Broadcast[mutable.Map[String, String]] = sc.broadcast(map1)
val scoreRDD: RDD[String] = sc.textFile("spark/data/score.txt")
//未使用廣播變量
// val resRDD: RDD[(String, String, String)] = scoreRDD.map((line: String) => {
// val array1: Array[String] = line.split(",")
// val id: String = array1(0)
// // 通過map1的變量,通過鍵獲取值
// val info: String = map1.getOrElse(id, "查無此人") // map1相當(dāng)于一個副本與task任務(wù)一起發(fā)送到Executor中執(zhí)行
// val score: String = array1(2)
// (id, info, score)
// })
//使用廣播變量
//以廣播變量的形式,發(fā)送到Executor中的blockManager中
// 只發(fā)送計算邏輯
val resRDD: RDD[(String, String, String)] = scoreRDD.map((line: String) => {
val array1: Array[String] = line.split(",")
val id: String = array1(0)
//通過廣播過來的大變量,進行關(guān)聯(lián)數(shù)據(jù) .value 方法取出變量
val map2: mutable.Map[String, String] = bc.value
val info: String = map2.getOrElse(id, "查無此人")
val score: String = array1(2)
(id, info, score)
})
resRDD.foreach(println)
}
}
BlockManager:
Spark Sql
Spark SQL是Spark的核心組件之一
與RDD類似,DataFrame也是一個分布式數(shù)據(jù)容器,是spark sql的重要的數(shù)據(jù)結(jié)構(gòu)
初識spark sql: WordCount
數(shù)據(jù)準(zhǔn)備
spark sql處理數(shù)據(jù)的步驟
1、讀取數(shù)據(jù)源2、將讀取到的DF注冊成一個臨時視圖3、使用sparkSession的sql函數(shù),編寫sql語句操作臨時視圖,返回的依舊是一個DataFrame4、將結(jié)果寫出到hdfs上
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
object Demo1WordCount {
def main(args: Array[String]): Unit = {
// spark sql的環(huán)境
val ss: SparkSession = SparkSession.builder()
.master("local")
.appName("sql語法")
.getOrCreate()
// spark sql是spark core的上層api,如果要想使用rdd的編程
// 可以直接通過sparkSession獲取SparkContext對象
// val context: SparkContext = ss.sparkContext
// 讀文件
//spark sql的核心數(shù)據(jù)類型是DataFrame
val df1: DataFrame = ss.read
.format("csv") // 讀取csv格式的文件,但是實際上這種做法可以讀取任意分隔符的文本文件
.option("sep", "\n") //指定讀取數(shù)據(jù)的列與列之間的分隔符
.schema("line String") // 指定表的列字段 包括列名和列數(shù)據(jù)類型
.load("spark/data/wcs/words.txt")
// 查看dataframe的數(shù)據(jù)內(nèi)容
// df1.show()
//查看表結(jié)構(gòu)
// df1.printSchema()
/**
* sql語句是無法直接作用在DataFrame上面的
* 需要提前將要使用sql分析的DataFrame注冊成一張表(臨時視圖)
*/
//老版本的做法將df注冊成一張表
// df1.registerTempTable("wcs")
df1.createOrReplaceTempView("wcs")
/**
* 編寫sql語句作用在表上
* sql語法是完全兼容hive語法
*/
val df2: DataFrame = ss.sql(
"""
|select
|t1.word,
|count(1) as counts
|from(
|select
|explode(split(line,'\\|')) as word
|from wcs) t1 group by t1.word
|""".stripMargin)
df2.show() // show默認(rèn)結(jié)果只展示20條數(shù)據(jù)
//通過觀察源碼發(fā)現(xiàn),DataFrame底層數(shù)據(jù)類型其實就是封裝了DataSet的數(shù)據(jù)類型
val resDS: Dataset[Row] = df2.repartition(1) // 設(shè)置分區(qū)為1 合并分區(qū)
//將計算后的DataFrame保存到本地磁盤文件中
resDS.write
.format("csv") //csv文件默認(rèn)的分隔符是英文逗號
.option("sep","\t") // 設(shè)置分隔符
.mode(SaveMode.Overwrite) // 如果想每次覆蓋之前的執(zhí)行結(jié)果的話,可以在寫文件的同時指定寫入模式,使用模式枚舉類
.save("spark/data/sqlOut1") // 路徑是一個文件夾
}
}
DSL WordCount
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object Demo2DSLWordCount {
def main(args: Array[String]): Unit = {
//創(chuàng)建SparkSession對象
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("DSL語法風(fēng)格編寫spark sql")
.getOrCreate()
val df1: DataFrame = sparkSession.read
.format("csv")
.schema("line STRING")
.option("sep", "\n")
.load("spark/data/wcs/words.txt")
/**
* 如果要想使用DSL語法編寫spark sql的話,需要導(dǎo)入兩個隱式轉(zhuǎn)換
*/
//將sql中的函數(shù),封裝成spark程序中的一個個的函數(shù)直接調(diào)用,以傳參的方式調(diào)用
import org.apache.spark.sql.functions._
//主要作用是,將來可以在調(diào)用的函數(shù)中,使用$函數(shù),將列名字符串類型轉(zhuǎn)成一個ColumnName類型
//而ColumnName是繼承自Column類的
import sparkSession.implicits._
//老版本聚合操作
// df1.select(explode(split($"line","\\|")) as "word")
// .groupBy($"word")
// .count().show()
//新版本聚合操作
val resDF: DataFrame = df1.select(explode(split($"line", "\\|")) as "word")
.groupBy($"word")
.agg(count($"word") as "counts")
resDF.repartition(1)
.write
.format("csv")
.option("sep","\t")
.mode(SaveMode.Overwrite)
.save("spark/data/sqlOut2")
}
}
DSl語法
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object Demo3DSLApi {
def main(args: Array[String]): Unit = {
//創(chuàng)建SparkSession對象
val sparkSession: SparkSession = SparkSession.builder()
.config("spark.sql.shuffle.partitions","1") // 設(shè)置分區(qū)數(shù) 全局設(shè)置
.master("local")
.appName("DSL語法風(fēng)格編寫spark sql")
.getOrCreate()
import org.apache.spark.sql.functions._
import sparkSession.implicits._
// 讀取json文件 轉(zhuǎn)成DF
// 讀取json數(shù)據(jù)的時候,是不需要指定表結(jié)構(gòu),可以自動根據(jù)json的鍵值來構(gòu)建DataFrame
// sparkSession.read
// .format("json")
// .load("spark/data/students.json")
// 新版
val df1: DataFrame = sparkSession.read.json("spark/data/students.json")
// df1.show(100) // 可以指定讀取數(shù)據(jù)行數(shù)
// 一列值過長時,不能完全顯示 傳入第二個參數(shù),使其更詳細(xì)
// df1.show(100,truncate = false)
/*
* DSL 語法的函數(shù)
* */
/*
*select
*類似于純sql語法中的select關(guān)鍵字,傳入要查詢的列
*/
//LIKE:select name,clazz from xxx;
// df1.select("name","clazz").show()
// another type
// df1.select($"name", $"age").show()
//查詢每個學(xué)生的姓名,原本的年齡,年齡+1
/**
* 與select功能差不多的查詢函數(shù)
* 如果要以傳字符串的形式給到select的話,并且還想對列進行表達(dá)式處理的話,可以使用selectExpr函數(shù)
*/
// df1.selectExpr("name","age","age+1 as new_age").show()
//如果想要使用select函數(shù)查詢的時候?qū)α凶霾僮鞯脑挘梢允褂?函數(shù)將列變成一個對象
// df1.select($"name", $"age", $"age" + 1 as "new_age").show()
/*
* where
* 過濾
* */
// df1.where("gender='男'").show()
// df1.where("gender='男' and substring(clazz,0,2)='理科'").show()// 不如sql語句
//建議使用隱式轉(zhuǎn)換中的功能進行處理過濾 === 三個等號 類似于sql中的=
// df1.where($"gender"==="男" and substring($"clazz",0,2)==="理科").show()
// 過濾出女生 理科 不等于男生
// =!= : 類似于sql中的!=或者<> 不等于某個值
// df1.where($"gender"=!="男" and substring($"clazz",0,2)==="理科").show()
/*
* groupBy
* 非分組字段是無法出現(xiàn)在select查詢語句中的
* */
//查詢每個班級的人數(shù)
// df1.groupBy("clazz")
// .agg(count("clazz") as "counts")
// .show()
/*
* orderBy
* */
// df1.groupBy("clazz")
// .agg(count("clazz") as "counts")
// .orderBy($"counts".desc) // 降序
// .show(3)
/*
* join
* */
val df2: DataFrame = sparkSession.read
.format("csv")
.option("sep", ",")
.schema("id STRING,subject_id STRING,score INT")
.load("spark/data/score.txt")
// df1與df2關(guān)聯(lián)
//關(guān)聯(lián)字段名不一樣的情況
// df2.join(df1,$"id"===$"sid","inner")
// .select("id","name","age","gender","clazz","subject_id","score")
// .show(10)
// 一樣的情況
// df2.join(df1,"id")
// .select("id","name","age","gender","clazz","subject_id","score")
// .show(10)
//如果關(guān)聯(lián)的字段名一樣且想使用其他連接方式的話,可以將字段名字用Seq()傳入,同時可以傳連接方式
// df2.join(df1, Seq("id"),"left")
// .select("id","name","age","gender","clazz","subject_id","score")
// .show(10)
/*
* 開窗
*無論是在純sql中還是在DSL語法中,開窗是不會改變原表條數(shù)
* */
//計算每個班級總分前3的學(xué)生
//純spark sql的方式實現(xiàn)
// df1.createOrReplaceTempView("students")
// df2.createOrReplaceTempView("scores")
// sparkSession.sql(
// """
// |select
// |*
// |from
// |(
// |select t1.id,
// |t2.name,
// |t2.clazz,
// |t1.sumScore,
// |row_number() over(partition by t2.clazz order by t1.sumScore desc) as rn
// |from
// |(
// | select id,
// | sum(score) as sumScore
// | from
// | scores
// | group by id) t1
// |join
// | students t2
// |on(t1.id=t2.id)) tt1 where tt1.rn<=3
// |""".stripMargin).show()
// DSl實現(xiàn)
df2.groupBy("id")
.agg(sum("score") as "sumScore") // 計算總分
.join(df1,"id")
.select($"id",$"name",$"clazz",$"sumScore",row_number() over Window.partitionBy("clazz").orderBy($"sumScore".desc)as "rn") //開窗 排序
.where($"rn"<=3)
//.repartition(1) 單獨設(shè)置分區(qū)數(shù)
.write
.format("csv")
.mode(SaveMode.Overwrite)
.save("spark/data/sqlOut3")
}
}
Date Source Api
import org.apache.spark.sql.SparkSession
object Demo4SourceAPI {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("data source api")
.config("spark.sql.shuffle.partitions", "1")
.getOrCreate()
/**
* 導(dǎo)入隱式轉(zhuǎn)換
*/
/**
* ================================讀寫csv格式的數(shù)據(jù)=========================
*/
//如果是直接調(diào)用csv函數(shù)讀取數(shù)據(jù)的話,無法做表結(jié)構(gòu)的設(shè)置
// val df1: DataFrame = sparkSession.read
// .csv("spark/data/test1.csv")
// //使用format的形式讀取數(shù)據(jù)的同時可以設(shè)置表結(jié)構(gòu)
// val df2: DataFrame = sparkSession.read
// .format("csv")
// .schema("id STRING,name STRING,age INT")
// .load("spark/data/test1.csv")
// df2.show()
// 讀取學(xué)生數(shù)據(jù)
// val df1: DataFrame = sparkSession.read
// .format("csv")
// .schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
// .option("sep", ",")
// .load("spark/data/students.txt")
//
// df1.createOrReplaceTempView("students")
//
// val resDF1: DataFrame = sparkSession.sql(
// """
// |select
// |clazz,
// |count(1) as counts
// |from students
// |group by clazz
// |""".stripMargin)
// //以csv格式寫出到磁盤文件夾中
// resDF1.write
// .format("csv")
.option("sep",",")
// .mode(SaveMode.Overwrite)
// .save("spark/data/sqlout4")
/**
* ===================================讀寫json格式的數(shù)據(jù)========================
*/
// val df1: DataFrame = sparkSession.read
// .json("spark/data/students.json")
// 寫數(shù)據(jù)
// df1.groupBy("age")
// .agg(count("age") as "counts")
// .write
// .json("spark/data/sqlout5")
/**
* ================================讀寫parquet格式的數(shù)據(jù)=================
*
* parquet格式的文件存儲,是由【信息熵】決定的
*/
// val df1: DataFrame = sparkSession.read
// .json("spark/data/students2.json")
//
// //以parquet格式寫出去
// df1.write
// .parquet("spark/data/sqlout7")
//讀取parquet格式的數(shù)據(jù)
// val df2: DataFrame = sparkSession.read
// .parquet("spark/data/sqlout7/part-00000-23f5482d-74d5-4569-9bf4-ea0ec91e86dd-c000.snappy.parquet")
// df2.show()
/**
* ======================================讀寫orc格式的數(shù)據(jù)=====================
*文件被壓縮的更小 讀寫速度最快
*/
// val df1: DataFrame = sparkSession.read
// .json("spark/data/students2.json")
// df1.write
// .orc("spark/data/sqlout8")
//
// sparkSession.read
// .orc("spark/data/sqlout8/part-00000-a33e356c-fd1f-4a5e-a87f-1d5b28f6008b-c000.snappy.orc")
// .show()
/**
* ==================================讀寫jdbc格式的數(shù)據(jù)===================
* 需要導(dǎo)入mysql驅(qū)動包
*/
sparkSession.read
.format("jdbc")
.option("url", "jdbc:mysql://master:3306/studentdb?useUnicode=true&characterEncoding=UTF-8&useSSL=false")
.option("dbtable", "studentdb.jd_goods")
.option("user", "root")
.option("password", "123456")
.load()
.show(10,truncate = false)
}
}
RDD與DataFrame互相轉(zhuǎn)換
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object Demo5RDD2DataFrame {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("rdd與df之間的轉(zhuǎn)換")
.config("spark.sql.shuffle.partitions", "1")
.getOrCreate()
//通過SparkSession獲取sparkContext對象
val sparkContext: SparkContext = sparkSession.sparkContext
//作用1:使用$函數(shù)
//作用2:可以在不同的數(shù)據(jù)結(jié)構(gòu)之間轉(zhuǎn)換
import sparkSession.implicits._
/**
* spark core的核心數(shù)據(jù)結(jié)構(gòu)是:RDD
* spark sql的核心數(shù)據(jù)結(jié)構(gòu)是DataFrame
*/
// RDD->DataFrame .toDF
val linesRDD: RDD[String] = sparkContext.textFile("spark/data/students.txt")
val stuRDD: RDD[(String, String, String, String, String)] = linesRDD.map((line: String) => {
line.split(",") match {
case Array(id: String, name: String, age: String, gender: String, clazz: String) =>
(id, name, age, gender, clazz)
}
})
val resRDD1: RDD[(String, Int)] = stuRDD.groupBy(_._5)
.map((kv: (String, Iterable[(String, String, String, String, String)])) => {
(kv._1, kv._2.size)
})
val df1: DataFrame = resRDD1.toDF // 轉(zhuǎn)成DF
val df2: DataFrame = df1.select($"_1" as "clazz", $"_2" as "counts")
df2.printSchema()
// DataFrame->RDD .rdd
val resRDD2: RDD[Row] = df2.rdd
// resRDD2.map((row:Row)=>{
// val clazz: String = row.getAs[String]("clazz")
// val counts: Integer = row.getAs[Integer]("counts")
// s"班級:$clazz, 人數(shù):$counts"
// }).foreach(println)
// 模式匹配
resRDD2.map {
case Row(clazz:String, counts:Integer)=>
s"班級:$clazz, 人數(shù):$counts"
}.foreach(println)
}
}
開窗函數(shù)
開窗:over
聚合開窗函數(shù):sum count lag(取上一條) lead(取后一條)排序開窗函數(shù):row_number rank dense_rank
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
* 練習(xí)開窗的題目: DSL語法去做
* 統(tǒng)計總分年級排名前十學(xué)生各科的分?jǐn)?shù)
* 統(tǒng)計每科都及格的學(xué)生
* 統(tǒng)計總分大于年級平均分的學(xué)生
* 統(tǒng)計每個班級的每個名次之間的分?jǐn)?shù)差
*/
object Demo6WindowFun {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("rdd與df之間的轉(zhuǎn)換")
.config("spark.sql.shuffle.partitions", "1")
.getOrCreate()
/**
* 導(dǎo)入隱式轉(zhuǎn)換你
*/
import org.apache.spark.sql.functions._
import sparkSession.implicits._
/**
* 讀取三個數(shù)據(jù)文件
*/
val studentsDF: DataFrame = sparkSession.read
.format("csv")
.schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
.load("spark/data/students.txt")
// studentsDF.show()
val scoresDF: DataFrame = sparkSession.read
.format("csv")
.schema("id STRING,subject_id STRING,score INT")
.load("spark/data/score.txt")
// scoresDF.show()
val subjectsDF: DataFrame = sparkSession.read
.format("csv")
.schema("subject_id STRING,subject_name STRING,subject_score INT")
.load("spark/data/subject.txt")
// subjectsDF.show()
//統(tǒng)計總分年級排名前十學(xué)生各科的分?jǐn)?shù)
val resDS1: Dataset[Row] = scoresDF
.join(studentsDF, "id")
.withColumn("sumScore", sum("score") over Window.partitionBy("id"))
// dense_rank 不跳過排名 并列
.withColumn("rn", dense_rank() over Window.partitionBy(substring($"clazz", 0, 2)).orderBy($"sumScore".desc))
.where($"rn" <= 10)
.limit(120)
//統(tǒng)計每科都及格的學(xué)生
val resDS2: Dataset[Row] = scoresDF
.join(subjectsDF, "subject_id")
.where($"score" >= $"subject_score" * 0.6)
.withColumn("jigeCount", count(expr("1")) over Window.partitionBy($"id"))
.where($"jigeCount" === 6)
//統(tǒng)計總分大于年級平均分的學(xué)生
val resDS3: Dataset[Row] = scoresDF
.join(studentsDF, "id")
.withColumn("sumScore", sum($"score") over Window.partitionBy($"id"))
.withColumn("avgScore", avg($"sumScore") over Window.partitionBy(substring($"clazz", 0, 2)))
.where($"sumScore" > $"avgScore")
//統(tǒng)計每個班級的每個名次之間的分?jǐn)?shù)差
val resDF4: DataFrame = scoresDF
.join(studentsDF, "id")
.groupBy("id", "clazz")
.agg(sum("score") as "sumScore")
.withColumn("rn", row_number() over Window.partitionBy($"clazz").orderBy($"sumScore".desc))
.withColumn("beforeSumScore", lag($"sumScore", 1, 750) over Window.partitionBy($"clazz").orderBy($"sumScore".desc))
.withColumn("cha", $"beforeSumScore" - $"sumScore")
}
}
DSL練習(xí)
公司代碼,年度,1月-12月的收入金額
burk,year,tsl01,tsl02,tsl03,tsl04,tsl05,tsl06,tsl07,tsl08,tsl09,tsl10,tsl11,tsl12
853101,2010,100200,25002,19440,20550,14990,17227,40990,28778,19088,29889,10990,20990
853101,2011,19446,20556,14996,17233,40996,28784,19094,28779,19089,29890,10991,20991
853101,2012,19447,20557,14997,17234,20560,15000,17237,28780,19090,29891,10992,20992
853101,2013,20560,15000,17237,41000,17234,20560,15000,17237,41000,29892,10993,20993
853101,2014,19449,20559,14999,17236,41000,28788,28786,19096,29897,41000,28788,20994
853101,2015,100205,25007,19445,20555,17236,40999,28787,19097,29898,29894,10995,20995
853101,2016,100206,25008,19446,20556,17237,41000,28788,19098,29899,29895,10996,20996
853101,2017,100207,25009,17234,20560,15000,17237,41000,15000,17237,41000,28788,20997
853101,2018,100208,25010,41000,28788,28786,19096,29897,28786,19096,29897,10998,20998
853101,2019,100209,25011,17236,40999,28787,19097,29898,28787,19097,29898,10999,20999
846271,2010,100210,25012,17237,41000,28788,19098,29899,28788,19098,29899,11000,21000
846271,2011,100211,25013,19451,20561,15001,17238,41001,28789,19099,29900,11001,21001
846271,2012,100212,100213,20190,6484,46495,86506,126518,166529,206540,246551,286562,326573
846271,2013,100213,100214,21297,5008,44466,83924,123382,162839,202297,241755,281213,320671
846271,2014,100214,100215,22405,3531,42436,81341,120245,159150,198055,236959,275864,314769
846271,2015,100215,100216,23512,2055,19096,29897,28786,19096,29897,41000,29892,308866
846271,2016,100216,100217,24620,579,38377,76175,28788,28786,19096,29897,41000,302964
846271,2017,100217,100218,25727,898,36347,73592,40999,28787,19097,29898,29894,297062
846271,2018,100218,100219,26835,2374,34318,71009,41000,28788,19098,29899,29895,291159
846271,2019,100219,100220,27942,3850,32288,68427,17237,41000,15000,17237,41000,285257
1、統(tǒng)計每個公司每年按月累計收入 行轉(zhuǎn)列 --> sum窗口函數(shù)
輸出結(jié)果
公司代碼,年度,月份,當(dāng)月收入,累計收入
2、統(tǒng)計每個公司當(dāng)月比上年同期增長率 行轉(zhuǎn)列 --> lag窗口函數(shù)
公司代碼,年度,月度,增長率(當(dāng)月收入/上年當(dāng)月收入 - 1)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
object Demo7Burks {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("練習(xí)1需求")
.config("spark.sql.shuffle.partitions", "1")
.getOrCreate()
//導(dǎo)入隱式轉(zhuǎn)換
import org.apache.spark.sql.functions._
import sparkSession.implicits._
// 加載數(shù)據(jù)
val burksDF: DataFrame = sparkSession.read
.format("csv")
.schema("burk STRING,year STRING" +
",tsl01 DOUBLE,tsl02 DOUBLE,tsl03 DOUBLE" +
",tsl04 DOUBLE,tsl05 DOUBLE,tsl06 DOUBLE" +
",tsl07 DOUBLE,tsl08 DOUBLE,tsl09 DOUBLE" +
",tsl10 DOUBLE,tsl11 DOUBLE,tsl12 DOUBLE")
.load("spark/data/burks.txt")
/**
* 1、統(tǒng)計每個公司每年按月累計收入 行轉(zhuǎn)列 --> sum窗口函數(shù)
*
* 輸出結(jié)果
* 公司代碼,年度,月份,當(dāng)月收入,累計收入
*/
// 純sql的方式實現(xiàn)
burksDF.createOrReplaceTempView("burks")
val resDF1: DataFrame = sparkSession.sql(
"""
|select
|t1.burk as burk,
|t1.year as year,
|t1.month as month,
|t1.tsl as tsl,
|sum(t1.tsl) over(partition by burk,year order by month) as leiji
|from
|(select
| burk,
| year,
| month,
| tsl
|from
| burks
| lateral view explode(map(1,tsl01,2,tsl02,3,tsl03,4,tsl04,5,tsl05,6,tsl06,7,tsl07,8,tsl08,9,tsl09,10,tsl10,11,tsl11,12,tsl12)) T as month,tsl
| ) t1
|""".stripMargin)
// DSL方法實現(xiàn)
val m: Column = map(
expr("1"), $"tsl01",
expr("2"), $"tsl02",
expr("3"), $"tsl03",
expr("4"), $"tsl04",
expr("5"), $"tsl05",
expr("6"), $"tsl06",
expr("7"), $"tsl07",
expr("8"), $"tsl08",
expr("9"), $"tsl09",
expr("10"), $"tsl10",
expr("11"), $"tsl11",
expr("12"), $"tsl12"
)
// DSL語法方式實現(xiàn)
// burksDF.select($"burk",$"year",explode(m) as Array("month","tsl"))
// .withColumn("leiji",sum($"tsl") over Window.partitionBy($"burk",$"year").orderBy($"month"))
// .show()
/**
* 2、統(tǒng)計每個公司當(dāng)月比上年同期增長率 行轉(zhuǎn)列 --> lag窗口函數(shù)
* 公司代碼,年度,月度,增長率(當(dāng)月收入/上年當(dāng)月收入 - 1)
*
* 853101 2010 1 10000
* 853101 2011 1 11000 10000
*/
val resDF2: DataFrame = burksDF.select($"burk", $"year", explode(m) as Array("month", "tsl"))
.withColumn("beforeTsl", lag($"tsl", 1, 0.0) over Window.partitionBy($"burk", $"month").orderBy($"year"))
.withColumn("p", round(($"tsl" / $"beforeTsl" - 1) * 100, 8).cast("string"))
.withColumn("new_p", when($"p".isNotNull, $"p").otherwise("該年的當(dāng)月是第一次計數(shù)"))
.select($"burk", $"year", $"month", $"tsl", $"new_p")
}
}
集群運行 Spark sql
編寫一個簡單代碼
package com.shujia.sql
import org.apache.spark.sql.{DataFrame, SparkSession}
object Demo8SubmitYarn {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
// .master("local")
.appName("提交到y(tǒng)arn 計算每個班級的人數(shù)")
//參數(shù)設(shè)置的優(yōu)先級:代碼優(yōu)先級 > 命令優(yōu)先級 > 配置文件優(yōu)先級
.config("spark.sql.shuffle.partitions", "1")
.getOrCreate()
import org.apache.spark.sql.functions._
import sparkSession.implicits._
val df1: DataFrame = sparkSession.read
.format("csv")
.schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
.load(args(0)) // hdfs上的路徑 給參數(shù)
val df2: DataFrame = df1.groupBy($"clazz")
.agg(count($"id") as "counts")
df2.show()
df2.write
.csv(args(1))// 帶傳參數(shù)
}
}
打包上傳 上傳數(shù)據(jù)
運行
spark-submit --master yarn --deploy-mode client --class com.shujia.sql.Demo8SubmitYarn --conf spark.sql.shuffle.partitions=1 spark-1.0.jar (數(shù)據(jù)輸入路徑)(輸出路徑)
注意:
在代碼中,我們設(shè)置了分區(qū)數(shù)為1,我們在命令中設(shè)置分區(qū)數(shù)100 看看效果
spark-submit --master yarn --deploy-mode client --class com.shujia.sql.Demo8SubmitYarn --conf spark.sql.shuffle.partitions=100 spark-1.0.jar(數(shù)據(jù)輸入路徑)(輸出路徑)
運行發(fā)現(xiàn) 還是一個分區(qū)。
結(jié)論:參數(shù)設(shè)置的優(yōu)先級:代碼優(yōu)先級 > 命令優(yōu)先級 > 配置文件優(yōu)先級
spark shell (repl) 里面使用sqlContext 測試使用,簡單任務(wù)使用
spark-shell --master yarn --deploy-mode client
可以在這里面編寫代碼
字符串拼接
import org.apache.spark.sql.{DataFrame, SparkSession}
object Demo9Test {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("提交到y(tǒng)arn 計算每個班級的人數(shù)")
//參數(shù)設(shè)置的優(yōu)先級:代碼優(yōu)先級 > 命令優(yōu)先級 > 配置文件優(yōu)先級
.config("spark.sql.shuffle.partitions", "1")
.getOrCreate()
import org.apache.spark.sql.functions._
import sparkSession.implicits._
val df1: DataFrame = sparkSession.read
.format("csv")
.schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
.option("sep", ",")
.load("spark/data/students.txt")
// 字符串拼接
df1.select($"name", concat(expr("'姓名: '"),$"name") as "new_str").show()
df1.groupBy($"clazz")
.agg(
count(expr("1")) as "counts",
avg($"age") as "avgAge"
).show()
}
}
spark-sql
進入命令行,和hive的命令行一樣,直接寫sql,默認(rèn)去hive讀數(shù)據(jù)
spark-sql --master yarn --deploy-mode client
spark整合hive
在spark sql中使用hive的元數(shù)據(jù)
spark sql是使用spark進行計算的,hive使用MR進行計算的
1、在hive的hive-site.xml修改一行配置,增加了這一行配置之后,以后在使用hive之前都需要先啟動元數(shù)據(jù)服務(wù)
cd /usr/local/soft/hive-1.2.1/conf/
2、啟動hive元數(shù)據(jù)服務(wù), 將hvie的元數(shù)據(jù)暴露給第三方使用
nohup hive --service metastore >> metastore.log 2>&1 &
3、將hive-site.xml 復(fù)制到spark conf目錄下
cp hive-site.xml /usr/local/soft/spark-3.1.3/conf/
4、 將mysql 驅(qū)動包復(fù)制到spark jars目錄下
cd /usr/local/soft/hive-3.1.2/lib
cp mysql-connector-java-8.0.29.jar /usr/local/soft/spark-3.1.3/jars/
5、整合好之后在spark-sql 里面就可以使用hive的表了
# 啟動hive元數(shù)據(jù)
# 模式是local模式
spark-sql -conf spark.sql.shuffle.partitions=2
# 使用yarn-client模式
spark-sql --master yarn-client --conf spark.sql.shuffle.partitions=1
#在spark-sql中設(shè)置運行參數(shù)
set spark.sql.shuffle.partitions=2;
# 執(zhí)行一些sql...
spark-sql -e
-- 執(zhí)行一條sql語句,執(zhí)行完,自動退出
spark-sql -e "select * from student"
spark-sql -f
vim a.sql
select * from student
-- 執(zhí)行一個sql文件
spark-sql -f a.sql
當(dāng)spark-sql 和hive整合好之后再代碼中也可以直接使用hive的表
導(dǎo)入依賴
準(zhǔn)備工作:將hive的配置文件,hadoop的配置文件 復(fù)制到項目中resources文件夾中:
core-site.xml
hdfs-site.xml
yarn-site.xml
hive-site.xml
import org.apache.spark.sql.SparkSession
object Demo10HiveOnSpark {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("提交到y(tǒng)arn 計算每個班級的人數(shù)")
.config("spark.sql.shuffle.partitions", "1")
.enableHiveSupport() // 開啟hive的配置
.getOrCreate()
sparkSession.sql("use bigdata30")
sparkSession.sql("select * from sqoop_students1 limit 10").show(truncate = false)
}
}
//寫好的代碼不能再本地運行, 需要打包上傳到集群運行
spark sql和hvie的建表語句一樣
create external table students
(
id string,
name string,
age int,
gender string,
clazz string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
location '/bigdata30/spark_in/data/student';
create table score
(
student_id string,
cource_id string,
sco int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS textfile
location '/data/score/';
禁用集群spark日志
cd /usr/local/soft/spark-2.4.5/conf
mv log4j.properties.template log4j.properties
vim log4j.properties
修改配置
log4j.rootCategory=ERROR, console
spark sql和hive區(qū)別
1、spark sql緩存
-- 進入spark sql命令行
spark-sql
-- 可以通過一個網(wǎng)址訪問spark任務(wù)
http://master:4040
-- 設(shè)置并行度
set spark.sql.shuffle.partitions=1;
-- 再spark-sql中對同一個表進行多次查詢的時候可以將表緩存起來
cache table student;
-- 刪除緩存
uncache table student;
-- 再代碼中也可以緩存DF
studentDF.persist(StorageLevel.MEMORY_ONLY)
2、spark sql mapjoin — 廣播變量
Reduce Join
select * from
student as a
join
score as b
on
a.id=b.student_id
MapJoin
當(dāng)一個大表關(guān)聯(lián)小表的時候可以將小表加載到內(nèi)存中進行關(guān)聯(lián)---- 廣播變量
在map端進行表關(guān)聯(lián),不會產(chǎn)生shuffle
select /*+broadcast(a) */ * from
student as a
join
score as b
on
a.id=b.student_id
/*+broadcast(a) */ HINT:給sql加提示的語法
表1 姓名,科目,分?jǐn)?shù) name,item,score 張三,數(shù)學(xué),33 張三,英語,77 李四,數(shù)學(xué),66 李四,英語,78
表2 姓名,數(shù)學(xué),英語 name,math,english 張三,33,77 李四,66,78
1、將表1轉(zhuǎn)化成表2 2、將表2轉(zhuǎn)化成表1
行列轉(zhuǎn)換
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
/**
*
* 1、行列轉(zhuǎn)換
*
*/
object Demo11Student {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("行轉(zhuǎn)列 列轉(zhuǎn)行案例演示")
.getOrCreate()
import org.apache.spark.sql.functions._
import sparkSession.implicits._
//當(dāng)你配置了hdfs等一些配置文件,那么默認(rèn)讀取路徑是hadoop的,否則是本地
val df1: DataFrame = sparkSession.read
.format("csv")
.schema("name STRING,item STRING,score INT")
.load("/bigdata30/stu.txt")
//列轉(zhuǎn)行
val resDF: DataFrame = df1.groupBy($"name")
.agg(
sum(when($"item" === "數(shù)學(xué)", $"score").otherwise(0)) as "math",
sum(when($"item" === "英語", $"score").otherwise(0)) as "english"
)
// val array1: Column = array($"math", $"english")
val m: Column = map(
expr("'數(shù)學(xué)'"), $"math",
expr("'英語'"), $"english"
)
//行轉(zhuǎn)列
resDF.select($"name",explode(m) as Array("item","score")).show()
}
}
自定義UDF函數(shù)
sparkSession.udf.register(“hhh”,fun1) // 注冊成函數(shù)
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{DataFrame, SparkSession}
object Demo12UDF {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("行轉(zhuǎn)列 列轉(zhuǎn)行案例演示")
.getOrCreate()
import org.apache.spark.sql.functions._
import sparkSession.implicits._
//當(dāng)你配置了hdfs等一些配置文件,那么默認(rèn)讀取路徑是hadoop的,否則是本地
val df1: DataFrame = sparkSession.read
.format("csv")
.schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
.option("sep", ",")
.load("/bigdata30/students.csv")
val fun1: UserDefinedFunction = udf("姓名: " + _)
// 注冊成一張表
df1.createOrReplaceTempView("students")
//將自定義的函數(shù)變量注冊成sql語句中的函數(shù)
sparkSession.udf.register("hhh",fun1) // 取任意名字
sparkSession.sql(
"""
|select
|id,
|name,
|hhh(name) as new_name
|from
|students
|""".stripMargin).show()
}
}
在spark-sql命令行創(chuàng)建:
import org.apache.hadoop.hive.ql.exec.UDF
class Demo13Str extends UDF {
def evaluate(line: String): String = "胡哈哈哈:" + line
}
/**
* 1、自定義類繼承UDF類,重寫evaluate方法
* 2、打包,spark-1.0.jar 將jar包放到spark目錄下的jars目錄下
* 3、在spark-sql命令行中注冊函數(shù)
* create function hhhh as 'com.shujia.sql.Demo13Str'
*
*
* */
DSL練習(xí)(二)
工作經(jīng)歷
數(shù)據(jù):
91330000733796106P,杭州海康威視數(shù)字技術(shù)股份有限公司,2020-02-01 00:00:00 91330000733796106P,杭州??低晹?shù)字技術(shù)股份有限公司,2020-03-01 00:00:00 91330000733796106P,杭州??低晹?shù)字技術(shù)股份有限公司,2020-04-01 00:00:00 91330000733796106P,杭州海康威視數(shù)字技術(shù)股份有限公司,2020-05-01 00:00:00 91330000733796106P,阿里云計算有限公司,2020-06-01 00:00:00 91330000733796106P,阿里云計算有限公司,2020-07-01 00:00:00 91330000733796106P,阿里云計算有限公司,2020-08-01 00:00:00 91330000733796106P,阿里云計算有限公司,2020-09-01 00:00:00 91330000733796106P,杭州??低晹?shù)字技術(shù)股份有限公司,2020-10-01 00:00:00 91330000733796106P,杭州??低晹?shù)字技術(shù)股份有限公司,2020-11-01 00:00:00 91330000733796106P,杭州??低晹?shù)字技術(shù)股份有限公司,2020-12-01 00:00:00 91330000733796106P,杭州海康威視數(shù)字技術(shù)股份有限公司,2021-01-01 00:00:00 91330000733796106P,杭州??低晹?shù)字技術(shù)股份有限公司,2021-02-01 00:00:00 91330000733796106P,杭州海康威視數(shù)字技術(shù)股份有限公司,2021-03-01 00:00:00 aaaaaaaaaaaaaaaaaa,杭州??低晹?shù)字技術(shù)股份有限公司,2020-02-01 00:00:00 aaaaaaaaaaaaaaaaaa,杭州??低晹?shù)字技術(shù)股份有限公司,2020-03-01 00:00:00 aaaaaaaaaaaaaaaaaa,杭州??低晹?shù)字技術(shù)股份有限公司,2020-04-01 00:00:00 aaaaaaaaaaaaaaaaaa,杭州??低晹?shù)字技術(shù)股份有限公司,2020-05-01 00:00:00 aaaaaaaaaaaaaaaaaa,阿里云計算有限公司,2020-06-01 00:00:00 aaaaaaaaaaaaaaaaaa,阿里云計算有限公司,2020-07-01 00:00:00 aaaaaaaaaaaaaaaaaa,阿里云計算有限公司,2020-08-01 00:00:00 aaaaaaaaaaaaaaaaaa,阿里云計算有限公司,2020-09-01 00:00:00 aaaaaaaaaaaaaaaaaa,杭州??低晹?shù)字技術(shù)股份有限公司,2020-10-01 00:00:00 aaaaaaaaaaaaaaaaaa,杭州海康威視數(shù)字技術(shù)股份有限公司,2020-11-01 00:00:00 aaaaaaaaaaaaaaaaaa,杭州??低晹?shù)字技術(shù)股份有限公司,2020-12-01 00:00:00 aaaaaaaaaaaaaaaaaa,杭州??低晹?shù)字技術(shù)股份有限公司,2021-01-01 00:00:00 aaaaaaaaaaaaaaaaaa,杭州??低晹?shù)字技術(shù)股份有限公司,2021-02-01 00:00:00 aaaaaaaaaaaaaaaaaa,杭州??低晹?shù)字技術(shù)股份有限公司,2021-03-01 00:00:00
需求:統(tǒng)計每個員工的工作經(jīng)歷
結(jié)果結(jié)構(gòu):
員工編號,開始時間,結(jié)束時間,公司名稱
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object Demo14SheBao {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("經(jīng)歷練習(xí)")
.getOrCreate()
import org.apache.spark.sql.functions._
import sparkSession.implicits._
val df1: DataFrame = sparkSession.read
.format("csv")
.schema("id STRING,burk STRING,sdate STRING")
.load("/bigdata30/shebao.txt")
val resDF: DataFrame = df1.withColumn("before_burk", lag($"burk", 1) over Window.partitionBy($"id").orderBy($"sdate"))
.select(
$"id",
$"burk",
$"sdate",
when($"before_burk".isNull, $"burk").otherwise($"before_burk") as "before_burk"
).withColumn("flag", when($"burk" === $"before_burk", 0).otherwise(1))
.withColumn("tmp", sum($"flag") over Window.partitionBy($"id").orderBy($"sdate"))
.groupBy($"id", $"burk", $"tmp")
.agg(
min($"sdate") as "start_date",
max($"sdate") as "end_date"
).select($"id", $"burk", $"start_date", $"end_date")
// 保存結(jié)果
resDF.write
.format("csv")
.mode(SaveMode.Overwrite)
.save("/bigdata30/spark_out4")
}
}
螞蟻森林植物申領(lǐng)統(tǒng)計
table_name:user_low_carbon
字段名字段描述user_id用戶data_dt日期low_carbon減少碳排放(g)
螞蟻森林植物換購表,用于記錄申領(lǐng)環(huán)保植物所需要減少的碳排放量
table_name: plant_carbon
字段名字段描述plant_id植物編號plant_name植物名plant_carbon換購植物所需要的碳
題目一
螞蟻森林植物申領(lǐng)統(tǒng)計 問題:假設(shè)2017年1月1日開始記錄低碳數(shù)據(jù)(user_low_carbon),假設(shè)2017年10月1日之前滿足申領(lǐng)條件的用戶都申領(lǐng)了一顆p004-胡楊, 剩余的能量全部用來領(lǐng)取“p002-沙柳” 。 統(tǒng)計在10月1日累計申領(lǐng)“p002-沙柳” 排名前10的用戶信息;以及他比后一名多領(lǐng)了幾顆沙柳。 得到的統(tǒng)計結(jié)果如下表樣式:
user_id plant_count less_count(比后一名多領(lǐng)了幾顆沙柳)
u_101 1000 100
u_088 900 400
u_103 500 …
題目二
螞蟻森林低碳用戶排名分析 問題:查詢user_low_carbon表中每日流水記錄,條件為: 用戶在2017年,連續(xù)三天(或以上)的天數(shù)里, 每天減少碳排放(low_carbon)都超過100g的用戶低碳流水。 需要查詢返回滿足以上條件的user_low_carbon表中的記錄流水。 例如用戶u_002符合條件的記錄如下,因為2017/1/2~2017/1/5連續(xù)四天的碳排放量之和都大于等于100g:
user_id data_dt low_carbon
u_002 2017/1/2 150
u_002 2017/1/2 70
u_002 2017/1/3 30
u_002 2017/1/3 80
u_002 2017/1/4 150
u_002 2017/1/5 101
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object Demo15MaYi {
def main(args: Array[String]): Unit = {
/**
* 創(chuàng)建SparkSession的環(huán)境對象
*/
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("螞蟻森林案例")
.config("spark.sql.shuffle.partitions", "1")
.getOrCreate()
import org.apache.spark.sql.functions._
import sparkSession.implicits._
//讀取用戶每日碳排放量信息表
val userLowCarbonDF: DataFrame = sparkSession.read
.format("csv")
.option("sep", "\t")
.schema("user_id STRING,date_dt STRING,low_carbon Double")
.load("spark/data/ant_user_low_carbon.txt")
val plantCarbonDF: DataFrame = sparkSession.read
.format("csv")
.option("sep", "\t")
.schema("plant_id STRING,plant_name STRING,plant_carbon Double")
.load("spark/data/ant_plant_carbon.txt")
//因為用戶信息表與植物信息表是沒有直接關(guān)聯(lián)條件的,需要單獨的從植物信息表中將胡楊和沙柳的所需能量提取出來由變量保存
val huYangCarbon: Double = plantCarbonDF.where($"plant_name" === "胡楊")
.select($"plant_carbon")
.rdd
.collect()
.head
.getAs[Double]("plant_carbon")
val shaLiuCarbon: Double = plantCarbonDF.where($"plant_name" === "沙柳")
.select($"plant_carbon")
.rdd
.collect()
.head
.getAs[Double]("plant_carbon")
println(s"胡楊所需碳排放量:$huYangCarbon, 沙柳所需碳排放量:$shaLiuCarbon")
println("==========================================================================")
/**
* 題目一:螞蟻森林植物申領(lǐng)統(tǒng)計
* 假設(shè)2017年1月1日開始記錄低碳數(shù)據(jù)(user_low_carbon),假設(shè)2017年10月1日之前滿足申領(lǐng)條件的用戶都申領(lǐng)了一顆p004-胡楊,
* 剩余的能量全部用來領(lǐng)取“p002-沙柳” 。
* 統(tǒng)計在10月1日累計申領(lǐng)“p002-沙柳” 排名前10的用戶信息;以及他比后一名多領(lǐng)了幾顆沙柳。
* 得到的統(tǒng)計結(jié)果如下表樣式:
*/
//過濾日期是2017年1月1日到2017年10月1日之間的
userLowCarbonDF.where($"date_dt" >= "2017/1/1" and $"date_dt" <= "2017/10/1")//.show()
//根據(jù)用戶,日期分組,聚合每一天總的排放量
.groupBy($"user_id")
.agg(sum($"low_carbon") as "low_carbon")//.show()
//新增一列,表示申領(lǐng)條件后的剩余能量
.withColumn("other_carbon",when($"low_carbon" >= huYangCarbon,$"low_carbon" - huYangCarbon).otherwise($"low_carbon"))//.show()
//新增一列,計算領(lǐng)取沙柳的棵樹
.withColumn("plant_count",floor($"other_carbon" / shaLiuCarbon))//.show()
//新增一列,取出后一個沙柳的棵樹
.withColumn("after_plant_count",lead($"plant_count",1,0) over Window.orderBy($"plant_count".desc))
.withColumn("less_count",$"plant_count" - $"after_plant_count")
.limit(10)
.select($"user_id",$"plant_count",$"less_count")
// .show()
/**
* 題目二:螞蟻森林低碳用戶排名分析
* 查詢user_low_carbon表中每日流水記錄,條件為:
* 用戶在2017年,連續(xù)三天(或以上)的天數(shù)里,
* 每天減少碳排放(low_carbon)都超過100g的用戶低碳流水。
* 需要查詢返回滿足以上條件的user_low_carbon表中的記錄流水。
* 例如用戶u_002符合條件的記錄如下,因為2017/1/2~2017/1/5連續(xù)四天的碳排放量之和都大于等于100g:
*/
//根據(jù)用戶和日期進行分組,得到每一天碳排放量
userLowCarbonDF.groupBy($"user_id",$"date_dt")
.agg(sum($"low_carbon") as "day_carbon")
//過濾出大于100碳排放量的天
.where($"day_carbon" > 100)
//根據(jù)用戶開窗,以日期升序排序
.withColumn("rn",row_number() over Window.partitionBy($"user_id").orderBy($"date_dt"))
//將日期減去編號,根據(jù)結(jié)果判斷天數(shù)是否連續(xù)
.withColumn("tmp_date",date_sub(regexp_replace($"date_dt","/","-"),$"rn"))
//新增一列,計算用戶連續(xù)的天數(shù)
.withColumn("days",count(expr("1")) over Window.partitionBy($"user_id",$"tmp_date"))
//過濾出連續(xù)天數(shù)是大于3的
.where($"days" >= 3)
.select($"user_id",$"date_dt")
.join(userLowCarbonDF,List("user_id","date_dt"))
.select($"user_id",$"date_dt",$"low_carbon")
.show(1000)
}
}
Spark streaming
通過wordcount 認(rèn)識spark streaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Durations, StreamingContext}
object Demo1WordCount {
/**
* Spark core: SparkContext 核心數(shù)據(jù)結(jié)構(gòu):RDD
* Spark sql: SparkSession 核心數(shù)據(jù)結(jié)構(gòu):DataFrame
* Spark streaming: StreamingContext 核心數(shù)據(jù)結(jié)構(gòu):DStream(底層封裝了RDD)
*/
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("wordCount")
conf.setMaster("local[2]") // 給定核數(shù)
val context = new SparkContext(conf)
//創(chuàng)建Spark Streaming的運行環(huán)境,和前兩個模塊是不一樣的
//Spark Streaming是依賴于Spark core的環(huán)境的
//this(sparkContext: SparkContext, batchDuration: Duration)
//Spark Streaming處理之前,是有一個接收數(shù)據(jù)的過程
//batchDuration,表示接收多少時間段內(nèi)的數(shù)據(jù)
val streamingContext = new StreamingContext(context, Durations.seconds(5)) // 傳入接收時間
//Spark Streaming程序理論上是一旦啟動,就不會停止,除非報錯,人為停止,停電等其他突然場景導(dǎo)致程序終止
// 監(jiān)控一個端口號中的數(shù)據(jù),手動向端口號中打數(shù)據(jù)
// 模擬kafka
val rids: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)
// 對接收的數(shù)據(jù)進行處理
val resDS: DStream[(String, Int)] = rids
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
resDS.print()
/**
* sparkStreaming啟動的方式和前兩個模塊啟動方式不一樣
*/
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()
}
}
如何將上一次處理的結(jié)果保留下來?
需要使用有狀態(tài)的算子來處理當(dāng)前批次數(shù)據(jù)與歷史數(shù)據(jù)的關(guān)系 * updateStateByKey(S:ClassTag)(updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
Seq: 序列,表示歷史鍵對應(yīng)的值組成的序列 (hello, seq:[1,1,1])Option: 當(dāng)前批次輸入鍵對應(yīng)的value值,如果歷史中沒有該鍵,這個值就是None, 如果歷史中出現(xiàn)了這個鍵,這個值就是Some(值)有狀態(tài)算子使用注意事項:1、有狀態(tài)算子ByKey算子只適用于k-v類型的DStream2、有狀態(tài)算子使用的時候,需要提前設(shè)置checkpoint的路徑,因為需要將歷史批次的結(jié)果存儲下來
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Durations, StreamingContext}
object Demo2WordCount2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local[2]") // 給定核數(shù)
conf.setAppName("spark Streaming 單詞統(tǒng)計")
val sparkContext = new SparkContext(conf)
val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))
//設(shè)置的是一個文件夾 存儲歷史數(shù)據(jù)
streamingContext.checkpoint("spark/data/checkpoint2")
val rids: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)
//hello world
val wordsDS: DStream[String] = rids.flatMap(_.split(" "))
val kvDS: DStream[(String, Int)] = wordsDS.map((_, 1)) // (hello,1) (hello,1) (hello,1)
/**
* 每5秒中resDS中的數(shù)據(jù),是當(dāng)前5s內(nèi)的數(shù)據(jù)
* reduceByKey,只會對當(dāng)前5s批次中的數(shù)據(jù)求和
*/
// val resDS: DStream[(String, Int)] = kvDS.reduceByKey(_ + _)
val resDS: DStream[(String, Int)] = kvDS.updateStateByKey((seq1: Seq[Int], opt1: Option[Int]) => {
// 上一次的總和
val sumValue: Int = seq1.sum
// 這一次的
val num: Int = opt1.getOrElse(0)
Option(sumValue + num)
})
println("--------------------------------------")
resDS.print()
println("--------------------------------------")
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()
}
}
窗口
滑動窗口和滾動窗口
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object Demo3Window {
def main(args: Array[String]): Unit = {
/**
* 創(chuàng)建spark streaming的環(huán)境
* 舊版本創(chuàng)建的方式
*/
// val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("窗口案例")
// val context = new SparkContext(conf)
// val sc = new StreamingContext(context, Durations.seconds(5))
/**
* 新版本的創(chuàng)建方式
*/
val context: SparkContext = SparkSession.builder()
.master("local[2]")
.appName("窗口案例")
.config("spark.sql.shuffle.partitions", "1")
.getOrCreate().sparkContext
val sc = new StreamingContext(context, Durations.seconds(3)) // 正常每次接收5s內(nèi)的數(shù)據(jù)
//1000 ~ 65535 端口號
val infoDS: ReceiverInputDStream[String] = sc.socketTextStream("master", 10086)
val wordsDS: DStream[String] = infoDS.flatMap(_.split(" "))
val kvDS: DStream[(String, Int)] = wordsDS.map((_, 1))
/**
* 1、如果只是為了計算當(dāng)前批次接收的數(shù)據(jù),直接調(diào)用reduceByKey
* 2、如果要將最新批次的數(shù)據(jù)與歷史數(shù)據(jù)結(jié)合處理的話,需要調(diào)用有狀態(tài)算子 updateStateByKey
* 3、如果要實現(xiàn)滑動窗口或者滾動窗口的話,需要使用窗口類算子reduceByKeyAndWindow
*/
//def reduceByKeyAndWindow(reduceFunc: (V, V) => V,windowDuration: Duration,slideDuration: Duration): DStream[(K, V)]
//reduceFunc 編寫處理相同的鍵對應(yīng)的value值做處理
//windowDuration 設(shè)置窗口的大小
//slideDuration 設(shè)置滑動的大小
//每間隔slideDuration大小的時間計算一次數(shù)據(jù),計算數(shù)據(jù)的范圍是最近windowDuration大小時間的數(shù)據(jù)
val resDS: DStream[(String, Int)] = kvDS.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Durations.seconds(12), Durations.seconds(6))
/**
* 當(dāng)窗口大小與滑動大小一致的時候,那么就會從滑動窗口轉(zhuǎn)變成滾動窗口的效果
*/
// val resDS: DStream[(String, Int)] = kvDS.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Durations.seconds(10), Durations.seconds(10))
resDS.print()
sc.start()
sc.awaitTermination()
sc.stop()
}
}
各數(shù)據(jù)類型之間的關(guān)系
foreachRDD
package com.shujia.streaming
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
object Demo4DStream2RDD {
def main(args: Array[String]): Unit = {
//使用DataFrame的語法
val sparkSession: SparkSession = SparkSession.builder()
.master("local[2]")
.appName("rdd與DStream的關(guān)系")
.config("spark.sql.shuffle.partitions", "1")
.getOrCreate()
import org.apache.spark.sql.functions._
import sparkSession.implicits._
//使用RDD的語法
val sparkContext: SparkContext = sparkSession.sparkContext
//使用DStream的語法
val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))
val infoDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 10086)
//如果DS不是鍵值形式的話,可以單獨調(diào)用window函數(shù)進行設(shè)置窗口的形式
val new_infoDS: DStream[String] = infoDS.window(Durations.seconds(10), Durations.seconds(5))
// hello world java hello java
/**
* foreachRDD:在DS中使用rdd的語法操作數(shù)據(jù)
* 缺點:該函數(shù)是沒有返回值的
* 需求:我們在想使用DS中的RDD的同時,想要使用結(jié)束后,會得到一個新的DS
*/
new_infoDS.foreachRDD((rdd:RDD[String])=>{
println("------------------------------")
// val resRDD: RDD[(String, Int)] = rdd.flatMap(_.split(" "))
// .map((_, 1))
// .reduceByKey(_ + _)
// resRDD.foreach(println)
//rdd和df之間可以轉(zhuǎn)換 使用RDD的方式處理
val df1: DataFrame = rdd.toDF.select($"value" as "info")
df1.createOrReplaceTempView("words")
val resDF: DataFrame = sparkSession.sql(
"""
|select
|t1.wds as word,
|count(1) as counts
|from
|(
|select
|explode(split(info,' ')) as wds
|from words) t1
|group by t1.wds
|""".stripMargin)
resDF.show()
})
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()
}
}
transform
package com.shujia.streaming
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
/**
* 面試題:foreachRDD與transform的區(qū)別
*/
object Demo5TransFormat {
def main(args: Array[String]): Unit = {
//使用DataFrame的語法
val sparkSession: SparkSession = SparkSession.builder()
.master("local[2]")
.appName("rdd與DStream的關(guān)系")
.config("spark.sql.shuffle.partitions", "1")
.getOrCreate()
import org.apache.spark.sql.functions._
import sparkSession.implicits._
//使用RDD的語法
val sparkContext: SparkContext = sparkSession.sparkContext
//使用DStream的語法
val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))
val infoDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 10086)
val resDS: DStream[(String, Int)] = infoDS.transform((rdd: RDD[String]) => {
//直接對rdd進行處理,返回新的rdd
// val resRDD: RDD[(String, Int)] = rdd.flatMap(_.split(" "))
// .map((_, 1))
// .reduceByKey(_ + _)
// resRDD
//將rdd轉(zhuǎn)df,使用sql做分析
//rdd和df之間可以轉(zhuǎn)換
val df1: DataFrame = rdd.toDF.select($"value" as "info")
df1.createOrReplaceTempView("words")
val resDF: DataFrame = sparkSession.sql(
"""
|select
|t1.wds as word,
|count(1) as counts
|from
|(
|select
|explode(split(info,' ')) as wds
|from words) t1
|group by t1.wds
|""".stripMargin)
val resRDD: RDD[(String, Int)] = resDF.rdd.map((row: Row) => (row.getAs[String](0), row.getAs[Int](1)))
resRDD
})
resDS.print()
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()
}
}
區(qū)別:
transform: 將Dstream的操作轉(zhuǎn)化為RDD的操作,返回的是一個新的RDD
foreach: 將Dstream的操作轉(zhuǎn)化為RDD的操作,沒有返回值 ,直接在函數(shù)中操作
yarn提交作業(yè)
打包代碼 上傳
spark-submit --master yarn --deploy-mode client --class com.shujia.streaming.Demo6YarnSubmiti spark-1.0.jar --num-executors 2 --executor-cores 1
spark streaming保存文件到本地
//將結(jié)果存儲到磁盤中
//只能設(shè)置文件夾的名字和文件的后綴
//每一批次運行,都會產(chǎn)生新的小文件夾,文件夾中有結(jié)果數(shù)據(jù)文件
resDS.saveAsTextFiles("spark/data/streamout/stream","txt")
拓展:將數(shù)據(jù)保存到數(shù)據(jù)庫中
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import java.sql.{Connection, DriverManager, PreparedStatement}
object Demo8DS2Mysql {
def main(args: Array[String]): Unit = {
//使用DataFrame的語法
val sparkSession: SparkSession = SparkSession.builder()
.master("local[2]")
.appName("rdd與DStream的關(guān)系")
.config("spark.sql.shuffle.partitions", "1")
.getOrCreate()
import org.apache.spark.sql.functions._
import sparkSession.implicits._
//使用RDD的語法
val sparkContext: SparkContext = sparkSession.sparkContext
//使用DStream的語法
val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))
val infoDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 10086)
infoDS.foreachRDD((rdd:RDD[String])=>{
println("======================= 正在處理一批數(shù)據(jù) ==========================")
//處理rdd中每一條數(shù)據(jù)
rdd.foreach((line:String)=>{
//如果將創(chuàng)建連接的代碼寫在這里,這樣的話,每條數(shù)據(jù)都會創(chuàng)建一次連接
/**
* 創(chuàng)建與數(shù)據(jù)庫連接對象
*/
//注冊驅(qū)動
Class.forName("com.mysql.jdbc.Driver")
//創(chuàng)建數(shù)據(jù)庫連接對象
val conn: Connection = DriverManager.getConnection(
"jdbc:mysql://master:3306/bigdata30?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
"root",
"123456"
)
//創(chuàng)建預(yù)編譯對象
val statement: PreparedStatement = conn.prepareStatement("insert into students values(?,?,?,?,?)")
val info: Array[String] = line.split(",")
statement.setInt(1,info(0).toInt)
statement.setString(2,info(1))
statement.setInt(3,info(2).toInt)
statement.setString(4,info(3))
statement.setString(5,info(4))
//執(zhí)行sql語句
statement.executeUpdate()
//釋放資源
statement.close()
conn.close()
})
})
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()
}
}
但是這樣做每一條數(shù)據(jù)處理都會創(chuàng)建一次連接,浪費資源: 嘗試改造
/**
* 設(shè)想中的改造,不一定能運行
* 我們將原本在rdd中創(chuàng)建連接的代碼放到了ds中,發(fā)現(xiàn)PreparedStatement不能與task任務(wù)一起序列化到executor中的
* 這樣的寫法是不可以的!!?。。。?!
*/
// infoDS.foreachRDD((rdd: RDD[String]) => {
// println("======================= 正在處理一批數(shù)據(jù) ==========================")
// //如果將創(chuàng)建連接的代碼寫在這里,這樣的話,每條數(shù)據(jù)都會創(chuàng)建一次連接
// /**
// * 創(chuàng)建與數(shù)據(jù)庫連接對象
// */
// //注冊驅(qū)動
// Class.forName("com.mysql.jdbc.Driver")
// //創(chuàng)建數(shù)據(jù)庫連接對象
// val conn: Connection = DriverManager.getConnection(
// "jdbc:mysql://master:3306/bigdata30?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
// "root",
// "123456"
// )
// //創(chuàng)建預(yù)編譯對象
// val statement: PreparedStatement = conn.prepareStatement("insert into students values(?,?,?,?,?)")
// //處理rdd中每一條數(shù)據(jù)
// rdd.foreach((line: String) => {
// val info: Array[String] = line.split(",")
// statement.setInt(1, info(0).toInt)
// statement.setString(2, info(1))
// statement.setInt(3, info(2).toInt)
// statement.setString(4, info(3))
// statement.setString(5, info(4))
// //執(zhí)行sql語句
// statement.executeUpdate()
// })
//
// //釋放資源
// statement.close()
// conn.close()
//
// })
最終版本:foreachPartition算子
/**
* rdd中有一個算子foreachPartition
* rdd本質(zhì)是由一系列分區(qū)構(gòu)成的,如果我們可以將分區(qū)數(shù)設(shè)置為1,且每個分區(qū)創(chuàng)建一個連接不就好了么
*/
infoDS.foreachRDD((rdd: RDD[String]) => {
println("======================= 接收到 5s 一批次數(shù)據(jù) ==========================")
rdd.repartition(2)
println(s" DS封裝的RDD中的分區(qū)數(shù)為:${rdd.getNumPartitions} ")
/**
* foreachPartition,處理一個分區(qū)的數(shù)據(jù)
* 將一個分區(qū)的數(shù)據(jù),封裝成了一個迭代器
*/
rdd.foreachPartition((itr: Iterator[String]) => {
println("======================= 正在處理一個分區(qū)的數(shù)據(jù) ==========================")
/**
* 創(chuàng)建與數(shù)據(jù)庫連接對象
*/
//注冊驅(qū)動
Class.forName("com.mysql.jdbc.Driver")
//創(chuàng)建數(shù)據(jù)庫連接對象
val conn: Connection = DriverManager.getConnection(
"jdbc:mysql://master:3306/bigdata30?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
"root",
"123456"
)
//創(chuàng)建預(yù)編譯對象
val statement: PreparedStatement = conn.prepareStatement("insert into students values(?,?,?,?,?)")
println("========================= 創(chuàng)建了一次連接 =========================")
itr.foreach((line: String) => {
val info: Array[String] = line.split(",")
statement.setInt(1, info(0).toInt)
statement.setString(2, info(1))
statement.setInt(3, info(2).toInt)
statement.setString(4, info(3))
statement.setString(5, info(4))
//執(zhí)行sql語句
statement.executeUpdate()
})
statement.close()
conn.close()
})
})
Spark 優(yōu)化
spark 調(diào)優(yōu)
避免創(chuàng)建重復(fù)的RDD
盡可能復(fù)用同一個RDD
對多次使用的RDD進行持久化 ?
盡量避免使用shuffle類算子
使用map-side預(yù)聚合的shuffle操作
使用高性能的算子 ?
廣播大變量?
使用Kryo優(yōu)化序列化性能
優(yōu)化數(shù)據(jù)結(jié)構(gòu)使用高性能的庫fastutil
對多次使用的RDD進行持久化
默認(rèn)情況下,性能最高的當(dāng)然是MEMORY_ONLY,但前提是你的內(nèi)存必須足夠足夠大, 可以綽綽有余地存放下整個RDD的所有數(shù)據(jù)。因為不進行序列化與反序列化操作,就避 免了這部分的性能開銷;對這個RDD的后續(xù)算子操作,都是基于純內(nèi)存中的數(shù)據(jù)的操作 ,不需要從磁盤文件中讀取數(shù)據(jù),性能也很高;而且不需要復(fù)制一份數(shù)據(jù)副本,并遠(yuǎn)程傳 送到其他節(jié)點上。但是這里必須要注意的是,在實際的生產(chǎn)環(huán)境中,恐怕能夠直接用這種 策略的場景還是有限的,如果RDD中數(shù)據(jù)比較多時(比如幾十億),直接用這種持久化 級別,會導(dǎo)致JVM的OOM內(nèi)存溢出異常。
如果使用MEMORY_ONLY級別時發(fā)生了內(nèi)存溢出(OOM),那么建議嘗試使用 MEMORY_ONLY_SER級別。該級別會將RDD數(shù)據(jù)序列化后再保存在內(nèi)存中,此時每個 partition僅僅是一個字節(jié)數(shù)組而已,大大減少了對象數(shù)量,并降低了內(nèi)存占用。這種級別 比MEMORY_ONLY多出來的性能開銷,主要就是序列化與反序列化的開銷。但是后續(xù)算 子可以基于純內(nèi)存進行操作,因此性能總體還是比較高的。此外,可能發(fā)生的問題同上, 如果RDD中的數(shù)據(jù)量過多的話,還是可能會導(dǎo)致OOM內(nèi)存溢出的異常。
如果純內(nèi)存的級別都無法使用,那么建議使用MEMORY_AND_DISK_SER策略,而不是 MEMORY_AND_DISK策略。因為既然到了這一步,就說明RDD的數(shù)據(jù)量很大,內(nèi)存無 法完全放下。序列化后的數(shù)據(jù)比較少,可以節(jié)省內(nèi)存和磁盤的空間開銷。同時該策略會優(yōu) 先盡量嘗試將數(shù)據(jù)緩存在內(nèi)存中,內(nèi)存緩存不下才會寫入磁盤。
通常不建議使用DISK_ONLY和后綴為_2的級別:因為完全基于磁盤文件進行數(shù)據(jù)的讀寫 ,會導(dǎo)致性能急劇降低,有時還不如重新計算一次所有RDD。后綴為_2的級別,必須將 所有數(shù)據(jù)都復(fù)制一份副本,并發(fā)送到其他節(jié)點上,數(shù)據(jù)復(fù)制以及網(wǎng)絡(luò)傳輸會導(dǎo)致較大的性 能開銷,除非是要求作業(yè)的高可用性,否則不建議使用。
使用高性能的算子
使用reduceByKey/aggregateByKey替代groupByKey
使用mapPartitions替代普通map Transformation算子
使用foreachPartitions替代foreach Action算子
使用filter之后進行coalesce操作
使用repartitionAndSortWithinPartitions替代repartition與sort類操作代碼
repartition:coalesce(numPartitions,true) 增多分區(qū)使用這個
coalesce(numPartitions,false) 減少分區(qū) 沒有shuffle只是合并 partition
aggregateByKey
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object Demo2AggregateByKey {
def main(args: Array[String]): Unit = {
//使用reduceByKey/aggregateByKey替代groupByKey
val sparkSession: SparkSession = SparkSession.builder()
.config("spark.sql.shuffle.partitions", "1")
.master("local[2]")
.appName("緩存優(yōu)化")
.getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val stuRDD: RDD[String] = sparkContext.textFile("spark/data/students.txt")
val clazzKVRDD: RDD[(String, Int)] = stuRDD.map(_.split(",") match {
case Array(_, _, _, _, clazz: String) =>
(clazz, 1)
})
//reduceByKey的使用,分組之后,直接使用聚合
//理解為普通的MapReduce中的根據(jù)相同的鍵進入到同一個reduce, 然后在reduce端聚合
//實際上這里對應(yīng)的是前一個RDD中的分區(qū)中數(shù)據(jù)相同的鍵到后一個RDD中同一個分區(qū),在后一個RDD分區(qū)中的聚合
// val resRDD: RDD[(String, Int)] = clazzKVRDD.reduceByKey(_ + _)
// resRDD.foreach(println)
//groupByKey 不做聚合,只做前一個RDD中的分區(qū)中數(shù)據(jù)相同的鍵到后一個RDD中同一個分區(qū) (盡量使用reduceByKey去替代)
// val resRDD2: RDD[(String, Int)] = clazzKVRDD.groupByKey()
// .map((kv: (String, Iterable[Int])) => {
// (kv._1, kv._2.sum)
// })
// resRDD2.foreach(println)
//aggregateByKey
//aggregateByKey(zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U)
//zeroValue: 初始值,這個參數(shù)只會被后面第一個參數(shù)函數(shù)所使用
//seqOp: 相當(dāng)于map端預(yù)聚合的邏輯
//combOp: 相當(dāng)于reduce端的聚合邏輯
val resRDD3: RDD[(String, Int)] = clazzKVRDD.aggregateByKey(0)(
//相當(dāng)于map端預(yù)聚合的邏輯
(a1: Int, a2: Int) => a1 + a2,
//相當(dāng)于reduce端的聚合邏輯
(b1: Int, b2: Int) => b1 + b2
)
resRDD3.foreach(println)
}
}
mapPartitions
package com.shujia.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import java.text.SimpleDateFormat
import java.util.Date
object Demo3MapPartitions {
def main(args: Array[String]): Unit = {
//使用mapPartitions替代普通map Transformation算子
//使用reduceByKey/aggregateByKey替代groupByKey
val sparkSession: SparkSession = SparkSession.builder()
.config("spark.sql.shuffle.partitions", "1")
.master("local[2]")
.appName("緩存優(yōu)化")
.getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val lineRDD: RDD[String] = sparkContext.textFile("spark/data/ant_user_low_carbon.txt")
println(s"lineRDD的分區(qū)數(shù):${lineRDD.getNumPartitions}")
/**
* map算子主要作用是,遍歷RDD中的每一條數(shù)據(jù),進行處理返回新的一條數(shù)據(jù)
* 如果在處理過程中,需要創(chuàng)建工具對象的話,那么使用map不太好,原因是因為每一條數(shù)據(jù)都需要new一下
* 可能會造成內(nèi)存溢出
*/
// val resRDD: RDD[(String, String, String)] = lineRDD.map((line: String) => {
// println("===================創(chuàng)建一次對象=============================")// 每次都會創(chuàng)建
// val info: Array[String] = line.split("\t")
// val t1: String = info(1)
// val sdf = new SimpleDateFormat("yyyy/MM/dd")
// val date: Date = sdf.parse(t1)
// val sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
// val t2: String = sdf2.format(date)
// (info(0), t2, info(2))
// })
// resRDD.foreach(println)
/**
* 實際上針對上面的案例,我們可以針對rdd中的每一個分區(qū)創(chuàng)建一個工具對象,在每條數(shù)據(jù)上使用
* mapPartitions,將每一個分區(qū)中的數(shù)據(jù)封裝成了一個迭代器
*/
val resRDD: RDD[(String, String, String)] = lineRDD.mapPartitions((itr: Iterator[String]) => {
println("===================創(chuàng)建一次對象=============================") // 只會創(chuàng)建兩次
val sdf = new SimpleDateFormat("yyyy/MM/dd")
val sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
itr.map((line: String) => {
val info: Array[String] = line.split("\t")
val t1: String = info(1)
val date: Date = sdf.parse(t1)
val t2: String = sdf2.format(date)
(info(0), t2, info(2))
})
})
resRDD.foreach(println)
}
}
Repartition和Coalesce區(qū)別
關(guān)系:兩者都是用來改變RDD的partition數(shù)量的,repartition底層調(diào)用的就是coalesce方法:coalesce(numPartitions, shuffle = true) 區(qū)別:repartition一定會發(fā)生shuffle,coalesce根據(jù)傳入的參數(shù)來判斷是否發(fā)生shuffle,一般情況下增大rdd的partition數(shù)量使用repartition,減少partition數(shù)量時使用coalesce
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo4Coalesce1 {
def main(args: Array[String]): Unit = {
//repartition:coalesce(numPartitions,true) 增多分區(qū)使用這個
//coalesce(numPartitions,false) 減少分區(qū) 沒有shuffle只是合并 partition
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("重分區(qū)")
val sc = new SparkContext(conf)
val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
println(s"lineRDD的分區(qū)數(shù):${lineRDD.getNumPartitions}")
/**
* 使用repartition
*/
//增大分區(qū)數(shù),使用repartition,返回一個新的rdd,會產(chǎn)生shuffle
// val resRDD1: RDD[String] = lineRDD.repartition(10)
// println(s"resRDD1的分區(qū)數(shù):${resRDD1.getNumPartitions}")
// resRDD1.foreach(println)
//減少分區(qū)數(shù),使用repartition,返回一個新的rdd,會產(chǎn)生shuffle
// val resRDD2: RDD[String] = resRDD1.repartition(1)
// println(s"resRDD2的分區(qū)數(shù):${resRDD2.getNumPartitions}")
// resRDD2.foreach(println)
/**
* coalesce
*
* 1、默認(rèn)增大分區(qū)是不會產(chǎn)生shuffle的 如果想要,加上參數(shù)shuffle = true
* 2、合并分區(qū)直接給分區(qū)數(shù),不會產(chǎn)生shuffle
*/
val resRDD1: RDD[String] = lineRDD.coalesce(10, shuffle = true)
println(s"resRDD1的分區(qū)數(shù):${resRDD1.getNumPartitions}")
// resRDD1.foreach(println)
val resRDD2: RDD[String] = resRDD1.coalesce(1,shuffle = true)
println(s"resRDD2的分區(qū)數(shù):${resRDD2.getNumPartitions}")
resRDD2.foreach(println)
while (true) {
// 查看DAG
}
}
}
合并小文件時:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
* 當(dāng)資源充足的情況下,可以適當(dāng)?shù)氖褂弥胤謪^(qū)算子,擴大分區(qū)數(shù)
* 當(dāng)資源不足的情況下,可以適當(dāng)?shù)臏p少分區(qū)數(shù)
*
* 分區(qū)數(shù)會影響rdd的并行任務(wù)數(shù)
*/
object Demo5Coalesce2 {
def main(args: Array[String]): Unit = {
//repartition:coalesce(numPartitions,true) 增多分區(qū)使用這個
//coalesce(numPartitions,false) 減少分區(qū) 沒有shuffle只是合并 partition
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("重分區(qū)")
val sc = new SparkContext(conf)
val lineRDD: RDD[String] = sc.textFile("spark/data/test1/*")
println(s"lineRDD的分區(qū)數(shù):${lineRDD.getNumPartitions}")
/**
* Coalesce算子通常是用在合并小文件時候使用
* 對應(yīng)的spark core中的話,通常使用該算子進行合并分區(qū)
*/
val lineRDD2: RDD[String] = lineRDD.coalesce(1)
lineRDD2.foreach(println)
}
}
廣播大變量
如果使用的外部變量比較大,建議使用Spark的廣播功能,對該變量進行廣播。廣播 后的變量,會保證每個Executor的內(nèi)存中,只駐留一份變量副本,而Executor中的 task執(zhí)行時共享該Executor中的那份變量副本。這樣的話,可以大大減少變量副本 的數(shù)量,從而減少網(wǎng)絡(luò)傳輸?shù)男阅荛_銷,并減少對Executor內(nèi)存的占用開銷,降低 GC的頻率
廣播大變量發(fā)送方式:Executor一開始并沒有廣播變量,而是task運行需要用到廣 播變量,會找executor的blockManager要,bloackManager找Driver里面的 blockManagerMaster要。
package com.shujia.opt
import org.apache.spark.sql.{DataFrame, SparkSession}
object Demo6Join {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("spark sql使用廣播變量")
.config("spark.sql.shuffle.partitions", "1")
.getOrCreate()
import org.apache.spark.sql.functions._
import sparkSession.implicits._
val studentsDF: DataFrame = sparkSession.read
.format("csv")
.option("seq", ",")
.schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
.load("spark/data/students.txt")
val scoresDF: DataFrame = sparkSession.read
.format("csv")
.option("seq", ",")
.schema("id STRING,subject_id STRING,score INT")
.load("spark/data/score.txt")
/**
* 如果在spark sql中是兩個DF進行join關(guān)聯(lián)的話,并且運行模式是local模式的話,會自動地將關(guān)聯(lián)的DF進行廣播
* 如果不是local模式,不會自動進行,需要手動將要廣播的DF給廣播出去
*
* 廣播大變量,1G的變量
*
* hint
* 會進行兩次job作業(yè)
* 第一次是將關(guān)聯(lián)的DF廣播
* 第二次是使用廣播的DF進行關(guān)聯(lián)
*/
val resDF: DataFrame = scoresDF.join(studentsDF.hint("broadcast"), "id")
resDF.show()
while (true){
}
}
}
使用Kryo優(yōu)化序列化性能
序列化:
在Spark中,主要有三個地方涉及到了序列化
在算子函數(shù)中使用到外部變量時,該變量會被序列化后進行網(wǎng)絡(luò)傳輸將自定義的類型作為RDD的泛型類型時(比如JavaRDD,SXT是自定義類型),所有自 定義類型對象,都會進行序列化。因此這種情況下,也要求自定義的類必須實現(xiàn) Serializable接口。使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個 partition都序列化成一個大的字節(jié)數(shù)組。
Kryo序列化器介紹:
park支持使用Kryo序列化機制。Kryo序列化機制,比默認(rèn)的Java序列化機制,速度要快 ,序列化后的數(shù)據(jù)要更小,大概是Java序列化機制的1/10。所以Kryo序列化優(yōu)化以后,可 以讓網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)變少;在集群中耗費的內(nèi)存資源大大減少。
對于這三種出現(xiàn)序列化的地方,我們都可以通過使用Kryo序列化類庫,來優(yōu)化序列化和 反序列化的性能。Spark默認(rèn)使用的是Java的序列化機制,也就是 ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。但是Spark同 時支持使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高很多。 官方介紹,Kryo序列化機制比Java序列化機制,性能高10倍左右。Spark之所以默認(rèn)沒有 使用Kryo作為序列化類庫,是因為Kryo要求最好要注冊所有需要進行序列化的自定義類 型,因此對于開發(fā)者來說,這種方式比較麻煩
自定義一個序列化類:
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
class Demo8Kryo extends KryoRegistrator {
// 實現(xiàn)它的方法
override def registerClasses(kryo: Kryo): Unit = {
//告訴spark程序,使用kryo序列化,具體是什么要進行kryo序列化
kryo.register(classOf[Student])
// kryo.register(classOf[Teacher])
}
}
使用kryo序列化:
.config(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
.config(“spark.kryo.registrator”, “自定義類”)
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
object Demo7Kryo {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.config("spark.sql.shuffle.partitions", "1")
//將序列化方式設(shè)置為Kryo的序列化方式
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//自定義一個序列化類,指定要序列化的東西
.config("spark.kryo.registrator", "com.shujia.opt.Demo8Kryo")
.master("local[2]")
.appName("緩存優(yōu)化")
.getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val studentsRDD: RDD[Student] = sparkContext.textFile("spark/data/students.txt")
.map(_.split(",") match {
case Array(id: String, name: String, age: String, gender: String, clazz: String) =>
Student(id, name, age.toInt, gender, clazz)
})
/**
* 第二次job作業(yè)使用的數(shù)據(jù)大小
* 未使用序列化進行緩存:238.3 KiB
* 使用是默認(rèn)的序列化方式:65.4 KiB
* 使用kryo序列化:43.0 KiB
*/
// studentsRDD.cache() // 默認(rèn)的緩存級別是MEMORY_ONLY
studentsRDD.persist(StorageLevel.MEMORY_ONLY_SER)
/**
* 計算每個班級的人數(shù)
*/
val resRDD: RDD[(String, Int)] = studentsRDD.map((stu:Student)=>(stu.clazz,1)).reduceByKey(_ + _)
resRDD.foreach(println)
/**
* 計算每個性別的人數(shù)
*/
val resRDD2: RDD[(String, Int)] = studentsRDD.map((stu:Student)=>(stu.gender,1)).reduceByKey(_ + _)
resRDD2.foreach(println)
while (true) {
// 查看DAG 等信息
}
}
}
case class Student(id:String,name:String,age:Int,gender:String,clazz:String)// 樣例類
優(yōu)化數(shù)據(jù)結(jié)構(gòu)
Java中,有三種類型比較耗費內(nèi)存:
對象,每個Java對象都有對象頭、引用等額外的信息,因此比較占用內(nèi)存空間。字符串,每個字符串內(nèi)部都有一個字符數(shù)組以及長度等額外信息。集合類型,比如HashMap、LinkedList等,因為集合類型內(nèi)部通常會使用一些內(nèi)部類來 封裝集合元素,比如Map.Entry。
因此Spark官方建議,在Spark編碼實現(xiàn)中,特別是對于算子函數(shù)中的代碼,盡 量不要使用上述三種數(shù)據(jù)結(jié)構(gòu),盡量使用字符串替代對象,使用原始類型(比如 Int、Long)替代字符串,使用數(shù)組替代集合類型,這樣盡可能地減少內(nèi)存占用 ,從而降低GC頻率,提升性能。
數(shù)據(jù)本地性
數(shù)據(jù)本地化級別:
PROCESS_LOCAL 進程本地化,數(shù)據(jù)和task任務(wù)在同一個Executor中執(zhí)行,默認(rèn)
NODE_LOCA 節(jié)點本地化 數(shù)據(jù)和task任務(wù)在同一個節(jié)點中不同的Executor中執(zhí)行 跨Executor拉取數(shù)據(jù)
NO_PREF 第三方存儲中間件得到數(shù)據(jù),mysql clickhouse redis
RACK_LOCAL 機架本地化 task任務(wù)和數(shù)據(jù)在同一個機架不同的節(jié)點中執(zhí)行 跨節(jié)點拉取數(shù)據(jù)
ANY 跨機架本地化 task任務(wù)和數(shù)據(jù)不在一個機架上
配置參數(shù)
spark.locality.wait
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack
JVM調(diào)優(yōu)
Spark task執(zhí)行算子函數(shù),可能會創(chuàng)建很多對象,這些對象,都是要放入JVM年輕代中
RDD的緩存數(shù)據(jù)也會放入到堆內(nèi)存中
配置
spark.storage.memoryFraction 默認(rèn)是0.6
調(diào)節(jié)Executor堆外內(nèi)存
問題原因: Executor由于內(nèi)存不足或者對外內(nèi)存不足了,掛掉了,對應(yīng)的Executor上面的block manager也掛掉了,找不到對應(yīng)的shuffle map output文件,Reducer端不能夠拉取數(shù) 據(jù) Executor并沒有掛掉,而是在拉取數(shù)據(jù)的過程出現(xiàn)了問題
調(diào)節(jié)一下executor的堆外內(nèi)存。也許就可以避免報錯;:
yarn下:–conf spark.yarn.executor.memoryOverhead=2048 單位M
standlone下:–conf spark.executor.memoryOverhead=2048單位M
也可以在代碼中設(shè)置
堆外內(nèi)存上限默認(rèn)是每一個executor的內(nèi)存大小的10%;真正處理大數(shù)據(jù)的時候, 這里都會出現(xiàn)問題,導(dǎo)致spark作業(yè)反復(fù)崩潰,無法運行;此時就會去調(diào)節(jié)這個參數(shù),到至少1G (1024M),甚至說2G、4G
調(diào)節(jié)等待時長
executor在進行shuffle write,優(yōu)先從自己本地關(guān)聯(lián)的BlockManager中獲取某份數(shù)據(jù)如果本地 block manager沒有的話,那么會通過TransferService,去遠(yuǎn)程連接其他節(jié)點上executor的block manager去獲取,嘗試建立遠(yuǎn)程的網(wǎng)絡(luò)連接,并且去拉取數(shù)據(jù) 頻繁的讓JVM堆內(nèi)存滿溢,進行垃圾回收。正好碰到那個exeuctor的JVM在垃圾回收。處于垃圾回 收過程中,所有的工作線程全部停止;相當(dāng)于只要一旦進行垃圾回收,spark / executor停止工作, 無法提供響應(yīng),spark默認(rèn)的網(wǎng)絡(luò)連接的超時時長,是60s;如果卡住60s都無法建立連接的話,那 么這個task就失敗了。
解決?–conf spark.core.connection.ack.wait.timeout=300
參數(shù)模板
--num-executors executor的數(shù)量
--executor-memory 每一個executor的內(nèi)存
--executor-cores 每一個executor的核心數(shù)
--driver-memory Driver的內(nèi)存1G-2G(保存廣播變量)
--spark.storage.memoryFraction 設(shè)置RDD持久化數(shù)據(jù)在Executor內(nèi)存中能占的比例,默認(rèn)是0.6。
--spark.shuffle.memoryFraction 用戶shuffle的內(nèi)存占比默認(rèn)0.2
-- spark提交yarn-client模式的命令模板
spark-submit --master yarn --deploy-mode client --num-executors 2 --executor-memory 1G --executor-cores 1 --class xxx.xxx.Xxx xxx.jar
-- spark提交yarn-cluster模式的命令模板
spark-submit --master yarn --deploy-mode cluster --num-executors 2 --executor-memory 1G --executor-cores 1 --class xxx.xxx.Xxx xxx.jar
總的內(nèi)存=num-executors * executor-memory
總的核數(shù)=num-executors * executor-cores
不能亂給,且不能給滿,因為作業(yè)執(zhí)行還有其他的進行需要額外啟動 --num-executors 200 --executor-memory 100G --executor-cores 100 (錯誤給資源的例子)
spark on yarn 資源設(shè)置標(biāo)準(zhǔn)
1、單個任務(wù)的總內(nèi)存和總核數(shù)一般在yarn總的資源的1/3到1/2之間給資源
一般來說一個稍微大點的公司,集群的服務(wù)器個數(shù)大概在10臺左右
單臺服務(wù)器的內(nèi)存大概是128G,核數(shù)大概是40個左右(國網(wǎng),電信 100臺以上)
小公司:如果公司規(guī)???cè)藬?shù)在80人左右,大數(shù)據(jù)部門在11人左右,5臺大數(shù)據(jù)平臺服務(wù)器 每一臺的配置是8核16G內(nèi)存
yarn總的內(nèi)存 = 10*128G*0.8=960G
yarn總的核數(shù) = 40*10=400
提交spark作業(yè)資源
參數(shù)計算后總的內(nèi)存=960*(1/3 | 1/2)= 300G - 480G
參數(shù)計算后總的核數(shù)=400*(1/3 | 1/2)= 120 - 200
2、在實際生產(chǎn)上線的時候,資源要按照實際的情況合理定資源
2.1、數(shù)據(jù)量比較小 - 10G
10G = 80個block = rdd80分區(qū) = 80個task
- 最理想資源指定 -- 剩余資源充足
--num-executors=40
--executor-memory=4G
--executor-cores=2
- 資源里面最優(yōu)的方式 -- 剩余資源不是很充足時
--num-executors=20
--executor-memory=4G
--executor-cores=2
2.2、數(shù)據(jù)量比較大時 - 80G
80G = 640block = 640分區(qū) = 640task
- 最理想資源指定 -- 剩余資源充足, 如果剩余資源不夠,還需要減少指定的資源
--num-executors=100
--executor-memory=4G
--executor-cores=2
-- spark.locality.wait: spark task 再executor中執(zhí)行前的等待時間 默認(rèn)3秒
spark.yarn.executor.memoryOverhead : 堆外內(nèi)存 默認(rèn)等于堆內(nèi)存的10%
spark.network.timeout spark網(wǎng)絡(luò)鏈接的超時時間 默認(rèn)120s
模板
spark-submit
--master yarn
--deploy-mode cluster
--num-executors = 50
--executor-memory = 4G
--executor-cores = 2
--driver-memory = 2G
--conf spark.storage.memoryFraction=0.4
--conf spark.shuffle.memoryFraction=0.4
--conf spark.locality.wait=10s
--conf spark.shuffle.file.buffer=64kb
--conf spark.yarn.executor.memoryOverhead=1024
--conf spark.network.timeout=200s
以下參數(shù)也可以在spark代碼中指定
--conf spark.storage.memoryFraction=0.4
--conf spark.shuffle.memoryFraction=0.4
--conf spark.locality.wait=10s
--conf spark.shuffle.file.buffer=64kb
--conf spark.yarn.executor.memoryOverhead=1024
--conf spark.network.timeout=200s
shuffle調(diào)優(yōu)
概述:
reduceByKey:要把分布在集群各個節(jié)點上的數(shù)據(jù)中的同一個key,對應(yīng)的values,都給 集中到一個節(jié)點的一個executor的一個task中,對集合起來的value執(zhí)行傳入的函數(shù)進行 reduce操作,最后變成一個value
配置
spark.shuffle.manager, 默認(rèn)是sort
spark.shuffle.consolidateFiles,默認(rèn)是false
spark.shuffle.file.buffer,默認(rèn)是32k
spark.shuffle.memoryFraction,默認(rèn)是0.2
Spark 數(shù)據(jù)傾斜解決
Spark中的數(shù)據(jù)傾斜,表現(xiàn)主要有下面幾種:
數(shù)據(jù)傾斜產(chǎn)生的原因:1、數(shù)據(jù)分布不均,2,同時產(chǎn)生了shuffle
Executor lost,OOM,Shuffle過程出錯;
lDriver OOM;單個Executor執(zhí)行時間特別久,整體任務(wù)卡在某個階段不能結(jié)束;正常運行的任務(wù)突然失敗
數(shù)據(jù)傾斜優(yōu)化:
使用Hive ETL預(yù)處理數(shù)據(jù)
適用場景:導(dǎo)致數(shù)據(jù)傾斜的是Hive表。如果該Hive表中的數(shù)據(jù)本身很不均勻(比如某個key對應(yīng)了100萬數(shù)據(jù),其他key才對應(yīng)了10條數(shù)據(jù)),而且業(yè)務(wù)場景需要頻繁使用Spark對Hive表執(zhí)行某個分析操作,那么比較適合使用這種技術(shù)方案。實現(xiàn)思路:此時可以評估一下,是否可以通過Hive來進行數(shù)據(jù)預(yù)處理(即通過Hive ETL預(yù)先對數(shù)據(jù)按照key進行聚合,或者是預(yù)先和其他表進行join),然后在Spark作業(yè)中針對的數(shù)據(jù)源就不是原來的Hive表了,而是預(yù)處理后的Hive表。此時由于數(shù)據(jù)已經(jīng)預(yù)先進行過聚合或join操作了,那么在Spark作業(yè)中也就不需要使用原先的shuffle類算子執(zhí)行這類操作了。方案實現(xiàn)原理:這種方案從根源上解決了數(shù)據(jù)傾斜,因為徹底避免了在Spark中執(zhí)行shuffle類算子,那么肯定就不會有數(shù)據(jù)傾斜的問題了。但是這里也要提醒一下大家,這種方式屬于治標(biāo)不治本。因為畢竟數(shù)據(jù)本身就存在分布不均勻的問題,所以Hive ETL中進行g(shù)roup by或者join等shuffle操作時,還是會出現(xiàn)數(shù)據(jù)傾斜,導(dǎo)致Hive ETL的速度很慢。我們只是把數(shù)據(jù)傾斜的發(fā)生提前到了Hive ETL中,避免Spark程序發(fā)生數(shù)據(jù)傾斜而已。方案優(yōu)缺點:
優(yōu)點:實現(xiàn)起來簡單便捷,效果還非常好,完全規(guī)避掉了數(shù)據(jù)傾斜,Spark作業(yè)的性能會大幅度提升。缺點:治標(biāo)不治本,Hive ETL中還是會發(fā)生數(shù)據(jù)傾斜。 方案實踐經(jīng)驗:在一些Java系統(tǒng)與Spark結(jié)合使用的項目中,會出現(xiàn)Java代碼頻繁調(diào)用Spark作業(yè)的場景,而且對Spark作業(yè)的執(zhí)行性能要求很高,就比較適合使用這種方案。將數(shù)據(jù)傾斜提前到上游的Hive ETL,每天僅執(zhí)行一次,只有那一次是比較慢的,而之后每次Java調(diào)用Spark作業(yè)時,執(zhí)行速度都會很快,能夠提供更好的用戶體驗。
過濾少數(shù)導(dǎo)致傾斜的key
方案適用場景:如果發(fā)現(xiàn)導(dǎo)致傾斜的key就少數(shù)幾個,而且對計算本身的影響并不大的話,那么很適合使用這種方案。比如99%的key就對應(yīng)10條數(shù)據(jù),但是只有一個key對應(yīng)了100萬數(shù)據(jù),從而導(dǎo)致了數(shù)據(jù)傾斜。方案實現(xiàn)思路:如果我們判斷那少數(shù)幾個數(shù)據(jù)量特別多的key,對作業(yè)的執(zhí)行和計算結(jié)果不是特別重要的話,那么干脆就直接過濾掉那少數(shù)幾個key。比如,在Spark SQL中可以使用where子句過濾掉這些key或者在Spark Core中對RDD執(zhí)行filter算子過濾掉這些key。如果需要每次作業(yè)執(zhí)行時,動態(tài)判定哪些key的數(shù)據(jù)量最多然后再進行過濾,那么可以使用sample算子對RDD進行采樣,然后計算出每個key的數(shù)量,取數(shù)據(jù)量最多的key過濾掉即可。方案實現(xiàn)原理:將導(dǎo)致數(shù)據(jù)傾斜的key給過濾掉之后,這些key就不會參與計算了,自然不可能產(chǎn)生數(shù)據(jù)傾斜。方案優(yōu)缺點:
優(yōu)點:實現(xiàn)簡單,而且效果也很好,可以完全規(guī)避掉數(shù)據(jù)傾斜。缺點:適用場景不多,大多數(shù)情況下,導(dǎo)致傾斜的key還是很多的,并不是只有少數(shù)幾個。
提高shuffle操作的并行度
方案適用場景:如果我們必須要對數(shù)據(jù)傾斜迎難而上,那么建議優(yōu)先使用這種方案,因為這是處理數(shù)據(jù)傾斜最簡單的一種方案。方案實現(xiàn)思路:在對RDD執(zhí)行shuffle算子時,給shuffle算子傳入一個參數(shù),比如reduceByKey(1000),該參數(shù)就設(shè)置了這個shuffle算子執(zhí)行時shuffle read task的數(shù)量,即spark.sql.shuffle.partitions,該參數(shù)代表了shuffle read task的并行度,默認(rèn)是200,對于很多場景來說都有點過小。方案實現(xiàn)原理:增加shuffle read task的數(shù)量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數(shù)據(jù)。舉例來說,如果原本有5個key,每個key對應(yīng)10條數(shù)據(jù),這5個key都是分配給一個task的,那么這個task就要處理50條數(shù)據(jù)。而增加了shuffle read task以后,每個task就分配到一個key,即每個task就處理10條數(shù)據(jù),那么自然每個task的執(zhí)行時間都會變短了。方案優(yōu)缺點:
優(yōu)點:實現(xiàn)起來比較簡單,可以有效緩解和減輕數(shù)據(jù)傾斜的影響。缺點:只是緩解了數(shù)據(jù)傾斜而已,沒有徹底根除問題,根據(jù)實踐經(jīng)驗來看,其效果有限。
雙重聚合 (局部聚合+全局聚合)
方案適用場景:對RDD執(zhí)行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較適用這種方案。方案實現(xiàn)思路:這個方案的核心實現(xiàn)思路就是進行兩階段聚合:第一次是局部聚合,先給每個key都打上一個隨機數(shù),比如10以內(nèi)的隨機數(shù),此時原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接著對打上隨機數(shù)后的數(shù)據(jù),執(zhí)行reduceByKey等聚合操作,進行局部聚合,那么局部聚合結(jié)果,就會變成了(1_hello, 2) (2_hello, 2)。然后將各個key的前綴給去掉,就會變成(hello,2)(hello,2),再次進行全局聚合操作,就可以得到最終結(jié)果了,比如(hello, 4)。方案實現(xiàn)原理:將原本相同的key通過附加隨機前綴的方式,變成多個不同的key,就可以讓原本被一個task處理的數(shù)據(jù)分散到多個task上去做局部聚合,進而解決單個task處理數(shù)據(jù)量過多的問題。接著去除掉隨機前綴,再次進行全局聚合,就可以得到最終的結(jié)果。具體原理見下圖。方案優(yōu)缺點:
優(yōu)點:對于聚合類的shuffle操作導(dǎo)致的數(shù)據(jù)傾斜,效果是非常不錯的。通常都可以解決掉數(shù)據(jù)傾斜,或者至少是大幅度緩解數(shù)據(jù)傾斜,將Spark作業(yè)的性能提升數(shù)倍以上。缺點:僅僅適用于聚合類的shuffle操作,適用范圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。
將reduce join轉(zhuǎn)為map join
方案適用場景:在對RDD使用join類操作,或者是在Spark SQL中使用join語句時,而且join操作中的一個RDD或表的數(shù)據(jù)量比較?。ū热鐜装費或者一兩G),比較適用此方案。方案實現(xiàn)思路:不使用join算子進行連接操作,而使用Broadcast變量與map類算子實現(xiàn)join操作,進而完全規(guī)避掉shuffle類的操作,徹底避免數(shù)據(jù)傾斜的發(fā)生和出現(xiàn)。將較小RDD中的數(shù)據(jù)直接通過collect算子拉取到Driver端的內(nèi)存中來,然后對其創(chuàng)建一個Broadcast變量,廣播給其他Executor節(jié)點;接著對另外一個RDD執(zhí)行map類算子,在算子函數(shù)內(nèi),從Broadcast變量中獲取較小RDD的全量數(shù)據(jù),與當(dāng)前RDD的每一條數(shù)據(jù)按照連接key進行比對,如果連接key相同的話,那么就將兩個RDD的數(shù)據(jù)用你需要的方式連接起來。方案實現(xiàn)原理:普通的join是會走shuffle過程的,而一旦shuffle,就相當(dāng)于會將相同key的數(shù)據(jù)拉取到一個shuffle read task中再進行join,此時就是reduce join。但是如果一個RDD是比較小的,則可以采用廣播小RDD全量數(shù)據(jù)+map算子來實現(xiàn)與join同樣的效果,也就是map join,此時就不會發(fā)生shuffle操作,也就不會發(fā)生數(shù)據(jù)傾斜。具體原理如下圖所示。方案優(yōu)缺點:
優(yōu)點:對join操作導(dǎo)致的數(shù)據(jù)傾斜,效果非常好,因為根本就不會發(fā)生shuffle,也就根本不會發(fā)生數(shù)據(jù)傾斜。缺點:適用場景較少,因為這個方案只適用于一個大表和一個小表的情況。畢竟我們需要將小表進行廣播,此時會比較消耗內(nèi)存資源,driver和每個Executor內(nèi)存中都會駐留一份小RDD的全量數(shù)據(jù)。如果我們廣播出去的RDD數(shù)據(jù)比較大,比如10G以上,那么就可能發(fā)生內(nèi)存溢出了。因此并不適合兩個都是大表的情況。
采樣傾斜key并分拆join操作
方案適用場景:兩個RDD/Hive表進行join的時候,如果數(shù)據(jù)量都比較大,無法采用“解決方案五”,那么此時可以看一下兩個RDD/Hive表中的key分布情況。如果出現(xiàn)數(shù)據(jù)傾斜,是因為其中某一個RDD/Hive表中的少數(shù)幾個key的數(shù)據(jù)量過大,而另一個RDD/Hive表中的所有key都分布比較均勻,那么采用這個解決方案是比較合適的。方案實現(xiàn)思路:對包含少數(shù)幾個數(shù)據(jù)量過大的key的那個RDD,通過sample算子采樣出一份樣本來,然后統(tǒng)計一下每個key的數(shù)量,計算出來數(shù)據(jù)量最大的是哪幾個key。然后將這幾個key對應(yīng)的數(shù)據(jù)從原來的RDD中拆分出來,形成一個單獨的RDD,并給每個key都打上n以內(nèi)的隨機數(shù)作為前綴;而不會導(dǎo)致傾斜的大部分key形成另外一個RDD。接著將需要join的另一個RDD,也過濾出來那幾個傾斜key對應(yīng)的數(shù)據(jù)并形成一個單獨的RDD,將每條數(shù)據(jù)膨脹成n條數(shù)據(jù),這n條數(shù)據(jù)都按順序附加一個0~n的前綴;不會導(dǎo)致傾斜的大部分key也形成另外一個RDD。再將附加了隨機前綴的獨立RDD與另一個膨脹n倍的獨立RDD進行join,此時就可以將原先相同的key打散成n份,分散到多個task中去進行join了。而另外兩個普通的RDD就照常join即可。最后將兩次join的結(jié)果使用union算子合并起來即可,就是最終的join結(jié)果。方案實現(xiàn)原理:對于join導(dǎo)致的數(shù)據(jù)傾斜,如果只是某幾個key導(dǎo)致了傾斜,可以將少數(shù)幾個key分拆成獨立RDD,并附加隨機前綴打散成n份去進行join,此時這幾個key對應(yīng)的數(shù)據(jù)就不會集中在少數(shù)幾個task上,而是分散到多個task進行join了。具體原理見下圖。方案優(yōu)缺點:
優(yōu)點:對于join導(dǎo)致的數(shù)據(jù)傾斜,如果只是某幾個key導(dǎo)致了傾斜,采用該方式可以用最有效的方式打散key進行join。而且只需要針對少數(shù)傾斜key對應(yīng)的數(shù)據(jù)進行擴容n倍,不需要對全量數(shù)據(jù)進行擴容。避免了占用過多內(nèi)存。缺點:如果導(dǎo)致傾斜的key特別多的話,比如成千上萬個key都導(dǎo)致數(shù)據(jù)傾斜,那么這種方式也不適合。
使用隨機前綴和擴容RDD進行join
方案實現(xiàn)思路:該方案的實現(xiàn)思路基本和“解決方案六”類似,首先查看RDD/Hive表中的數(shù)據(jù)分布情況,找到那個造成數(shù)據(jù)傾斜的RDD/Hive表,比如有多個key都對應(yīng)了超過1萬條數(shù)據(jù)。然后將該RDD的每條數(shù)據(jù)都打上一個n以內(nèi)的隨機前綴。同時對另外一個正常的RDD進行擴容,將每條數(shù)據(jù)都擴容成n條數(shù)據(jù),擴容出來的每條數(shù)據(jù)都依次打上一個0~n的前綴。最后將兩個處理后的RDD進行join即可。
柚子快報激活碼778899分享:大數(shù)據(jù) Spark學(xué)習(xí)
推薦閱讀
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。