
Big Data Analysis and In-Memory Computing: Study Notes

I. Basic Scala Programming Practice

1. Computing a series

Write a script that computes and outputs the partial sum Sn of the series whose n-th term is (n+1)/n (as implemented in the code below), stopping as soon as Sn is greater than or equal to q, where q is a positive integer entered from the keyboard. (If you do not use the script execution mode, you may write the logic in Java and convert it to Scala.)

For example, if q is 50.0, the output should be: Sn=50.416695.

Test cases:

q=1: Sn=2;

q=30: Sn=30.891459;

q=50: Sn=50.416695;

Code:

import scala.io.StdIn.readInt

object MedicalOne {
  def main(args: Array[String]): Unit = {
    var Sn: Float = 0
    var n: Float = 1
    println("please input q:")
    val q = readInt()
    // keep adding the terms (n+1)/n until Sn reaches q
    while (Sn < q) {
      Sn += (n + 1) / n
      n += 1
    }
    // print Sn without a decimal part when it is a whole number (e.g. q=1 gives Sn=2)
    if (Sn == Sn.toInt) {
      println(s"Sn=${Sn.toInt}")
    } else {
      println(s"Sn=$Sn")
    }
  }
}

Run result:
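A sample run with q = 50 (matching the third test case above) should look like this:

please input q:
50
Sn=50.416695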

2. Simulating graphics drawing

For a graphics drawing program, the various entities are abstracted with the following hierarchy. Define a trait Drawable with a draw method whose default implementation prints the object's string representation. Define a Point class representing a point; it mixes in Drawable and provides a shift method for moving the point. The abstract class for all shapes is Shape, whose constructor takes a Point giving the shape's position (its exact meaning differs between concrete shapes). Shape has a concrete method moveTo and an abstract method zoom: moveTo moves the shape from its current position to a new one (concrete shapes may need to adjust this behaviour), and zoom scales the shape by a floating-point factor, with a different implementation for each shape. The concrete shapes extending Shape are the line class Line and the circle class Circle. Line's first parameter is its position and its second parameter is the other endpoint; when a Line is zoomed its midpoint stays fixed and its length is scaled by the factor (note that both endpoints change), and because moving a Line also affects the other endpoint, moveTo must be overridden. Circle's first parameter is its centre, which is also its position, and the other parameter is its radius; when a Circle is zoomed its position stays the same and the radius is scaled. Both Line and Circle mix in Drawable and must override draw: Line's draw prints "Line:coordinates of the first endpoint--coordinates of the second endpoint", and Circle's draw prints "Circle center:centre coordinates,R=radius". The code below already defines Drawable, Point and the main entry point; complete the definitions of Shape, Line and Circle.

Code

Code layout:

Main object GraphicPlotting:

object GraphicPlotting {
  def main(args: Array[String]): Unit = {
    val p = Point(10, 30)
    p.draw()
    val line1 = new Line(Point(0, 0), Point(20, 20))
    line1.draw()
    line1.moveTo(Point(5, 5)) // move to a new point
    line1.draw()
    line1.zoom(2) // scale up by a factor of 2
    line1.draw()
    val cir = new Circle(Point(10, 10), 5)
    cir.draw()
    cir.moveTo(Point(30, 20))
    cir.draw()
    cir.zoom(0.5)
    cir.draw()
  }
}

Drawable trait:

trait Drawable {
  def draw() {
    println(this.toString)
  }
}

Point class:

case class Point(var x: Double, var y: Double) extends Drawable {
  def shift(deltaX: Double, deltaY: Double) {
    x += deltaX
    y += deltaY
  }
}

Shape class:

abstract class Shape(var location: Point) {
  def moveTo(newLocation: Point) {
    location = newLocation
  }
  def zoom(scale: Double)
}

Line class:

class Line(beginPoint: Point, var endPoint: Point) extends Shape(beginPoint) with Drawable {
  override def draw() {
    // override draw with the required output format
    println(s"Line:(${location.x},${location.y})--(${endPoint.x},${endPoint.y})")
  }
  override def moveTo(newLocation: Point) {
    endPoint.shift(newLocation.x - location.x, newLocation.y - location.y) // moving a line shifts the other endpoint first
    location = newLocation // then update the position
  }
  override def zoom(scale: Double) {
    // compute the midpoint and scale both endpoints around it
    val midPoint = Point((endPoint.x + location.x) / 2, (endPoint.y + location.y) / 2)
    location.x = midPoint.x + scale * (location.x - midPoint.x)
    location.y = midPoint.y + scale * (location.y - midPoint.y)
    endPoint.x = midPoint.x + scale * (endPoint.x - midPoint.x)
    endPoint.y = midPoint.y + scale * (endPoint.y - midPoint.y)
  }
}

Circle class:

class Circle(center: Point, var radius: Double) extends Shape(center) with Drawable {
  override def draw() {
    // override draw with the required output format
    println(s"Circle center:(${location.x},${location.y}),R=$radius")
  }
  override def zoom(scale: Double) {
    radius = radius * scale // zooming a circle only changes the radius
  }
}

Compile and run the program; the expected output is as follows:
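Reconstructed from the main method and the classes above (the original screenshot is not included), the output should be:

Point(10.0,30.0)
Line:(0.0,0.0)--(20.0,20.0)
Line:(5.0,5.0)--(25.0,25.0)
Line:(-5.0,-5.0)--(35.0,35.0)
Circle center:(10.0,10.0),R=5.0
Circle center:(30.0,20.0),R=5.0
Circle center:(30.0,20.0),R=2.5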

3. Computing statistics over student grades

A grade sheet has the format shown below. The first row is a header whose fields are the student id, gender, course name 1, course name 2, and so on; each following row describes one student, with fields separated by whitespace:

Id      gender  Math  English  Physics
301610  male    80    64       78
301611  female  65    87       58
...

Given any sheet in this format (different sheets may contain different numbers of courses), compute the average, minimum and maximum score of every course, using functional programming as far as possible. In addition, compute the same statistics separately for male and for female students.

Test sample 1:

Id      gender  Math  English  Physics
301610  male    80    64       78
301611  female  65    87       58
301612  female  44    71       77
301613  female  66    71       91
301614  female  70    71       100
301615  male    72    77       72
301616  female  73    81       75
301617  female  69    77       75
301618  male    73    61       65
301619  male    74    69       68
301620  male    76    62       76
301621  male    73    69       91
301622  male    55    69       61
301623  male    50    58       75
301624  female  63    83       93
301625  male    72    54       100
301626  male    76    66       73
301627  male    82    87       79
301628  female  62    80       54
301629  male    89    77       72

Test sample 2:

Id      gender  Math  English  Physics  Science
301610  male    72    39       74       93
301611  male    75    85       93       26
301612  female  85    79       91       57
301613  female  63    89       61       62
301614  male    72    63       58       64
301615  male    99    82       70       31
301616  female  100   81       63       72
301617  male    74    100      81       59
301618  female  68    72       63       100
301619  male    63    39       59       87
301620  female  84    88       48       48
301621  male    71    88       92       46
301622  male    82    49       66       78
301623  male    63    80       83       88
301624  female  86    80       56       69
301625  male    76    69       86       49
301626  male    91    59       93       51
301627  female  92    76       79       100
301628  male    79    89       78       57
301629  male    85    74       78       80

Statistics code and output for sample 1

Code

import scala.io.Source

object Studentgrades_1 {
  def main(args: Array[String]): Unit = {
    val fileName = "D:\\IDEAProjects\\SecondScala\\src\\main\\scala\\grades1.txt"
    val lines = Source.fromFile(fileName).getLines().toList
    val header = lines.head.trim.split("\\s+").map(_.trim)
    val data = lines.tail.map(_.trim.split("\\s+"))
    val courseNames = header.drop(2)
    val statsTotal = calculateStatistics(data, courseNames)
    val statsMales = calculateStatistics(data.filter(_(1) == "male"), courseNames)
    val statsFemales = calculateStatistics(data.filter(_(1) == "female"), courseNames)
    printStatistics(statsTotal, "")
    printStatistics(statsMales, " (males)")
    printStatistics(statsFemales, " (females)")
  }

  def calculateStatistics(data: List[Array[String]], courses: Array[String]): List[(String, Double, Double, Double)] = {
    val courseScores = courses.indices.map { i =>
      val scores = data.filter(_(i + 2).matches("-?\\d+(\\.\\d+)?")).map(_(i + 2).toDouble) // ensure we only have numbers
      if (scores.isEmpty) {
        (courses(i), 0.0, 0.0, 0.0) // avoid division by zero if there are no scores for a course
      } else {
        val average = scores.sum / scores.length
        val min = scores.min
        val max = scores.max
        (courses(i), average, min, max)
      }
    }
    courseScores.toList
  }

  def printStatistics(stats: List[(String, Double, Double, Double)], title: String): Unit = {
    println(s"course average min max${title}")
    stats.foreach { case (course, average, min, max) =>
      println(f"$course: $average%.2f $min%.2f $max%.2f")
    }
    println()
  }
}

Output

Statistics code and output for sample 2

Code

import scala.io.Source

object Studentgrades_2 {
  def main(args: Array[String]): Unit = {
    val fileName = "D:\\IDEAProjects\\SecondScala\\src\\main\\scala\\grades2.txt"
    val lines = Source.fromFile(fileName).getLines().toList
    val header = lines.head.trim.split("\\s+").map(_.trim)
    val data = lines.tail.map(_.trim.split("\\s+"))
    val courseNames = header.drop(2)
    val statsTotal = calculateStatistics(data, courseNames)
    val statsMales = calculateStatistics(data.filter(_(1) == "male"), courseNames)
    val statsFemales = calculateStatistics(data.filter(_(1) == "female"), courseNames)
    printStatistics(statsTotal, "")
    printStatistics(statsMales, " (males)")
    printStatistics(statsFemales, " (females)")
  }

  def calculateStatistics(data: List[Array[String]], courses: Array[String]): List[(String, Double, Double, Double)] = {
    val courseScores = courses.indices.map { i =>
      val scores = data.filter(_(i + 2).matches("-?\\d+(\\.\\d+)?")).map(_(i + 2).toDouble) // ensure we only have numbers
      if (scores.isEmpty) {
        (courses(i), 0.0, 0.0, 0.0) // avoid division by zero if there are no scores for a course
      } else {
        val average = scores.sum / scores.length
        val min = scores.min
        val max = scores.max
        (courses(i), average, min, max)
      }
    }
    courseScores.toList
  }

  def printStatistics(stats: List[(String, Double, Double, Double)], title: String): Unit = {
    println(s"course average min max${title}") // include the title so the male/female tables are labelled
    stats.foreach { case (course, average, min, max) =>
      println(f"$course: $average%.2f $min%.2f $max%.2f")
    }
    println()
  }
}

Output

II. Basic RDD Programming Practice

1. Interactive programming in spark-shell

Use chapter5-data1.txt, a data set containing the grades of a university computer science department, in the following format:

Tom,DataBase,80
Tom,Algorithm,50
Tom,DataStructure,60
Jim,DataBase,90
Jim,Algorithm,60
Jim,DataStructure,80
……

Using this data set, compute the following in spark-shell:

(1) How many students are there in the department?

(2) How many courses does the department offer?

(3) What is Tom's average score over all his courses?

(4) How many courses has each student taken?

(5) How many students have taken the DataBase course?

(6) What is the average score of each course?

(7) Use an accumulator to count how many students have taken the DataBase course.

Code:

(1)

val lines = sc.textFile("file:///home/qiangzi/chapter5-data1.txt")

val par = lines.map(row=>row.split(",")(0))

val distinct_par = par.distinct() // remove duplicate student names

distinct_par.count // number of distinct students

(2)

val lines = sc.textFile("file:///home/qiangzi/chapter5-data1.txt")

val par = lines.map(row=>row.split(",")(1))

val distinct_par = par.distinct()

distinct_par.count

(3)

val lines = sc.textFile("file:///home/qiangzi/chapter5-data1.txt")

val pare = lines.filter(row=>row.split(",")(0)=="Tom")

pare.foreach(println)

pare.map(row=>(row.split(",")(0),row.split(",")(2).toInt)).mapValues(x=>(x,1)).reduceByKey((x,y ) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect()
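Note that mapValues(x => (x._1 / x._2)) performs integer division, so the printed average is truncated. A variant that keeps the fractional part (a sketch, not part of the original answer) would be:

pare.map(row=>(row.split(",")(0),row.split(",")(2).toInt)).mapValues(x=>(x,1)).reduceByKey((x,y) => (x._1+y._1,x._2+y._2)).mapValues(x => x._1.toDouble / x._2).collect()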

(4)

val lines = sc.textFile("file:///home/qiangzi/chapter5-data1.txt")

val pare = lines.map(row=>(row.split(",")(0),row.split(",")(1)))

pare.mapValues(x => (x,1)).reduceByKey((x,y) => (" ",x._2 + y._2)).mapValues(x => x._2).foreach(println)

(5)

val lines = sc.textFile("file:///home/qiangzi/chapter5-data1.txt")

val pare = lines.filter(row=>row.split(",")(1)=="DataBase")

pare.count

(6)

val lines = sc.textFile("file:///home/qiangzi/chapter5-data1.txt")

val pare = lines.map(row=>(row.split(",")(1),row.split(",")(2).toInt))

pare.mapValues(x=>(x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1/ x._2)).collect().foreach(x => println(s"${x._1}: ${x._2}"))

(7)

val lines = sc.textFile("file:///home/qiangzi/chapter5-data1.txt")

val pare = lines.filter(row=>row.split(",")(1)=="DataBase").map(row=>(row.split(",")(1),1))

val accum = sc.longAccumulator("My Accumulator")

pare.values.foreach(x => accum.add(x))

accum.value

2. Writing a standalone application for data deduplication

Given two input files A and B, write a standalone Spark application that merges the two files and removes duplicate lines, producing a new file C. Sample input and output files are shown below for reference.

Sample of input file A:

20170101    x
20170102    y
20170103    x
20170104    y
20170105    z
20170106    z

Sample of input file B:

20170101    y
20170102    y
20170103    x
20170104    z
20170105    y

The output file C obtained by merging A and B looks like this:

20170101    x
20170101    y
20170102    y
20170103    x
20170104    y
20170104    z
20170105    y
20170105    z
20170106    z

Code:

cd ~    # go to the home directory

mkdir ./remdup

mkdir -p ./remdup/src/main/scala

vim ./remdup/src/main/scala/RemDup.scala

/* RemDup.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner

object RemDup {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("RemDup")
    val sc = new SparkContext(conf)
    val dataFileA = "file:///home/qiangzi/A.txt"
    val dataFileB = "file:///home/qiangzi/B.txt"
    val dataA = sc.textFile(dataFileA, 2)
    val dataB = sc.textFile(dataFileB, 2)
    // union the two files, drop empty lines, deduplicate by grouping on the whole line, then sort
    val res = dataA.union(dataB).filter(_.trim().length > 0).map(line => (line.trim, ""))
      .partitionBy(new HashPartitioner(1)).groupByKey().sortByKey().keys
    res.saveAsTextFile("file:///home/qiangzi/C.txt")
  }
}
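An equivalent and somewhat more direct way to build res (a sketch using the same paths, not the submitted solution) is to rely on distinct and sortBy:

val res = dataA.union(dataB).map(_.trim).filter(_.nonEmpty).distinct().sortBy(identity, ascending = true, numPartitions = 1)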

cd ~/remdup

vim simple.sbt

/* simple.sbt*/

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.18"

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"

/usr/local/sbt-1.9.0/sbt/sbt package

/usr/local/spark-3.5.1/bin/spark-submit --class "RemDup" --driver-java-options "-Dfile.encoding=UTF-8" ~/remdup/target/scala-2.12/simple-project_2.12-1.0.jar

3. Writing a standalone application to compute average scores

Each input file holds the scores of one subject for a class: every line has two fields, the student's name and the student's score. Write a standalone Spark application that computes each student's average score over all subjects and writes the result to a new file. Sample input and output files are shown below for reference.

Algorithm scores:

小明 92

小紅 87

小新 82

小麗 90

Database scores:

小明 95

小紅 81

小新 89

小麗 85

Python scores:

小明 82

小紅 83

小新 94

小麗 91

The average scores are as follows:

(小紅,83.67)
(小新,88.33)
(小明,89.67)
(小麗,88.67)

Code:

cd ~    # go to the home directory

mkdir ./avgscore

mkdir -p ./avgscore/src/main/scala

vim ./avgscore/src/main/scala/AvgScore.scala

/* AvgScore.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner

object AvgScore {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("AvgScore")
    val sc = new SparkContext(conf)
    val dataFiles = Array(
      "file:///home/qiangzi/Sparkdata/algorithm.txt",
      "file:///home/qiangzi/Sparkdata/database.txt",
      "file:///home/qiangzi/Sparkdata/python.txt"
    )
    // read the three subject files into a single RDD
    val data = dataFiles.foldLeft(sc.emptyRDD[String]) { (acc, file) =>
      acc.union(sc.textFile(file, 3))
    }
    // group the scores by student name and compute the average, formatted to two decimals
    val res = data.filter(_.trim().length > 0).map(line => {
      val fields = line.split(" ")
      (fields(0).trim(), fields(1).trim().toInt)
    }).partitionBy(new HashPartitioner(1)).groupByKey().mapValues(x => {
      var n = 0
      var sum = 0.0
      for (i <- x) {
        sum += i
        n += 1
      }
      val avg = sum / n
      f"$avg%1.2f"
    })
    res.saveAsTextFile("file:///home/qiangzi/Sparkdata/average.txt")
  }
}

cd ~/avgscore

vim simple.sbt

/* simple.sbt*/

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.18"

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"

/usr/local/sbt-1.9.0/sbt/sbt package

/usr/local/spark-3.5.1/bin/spark-submit --class "AvgScore" --driver-java-options "-Dfile.encoding=UTF-8" ~/avgscore/target/scala-2.12/simple-project_2.12-1.0.jar

III. Basic Spark SQL Programming Practice

1. Basic Spark SQL operations

Copy the following JSON data to the Linux system and save it as employee.json.

{ "id":1 , "name":" Ella" , "age":36 }
{ "id":2, "name":"Bob","age":29 }
{ "id":3 , "name":"Jack","age":29 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":5 , "name":"Damon" }
{ "id":5 , "name":"Damon" }

Create a DataFrame for employee.json and write Scala statements to: (1) query all data; (2) query all data with duplicates removed; (3) query all data without the id column; (4) select the records with age > 30; (5) group the data by age; (6) sort the data by name in ascending order; (7) take the first 3 rows; (8) select the name column of all records, aliased as username; (9) compute the average of age; (10) compute the minimum of age.

Code:

(1)

import org.apache.spark.sql.SparkSession

val spark=SparkSession.builder().getOrCreate()

import spark.implicits._

val df = spark.read.json("file:///home/qiangzi/employee.json")

df.show()

(2)

df.distinct().show()

(3)

df.drop("id").show()

(4)

df.filter(df("age") > 30 ).show()

(5)

df.groupBy("age").count().show()

(6)

df.sort(df("name").asc).show()

(7)

df.take(3)

(8)

df.select(df("name").as("username")).show()

(9)

import org.apache.spark.sql.functions.{avg, min}    // needed for the avg and min column functions

val avgAge = df.agg(avg("age")).first().getDouble(0)

(10)

val minAge = df.agg(min("age")).first().getLong(0).toInt
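Equivalently, (9) and (10) can be answered with a single SQL query over the same DataFrame (a sketch, not required by the assignment):

df.createOrReplaceTempView("employee")
spark.sql("SELECT AVG(age) AS avg_age, MIN(age) AS min_age FROM employee").show()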

2. Converting an RDD to a DataFrame

The source file has the following content (fields: id, name, age):

1,Ella,36
2,Bob,29
3,Jack,29

Copy the data to the Linux system and save it as employee.txt. Convert an RDD built from this file into a DataFrame and print every row of the DataFrame in the format "id:1,name:Ella,age:36". Write the program code.

Code:

cd ~    # go to the home directory

mkdir ./rddtodf

mkdir -p ./rddtodf/src/main/scala

vim ./rddtodf/src/main/scala/RDDtoDF.scala

/* RDDtoDF.scala */
import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object RDDtoDF {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("RDDtoDF").getOrCreate()
    import spark.implicits._
    val employeeRDD = spark.sparkContext.textFile("file:///home/qiangzi/employee.txt")
    // build the schema programmatically: three string columns named id, name and age
    val schemaString = "id name age"
    val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    // turn each CSV line into a Row and attach the schema
    val rowRDD = employeeRDD.map(_.split(",")).map(attributes => Row(attributes(0).trim, attributes(1), attributes(2).trim))
    val employeeDF = spark.createDataFrame(rowRDD, schema)
    employeeDF.createOrReplaceTempView("employee")
    val results = spark.sql("SELECT id,name,age FROM employee")
    results.map(t => "id:" + t(0) + "," + "name:" + t(1) + "," + "age:" + t(2)).show()
    spark.stop()
  }
}
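For comparison, the same conversion can be done with the reflection-based approach, using a case class and toDF instead of an explicit schema. This is only a sketch (e.g. for spark-shell, where import spark.implicits._ is already in scope), not the solution above:

// assumes the same employee.txt and an active SparkSession named spark
case class Employee(id: Long, name: String, age: Long)
val employeeDF2 = spark.sparkContext.textFile("file:///home/qiangzi/employee.txt")
  .map(_.split(","))
  .map(a => Employee(a(0).trim.toLong, a(1), a(2).trim.toLong))
  .toDF()
employeeDF2.map(r => s"id:${r.getAs[Long]("id")},name:${r.getAs[String]("name")},age:${r.getAs[Long]("age")}").show(false)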

cd ~/rddtodf

vim simple.sbt

/*simple.sbt*/

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.18"

libraryDependencies ++= Seq(

"org.apache.spark" %% "spark-core" % "3.5.1",

"org.apache.spark" %% "spark-sql" % "3.5.1"

)

/usr/local/sbt-1.9.0/sbt/sbt package

/usr/local/spark-3.5.1/bin/spark-submit --class "RDDtoDF" --driver-java-options "-Dfile.encoding=UTF-8" ~/rddtodf/target/scala-2.12/simple-project_2.12-1.0.jar

3. Reading and writing MySQL data with DataFrames

(1) Create a database named sparktest in MySQL, then create a table named employee containing the two rows shown in Table 6-2.

Table 6-2: existing rows in the employee table

id  name   gender  age
1   Alice  F       22
2   John   M       25

(2) Configure Spark to connect to MySQL over JDBC, use a DataFrame to insert the two rows shown in Table 6-3 into MySQL, and finally print the maximum and the sum of age.

Table 6-3: rows to insert into the employee table

id  name  gender  age
3   Mary  F       26
4   Tom   M       23

Code:

mysql -u root -p

create database sparktest;

use sparktest;

create table employee (id int(4), name char(20), gender char(4), age int(4));

insert into employee values(1,'Alice','F',22);

insert into employee values(2,'John','M',25);

cd ~    # go to the home directory

mkdir ./testmysql

mkdir -p ./testmysql/src/main/scala

vim ./testmysql/src/main/scala/TestMySQL.scala

/* TestMySQL.scala */
import java.util.Properties
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

object TestMySQL {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TestMySQL")
      .getOrCreate()
    // the two new rows from Table 6-3, as whitespace-separated strings
    val employeeRDD = spark.sparkContext.parallelize(Array("3 Mary F 26", "4 Tom M 23"))
      .map(_.split(" "))
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("gender", StringType, true),
      StructField("age", IntegerType, true)
    ))
    val rowRDD = employeeRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
    val employeeDF = spark.createDataFrame(rowRDD, schema)
    // JDBC connection properties
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "789456MLq")
    prop.put("driver", "com.mysql.jdbc.Driver")
    // append the new rows to the existing table
    employeeDF.write
      .mode("append")
      .jdbc("jdbc:mysql://slave1:3306/sparktest", "sparktest.employee", prop)
    // read the whole table back and aggregate
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://slave1:3306/sparktest")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "employee")
      .option("user", "root")
      .option("password", "789456MLq")
      .load()
    val aggregatedDF = jdbcDF.agg(
      max("age").alias("max_age"),
      sum("age").alias("total_age")
    )
    aggregatedDF.show()
    spark.stop()
  }
}
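With the two original rows (ages 22 and 25) and the two inserted rows (ages 26 and 23), the final aggregation should print something like:

+-------+---------+
|max_age|total_age|
+-------+---------+
|     26|       96|
+-------+---------+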

cd ~/testmysql

vim simple.sbt

/*simple.sbt*/

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.18"

libraryDependencies ++= Seq(

"org.apache.spark" %% "spark-core" % "3.5.1",

"org.apache.spark" %% "spark-sql" % "3.5.1"

)

/usr/local/sbt-1.9.0/sbt/sbt package

sudo /usr/local/spark-3.5.1/bin/spark-submit --class "TestMySQL" --driver-java-options "-Dfile.encoding=UTF-8" --jars /usr/local/hive-3.1.2/lib/mysql-connector-java-5.1.49.jar ~/testmysql/target/scala-2.12/simple-project_2.12-1.0.jar

IV. Basic Spark Streaming Programming Practice

1. Objective

Learn to generate files and data with Scala, and master the use of files as a data source for Spark Streaming.

2. Tasks

(1) Generate a large number of files in a directory at random time intervals; file names are random, and each file contains a few randomly chosen English sentences whose words are separated by spaces.

(2) Count, in real time, the number of words appearing in each new 10-second batch.

(3) Count, in real time, the occurrences of each word within the last minute (reported every 10 seconds).

(4) Count, in real time, the cumulative occurrences of each word and save the result to a local file (reported every 10 seconds).

3. Note:

Tasks (2), (3) and (4) build on task (1): because they perform streaming computation, the packaged code for (2), (3) and (4) must only be started after the code for (1) is running. The steps are as follows:

After creating the required directories (see the example below), open two terminals (alternatively, install IDEA and run (1) in IDEA while running (2), (3) and (4) in a terminal): one terminal runs the code for task (1), the other runs tasks (2), (3) and (4). Once (2), (3) and (4) have been packaged, start the code for (1) first and then run (2), (3) and (4).
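For reference, the directories hard-coded in the programs below can be created beforehand like this (the paths are simply the ones used in the code; adjust them to your own environment):

mkdir -p /home/qiangzi/data      # directory monitored by textFileStream and written to by GenFile
mkdir -p /home/qiangzi/dataout   # directory where task (4) writes its word-count files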

4. Code

(1)

/* I ran this part in IDEA; you can also open a second terminal and package and run it there. */

import java.io.{File, PrintWriter}

object GenFile {
  def main(args: Array[String]) {
    val strList = List(
      "There are three famous bigdata softwares",
      "and they are widely used in real applications",
      "For in that sleep of death what dreams may come",
      "The slings and arrows of outrageous fortune",
      "When we have shuffled off this mortal coil",
      "For who would bear the whips and scorns of time",
      "That patient merit of the unworthy takes",
      "When he himself might his quietus make",
      "To grunt and sweat under a weary life",
      "But that the dread of something after death",
      "And makes us rather bear those ills we have",
      "Than fly to others that we know not of",
      "Thus conscience does make cowards of us all",
      "And thus the native hue of resolution",
      "And enterprises of great pith and moment",
      "And lose the name of action"
    )
    var i = 0
    while (i < 10000) {
      // sleep for a random interval of up to 5 seconds
      Thread.sleep(scala.util.Random.nextInt(5000))
      // generate a random number between 0 and 999 and use it in the file name
      var str = scala.util.Random.nextInt(1000).toString
      val filePath = "/home/qiangzi/data/out" + str + ".txt"
      // create a PrintWriter for the new file
      val out = new PrintWriter(new File(filePath))
      // write four randomly chosen sentences from the list into the file
      for (m <- 0 to 3) out.println(strList(scala.util.Random.nextInt(strList.length)))
      out.close()
      i = i + 1
    }
  }
}

(2)

cd

mkdir ./wordcount

mkdir -p ./wordcount/src/main/scala

vim ./wordcount/src/main/scala/WordCount.scala

/* WordCount.scala */
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.SparkContext._

object WordCount {
  def main(args: Array[String]) {
    // local mode with two threads: one receives the data, the other processes it
    val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc, Seconds(10)) // 10-second batch interval
    val lines = ssc.textFileStream("file:///home/qiangzi/data") // directory monitored by the file stream
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.count() // number of words in each batch
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
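While GenFile keeps dropping files into /home/qiangzi/data, each 10-second batch prints the number of words seen in that batch, roughly in this form (the timestamp and the count will vary):

-------------------------------------------
Time: 1713166800000 ms
-------------------------------------------
128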

cd ~/wordcount

vim simple.sbt

/*simple.sbt*/

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.18"

libraryDependencies ++= Seq(

"org.apache.spark" %% "spark-core" % "3.5.1",

"org.apache.spark" %% "spark-sql" % "3.5.1",

"org.apache.spark" %% "spark-streaming" % "3.5.1" // 添加spark-streaming依賴

)

/usr/local/sbt-1.9.0/sbt/sbt package

/usr/local/spark-3.5.1/bin/spark-submit --class "WordCount" --driver-java-options "-Dfile.encoding=UTF-8" ~/wordcount/target/scala-2.12/simple-project_2.12-1.0.jar

(3)

cd

mkdir ./wordcount1

mkdir -p ./wordcount1/src/main/scala

vim ./wordcount1/src/main/scala/WordCountOne.scala

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext._

object WordCountOne {
  def main(args: Array[String]) {
    // application name "WordCountMinute", local mode with 2 threads
    val sparkConf = new SparkConf().setAppName("WordCountMinute").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    // StreamingContext with a 10-second batch interval
    val ssc = new StreamingContext(sc, Seconds(10))
    // use a file stream that monitors the given directory for new files
    val lines = ssc.textFileStream("file:///home/qiangzi/data")
    // split every line into words
    val words = lines.flatMap(_.split(" "))
    // windowed DStream: window length 60 seconds, sliding interval 10 seconds
    val windowedWords = words.window(Seconds(60), Seconds(10))
    val wordCounts = windowedWords.flatMap(word => Array((word, 1)))
      .reduceByKey(_ + _)
    var outputCount = 0
    // for each RDD, print the count of every word in the current window
    wordCounts.foreachRDD { rdd =>
      // bump the output counter
      outputCount += 1
      // current system time in milliseconds
      val currentTime = System.currentTimeMillis()
      println(s"-------------------------------------------")
      println(s"Time: $currentTime ms")
      println(s"-------------------------------------------")
      // print every word with its count
      rdd.foreach { case (word, count) =>
        println(s"$word: $count")
      }
      // after six outputs (one minute), stop the StreamingContext and the SparkContext
      if (outputCount == 6) {
        println("Finished printing word counts for the last time.")
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

cd ~/wordcount1

vim simple.sbt

/*simple.sbt*/

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.18"

libraryDependencies ++= Seq(

"org.apache.spark" %% "spark-core" % "3.5.1",

"org.apache.spark" %% "spark-sql" % "3.5.1",

"org.apache.spark" %% "spark-streaming" % "3.5.1" // 添加spark-streaming依賴

)

/usr/local/sbt-1.9.0/sbt/sbt package

/usr/local/spark-3.5.1/bin/spark-submit --class "WordCountOne" --driver-java-options "-Dfile.encoding=UTF-8" ~/wordcount1/target/scala-2.12/simple-project_2.12-1.0.jar

(4)

cd

mkdir ./wordcount2

mkdir -p ./wordcount2/src/main/scala

vim ./wordcount2/src/main/scala/WordCountTwo.scala

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext._
import java.text.SimpleDateFormat
import java.util.Date

object WordCountWithFileOutput {
  def main(args: Array[String]) {
    // local mode with 2 threads
    val sparkConf = new SparkConf().setAppName("WordCountWithFileOutput").setMaster("local[2]")
    val sc = new SparkContext(sparkConf) // create the Spark context
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc, Seconds(10)) // 10-second batch interval
    val lines = ssc.textFileStream("file:///home/qiangzi/data") // monitor the directory and read new files as a stream
    val words = lines.flatMap(_.split(" ")) // split each line on spaces into a stream of words
    // a 6000-second window sliding every 10 seconds, used here to approximate a cumulative count
    // (this window length may be larger than needed; adjust it to your requirements)
    val windowedWords = words.window(Seconds(6000), Seconds(10))
    val wordCounts = windowedWords.flatMap(word => Array((word, 1))).reduceByKey(_ + _) // aggregate counts per word
    var outputCount = 0
    wordCounts.foreachRDD { rdd =>
      outputCount += 1
      val sdf = new SimpleDateFormat("yyyyMMdd_HHmmss") // date formatter for the output file name
      val currentTimeStr = sdf.format(new Date(System.currentTimeMillis()))
      // write every word and its count to a file named after the current batch time
      rdd.foreach { case (word, count) =>
        val outputFile = s"/home/qiangzi/dataout/${currentTimeStr}_wordcount.txt" // output file path
        val content = s"$word: $count\n"
        val bw = new java.io.BufferedWriter(new java.io.FileWriter(outputFile, true)) // append to the file
        bw.write(content)
        bw.close()
      }
      // stop the StreamingContext and SparkContext after 600 outputs
      if (outputCount == 600) {
        println("Finished writing word counts to files for the last time.")
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

cd ~/wordcount2

vim simple.sbt

/*simple.sbt*/

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.18"

libraryDependencies ++= Seq(

"org.apache.spark" %% "spark-core" % "3.5.1",

"org.apache.spark" %% "spark-sql" % "3.5.1",

"org.apache.spark" %% "spark-streaming" % "3.5.1" // 添加spark-streaming依賴

)

/usr/local/sbt-1.9.0/sbt/sbt package

/usr/local/spark-3.5.1/bin/spark-submit --class "WordCountWithFileOutput" --driver-java-options "-Dfile.encoding=UTF-8" ~/wordcount2/target/scala-2.12/simple-project_2.12-1.0.jar

