欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報(bào)激活碼778899分享:大數(shù)據(jù) Spark SQL基礎(chǔ)

柚子快報(bào)激活碼778899分享:大數(shù)據(jù) Spark SQL基礎(chǔ)

http://yzkb.51969.com/

Spark SQL基礎(chǔ)

Spark SQL介紹

? Spark SQL是一個(gè)用于結(jié)構(gòu)化數(shù)據(jù)處理的Spark組件。所謂結(jié)構(gòu)化數(shù)據(jù),是指具有Schema信息的數(shù)據(jù),例如JSON、Parquet、Avro、CSV格式的數(shù)據(jù)。與基礎(chǔ)的Spark RDD API不同,Spark SQL提供了對結(jié)構(gòu)化數(shù)據(jù)的查詢和計(jì)算接口。

Spark SQL的主要特點(diǎn):

將SQL查詢與Spark應(yīng)用程序無縫組合

? Spark SQL允許使用SQL或熟悉的API在Spark程序中查詢結(jié)構(gòu)化數(shù)據(jù)。與Hive不同的是,Hive是將SQL翻譯成MapReduce作業(yè),底層是基于MapReduce的;而Spark SQL底層使用的是Spark RDD。

可以連接到多種數(shù)據(jù)源

? Spark SQL提供了訪問各種數(shù)據(jù)源的通用方法,數(shù)據(jù)源包括Hive、Avro、Parquet、ORC、JSON、JDBC等。

在現(xiàn)有的數(shù)據(jù)倉庫上運(yùn)行SQL或HiveQL查詢

? Spark SQL支持HiveQL語法以及Hive SerDes和UDF (用戶自定義函數(shù)) ,允許訪問現(xiàn)有的Hive倉庫。

DataFrame和DataSet

DataFrame的結(jié)構(gòu)

? DataFrame是Spark SQL提供的一個(gè)編程抽象,與RDD類似,也是一個(gè)分布式的數(shù)據(jù)集合。但與RDD不同的是,DataFrame的數(shù)據(jù)都被組織到有名字的列中,就像關(guān)系型數(shù)據(jù)庫中的表一樣。

? DataFrame在RDD的基礎(chǔ)上添加了數(shù)據(jù)描述信息(Schema,即元信息) ,因此看起來更像是一張數(shù)據(jù)庫表。例如,在一個(gè)RDD中有3行數(shù)據(jù),將該RDD轉(zhuǎn)成DataFrame后,其中的數(shù)據(jù)可能如圖所示:

DataSet的結(jié)構(gòu) Dataset是一個(gè)分布式數(shù)據(jù)集,是Spark 1.6中添加的一個(gè)新的API。相比于RDD, Dataset提供了強(qiáng)類型支持,在RDD的每行數(shù)據(jù)加了類型約束。

? 在Spark中,一個(gè)DataFrame代表的是一個(gè)元素類型為Row的Dataset,即DataFrame只是Dataset[Row]的一個(gè)類型別名。

Spark SQL的基本使用

? Spark Shell啟動(dòng)時(shí)除了默認(rèn)創(chuàng)建一個(gè)名為sc的SparkContext的實(shí)例外,還創(chuàng)建了一個(gè)名為spark的SparkSession實(shí)例,該spark變量可以在Spark Shell中直接使用。

? SparkSession只是在SparkContext基礎(chǔ)上的封裝,應(yīng)用程序的入口仍然是SparkContext。SparkSession允許用戶通過它調(diào)用DataFrame和Dataset相關(guān)API來編寫Spark程序,支持從不同的數(shù)據(jù)源加載數(shù)據(jù),并把數(shù)據(jù)轉(zhuǎn)換成DataFrame,然后使用SQL語句來操作DataFrame數(shù)據(jù)。

Spark SQL基本使用案例

在HDFS中有一個(gè)文件/input/person.txt,文件內(nèi)容如下:

現(xiàn)需要使用Spark SQL將該文件中的數(shù)據(jù)按照年齡降序排列,步驟如下:

進(jìn)入spark-shell環(huán)境

加載數(shù)據(jù)為Dataset

val d1 = spark.read.textFile("hdfs://192.168.121.131:9000/input/person.txt")

d1.show() # 查看d1中的數(shù)據(jù)內(nèi)容

? 從上述代碼的結(jié)果可以看出,Dataset將文件中的每一行看作一個(gè)元素,并且所有元素組成了一列,列名默認(rèn)為value。

給Dataset添加元數(shù)據(jù)信息

? 定義一個(gè)樣例類Person,用于存放數(shù)據(jù)描述信息,代碼如下:

case class Person(id:Int,name:String,age:Int)

? 注:Scala有一種特殊的類叫做樣例類(case class)。默認(rèn)情況下,樣例類一般用于不可變對象(樣例類構(gòu)造參數(shù)默認(rèn)聲明為val)。

? 調(diào)用Dataset的map()算子將每一個(gè)元素拆分并存入Person類中,代碼如下:

val personDataset = d1.map(line=>{

val fields = line.split(",")

val id = fields(0).toInt

val name = fields(1)

val age = fields(2).toInt

Person(id,name,age)

})

personDataset.show() # 查看personDataset中的數(shù)據(jù)內(nèi)容

可以看到,personDataset中的數(shù)據(jù)類似于一張關(guān)系型數(shù)據(jù)庫的表。

將Dataset轉(zhuǎn)為DataFrame

? Spark SQL查詢的是DataFrame中的數(shù)據(jù),因此需要將存有元數(shù)據(jù)信息的Dataset轉(zhuǎn)為DataFrame。

? 調(diào)用Dataset的toDF()方法,將存有元數(shù)據(jù)的Dataset轉(zhuǎn)為DataFrame,代碼如下:

val pdf = personDataset.toDF()

執(zhí)行SQL查詢

? 在DataFrame上創(chuàng)建一個(gè)臨時(shí)視圖v_person,并使用SparkSession對象執(zhí)行SQL查詢,代碼如下:

pdf.createTempView("v_person")

val result = spark.sql("select * from v_person order by age desc")

result.show()

Spark SQL函數(shù)

內(nèi)置函數(shù)

? Spark SQL內(nèi)置了大量的函數(shù),位于API org.apache.spark.sql.functions

中。其中大部分函數(shù)與Hive中的相同。

? 使用內(nèi)置函數(shù)有兩種方式:一種是通過編程的方式使用;另一種是在SQL

語句中使用。

以編程的方式使用lower()函數(shù)將用戶姓名轉(zhuǎn)為小寫/大寫,代碼如下:

df.select(lower(col("name")).as("greet")).show()

df.select(upper(col("name")).as("greet")).show()

? 上述代碼中,df指的是DataFrame對象,使用select()方法傳入需要查詢的列,使用as()方法指定列的別名。代碼col(“name”)指定要查詢的列,也可以使用$"name"代替,代碼如下:

df.select(lower($"name").as("greet")).show()

以SQL語句的方式使用lower()函數(shù),代碼如下:

df.createTempView("temp")

spark.sql("select upper(name) as greet from temp").show()

? 除了可以使用select()方法查詢指定的列外,還可以直接使用filter()、groupBy()等方法對DataFrame數(shù)據(jù)進(jìn)行過濾和分組,例如以下代碼:

df.printSchema() # 打印Schema信息

df.select("name").show() # 查詢name列

# 查詢name列和age列,其中將age列的值增加1

df.select($"name",$"age"+1).show()

df.filter($"age">25).show() # 查詢age>25的所有數(shù)據(jù)

# 根據(jù)age進(jìn)行分組,并求每一組的數(shù)量

df.groupBy("age").count().show()

自定義函數(shù)

? 當(dāng)Spark SQL提供的內(nèi)置函數(shù)不能滿足查詢需求時(shí),用戶可以根據(jù)需求編寫自定義函數(shù)(User Defined Functions, UDF),然后在Spark SQL中調(diào)用。

? 例如有這樣一個(gè)需求:為了保護(hù)用戶的隱私,當(dāng)查詢數(shù)據(jù)的時(shí)候,需要將用戶手機(jī)號(hào)的中間4位數(shù)字用星號(hào)()代替,比如手機(jī)號(hào)180***2688。這時(shí)就可以編寫一個(gè)自定義函數(shù)來實(shí)現(xiàn)這個(gè)需求,實(shí)現(xiàn)代碼如下:

?

package spark.demo.sql

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.types.{StringType, StructField, StructType}

import org.apache.spark.sql.{Row, SparkSession}

/**

* 用戶自定義函數(shù),隱藏手機(jī)號(hào)中間4位

*/

object SparkSQLUDF {

def main(args: Array[String]): Unit = {

//創(chuàng)建或得到SparkSession

val spark = SparkSession.builder()

.appName("SparkSQLUDF")

.master("local[*]")

.getOrCreate()

//第一步:創(chuàng)建測試數(shù)據(jù)(或直接從文件中讀?。?/p>

//模擬數(shù)據(jù)

val arr=Array("18001292080","13578698076","13890890876")

//將數(shù)組數(shù)據(jù)轉(zhuǎn)為RDD

val rdd: RDD[String] = spark.sparkContext.parallelize(arr)

//將RDD[String]轉(zhuǎn)為RDD[Row]

val rowRDD: RDD[Row] = rdd.map(line=>Row(line))

//定義數(shù)據(jù)的schema

val schema=StructType(

List{

StructField("phone",StringType,true)

}

)

//將RDD[Row]轉(zhuǎn)為DataFrame

val df = spark.createDataFrame(rowRDD, schema)

//第二步:創(chuàng)建自定義函數(shù)(phoneHide)

val phoneUDF=(phone:String)=>{

var result = "手機(jī)號(hào)碼錯(cuò)誤!"

if (phone != null && (phone.length==11)) {

val sb = new StringBuffer

sb.append(phone.substring(0, 3))

sb.append("****")

sb.append(phone.substring(7))

result = sb.toString

}

result

}

//注冊函數(shù)(第一個(gè)參數(shù)為函數(shù)名稱,第二個(gè)參數(shù)為自定義的函數(shù))

spark.udf.register("phoneHide",phoneUDF)

//第三步:調(diào)用自定義函數(shù)

df.createTempView("t_phone") //創(chuàng)建臨時(shí)視圖

spark.sql("select phoneHide(phone) as phone from t_phone").show()

// +-----------+

// | phone|

// +-----------+

// |180****2080|

// |135****8076|

// |138****0876|

// +-----------+

}

}

窗口(開窗)函數(shù)

? 開窗函數(shù)是為了既顯示聚合前的數(shù)據(jù),又顯示聚合后的數(shù)據(jù),即在每一行的最后一列添加聚合函數(shù)的結(jié)果。開窗口函數(shù)有以下功能:

同時(shí)具有分組和排序的功能不減少原表的行數(shù)開窗函數(shù)語法:

聚合類型開窗函數(shù)

sum()/count()/avg()/max()/min() OVER([PARTITION BY XXX] [ORDER BY XXX [DESC]])

排序類型開窗函數(shù)

ROW_NUMBER() OVER([PARTITION BY XXX] [ORDER BY XXX [DESC]])

以row_number()開窗函數(shù)為例:

? 開窗函數(shù)row_number()是Spark SQL中常用的一個(gè)窗口函數(shù),使用該函數(shù)可以在查詢結(jié)果中對每個(gè)分組的數(shù)據(jù),按照其排列的順序添加一列行號(hào)(從1開始),根據(jù)行號(hào)可以方便地對每一組數(shù)據(jù)取前N行(分組取TopN)。row_number()函數(shù)的使用格式如下:

row_number() over (partition by 列名 order by 列名 desc) 行號(hào)列別名

上述格式說明如下:

partition by:按照某一列進(jìn)行分組;

order by:分組后按照某一列進(jìn)行組內(nèi)排序;

desc:降序,默認(rèn)升序。

例如,統(tǒng)計(jì)每一個(gè)產(chǎn)品類別的銷售額前3名,代碼如下:

package spark.demo.sql

import org.apache.spark.sql.types._

import org.apache.spark.sql.{Row, SparkSession}

/**

* 統(tǒng)計(jì)每一個(gè)產(chǎn)品類別的銷售額前3名(相當(dāng)于分組求TOPN)

*/

object SparkSQLWindowFunctionDemo {

def main(args: Array[String]): Unit = {

//創(chuàng)建或得到SparkSession

val spark = SparkSession.builder()

.appName("SparkSQLWindowFunctionDemo")

.master("local[*]")

.getOrCreate()

//第一步:創(chuàng)建測試數(shù)據(jù)(字段:日期、產(chǎn)品類別、銷售額)

val arr=Array(

"2019-06-01,A,500",

"2019-06-01,B,600",

"2019-06-01,C,550",

"2019-06-02,A,700",

"2019-06-02,B,800",

"2019-06-02,C,880",

"2019-06-03,A,790",

"2019-06-03,B,700",

"2019-06-03,C,980",

"2019-06-04,A,920",

"2019-06-04,B,990",

"2019-06-04,C,680"

)

//轉(zhuǎn)為RDD[Row]

val rowRDD=spark.sparkContext

.makeRDD(arr)

.map(line=>Row(

line.split(",")(0),

line.split(",")(1),

line.split(",")(2).toInt

))

//構(gòu)建DataFrame元數(shù)據(jù)

val structType=StructType(Array(

StructField("date",StringType,true),

StructField("type",StringType,true),

StructField("money",IntegerType,true)

))

//將RDD[Row]轉(zhuǎn)為DataFrame

val df=spark.createDataFrame(rowRDD,structType)

//第二步:使用開窗函數(shù)取每一個(gè)類別的金額前3名

df.createTempView("t_sales") //創(chuàng)建臨時(shí)視圖

//執(zhí)行SQL查詢

spark.sql(

"select date,type,money,rank from " +

"(select date,type,money," +

"row_number() over (partition by type order by money desc) rank "+

"from t_sales) t " +

"where t.rank<=3"

).show()

}

}

柚子快報(bào)激活碼778899分享:大數(shù)據(jù) Spark SQL基礎(chǔ)

http://yzkb.51969.com/

文章鏈接

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://m.gantiao.com.cn/post/19282698.html

發(fā)布評論

您暫未設(shè)置收款碼

請?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄