
Basic Concepts and Usage of SparkSQL Programming

数据分析与开发 (Data Analysis and Development) · Source: 算法美食屋 · Author: 梁云1991 · 2021-11-02 15:45

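This section introduces the basic concepts and basic usage of SparkSQL programming.

RDD programming follows an imperative paradigm. SparkSQL programming follows a declarative one: we describe the operations we want through SQL statements or calls to the DataFrame API.

Spark then parses that description, works out a corresponding execution plan and optimizes it, and finally executes it by calling the appropriate low-level operations.

When we use pyspark for RDD programming, much of what runs on the Executors is Python code, although occasionally Java bytecode runs there too.

When we use pyspark for SparkSQL programming, what runs on the Executors is Java bytecode. The exception is Python UDFs, which still run in Python worker processes. pyspark translates the Python code into Java tasks on the Driver and then ships those tasks to the Executors for execution.

Programming in the SparkSQL paradigm therefore gives us performance almost on par with programming directly in Scala/Java, ignoring differences in parsing time. SparkSQL also provides a very convenient data read/write API. It lets us exchange data with Hive tables, HDFS, MySQL tables, Cassandra, HBase, and many other storage systems.

The drawback is that SparkSQL is somewhat less flexible. The data types it supports out of the box are mostly standard SQL types such as Int, Long, Float, Double, String, and Boolean, plus a few others such as dates, arrays, and structs. Extending the type system is relatively cumbersome. Functionality that SQL does not support directly can usually be implemented with user-defined functions (UDFs). For even more complex logic, you can convert the data to an RDD and implement it there.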

This section mainly covers the following topics:

  • Comparing RDD and DataFrame

  • Creating a DataFrame

  • Saving a DataFrame to files

  • Interacting with a DataFrame through the API

  • Interacting with a DataFrame through SQL

import findspark

# Point spark_home at the directory Spark was just extracted to, and specify the Python interpreter path
spark_home = "/Users/liangyun/ProgramFiles/spark-3.0.1-bin-hadoop3.2"
python_path = "/Users/liangyun/anaconda3/bin/python"
findspark.init(spark_home, python_path)

import pyspark
from pyspark.sql import SparkSession

# Many SparkSQL features are exposed as methods of SparkSession
# .master("local[4]") runs Spark locally with 4 worker threads

spark = SparkSession.builder \
    .appName("test") \
    .master("local[4]") \
    .enableHiveSupport() \
    .getOrCreate()

sc = spark.sparkContext


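I. Comparing RDD, DataFrame, and DataSet

DataFrame borrows ideas from pandas: it adds a schema on top of RDD, so column-name information is available.

DataSet goes a step further than DataFrame by adding data type information, so type errors can be caught at compile time.

A DataFrame can be viewed as a DataSet[Row]; the two share exactly the same API.

Both DataFrame and DataSet support interactive SQL queries and integrate seamlessly with Hive.

DataSet is only available in the Scala and Java APIs; the Python and R APIs support only DataFrame.

Under the hood, a DataFrame is implemented on top of RDDs. An RDD, however, is a row-oriented data structure, whereas a DataFrame is described as column-oriented. Its schema lets Spark lay data out by column, for example in its in-memory cache and in Parquet files.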
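The following plain-Python illustration makes the row-versus-column distinction concrete. It holds the same three records in both layouts. It does not use Spark and is not how Spark stores data internally. The helper `to_columns` is hypothetical.

```python
# Plain-Python illustration of row-oriented vs column-oriented layouts (no Spark involved).
rows = [("LiLei", 15, 88), ("HanMeiMei", 16, 90), ("DaChui", 17, 60)]
columns = ["name", "age", "score"]


def to_columns(rows, columns):
    """Pivot a list of row tuples into a dict of per-column lists (hypothetical helper)."""
    return {col: [row[i] for row in rows] for i, col in enumerate(columns)}


col_store = to_columns(rows, columns)

# Row layout: each record is kept together, so reading one column visits every record.
ages_from_rows = [row[1] for row in rows]

# Column layout: each column is kept together, so reading one column touches a single list.
ages_from_columns = col_store["age"]

print(col_store)
print(ages_from_rows == ages_from_columns)  # True
```

Queries that read only a few columns benefit from the second layout, which is one reason columnar formats such as Parquet are popular for analytics.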

II. Creating a DataFrame

1. Converting to a DataFrame with the toDF method

An RDD can be converted to a DataFrame with the toDF method.

# Convert an RDD into a DataFrame
rdd = sc.parallelize([("LiLei", 15, 88), ("HanMeiMei", 16, 90), ("DaChui", 17, 60)])
df = rdd.toDF(["name", "age", "score"])
df.show()
df.printSchema()
+---------+---+-----+
|     name|age|score|
+---------+---+-----+
|    LiLei| 15|   88|
|HanMeiMei| 16|   90|
|   DaChui| 17|   60|
+---------+---+-----+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- score: long (nullable = true)
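The schema reports `age` and `score` as `long` rather than `int`. This is because PySpark maps Python `int` to `LongType` when it infers a schema from Python objects. The plain-Python sketch below models that mapping for a handful of common types. `infer_spark_type` is a hypothetical helper, not a PySpark function. PySpark's real inference also handles cases such as `None`, lists, dicts, and nested rows.

```python
import datetime
import decimal

# Simplified model of how PySpark maps Python values to Spark SQL type names during
# schema inference. Order matters: bool is a subclass of int, and
# datetime.datetime is a subclass of datetime.date.
_TYPE_ORDER = [
    (bool, "boolean"),
    (int, "long"),
    (float, "double"),
    (str, "string"),
    (decimal.Decimal, "decimal"),
    (datetime.datetime, "timestamp"),
    (datetime.date, "date"),
]


def infer_spark_type(value):
    """Return the Spark SQL type name PySpark would infer for `value` (simplified)."""
    for py_type, spark_name in _TYPE_ORDER:
        if isinstance(value, py_type):
            return spark_name
    raise TypeError(f"no mapping in this sketch for {type(value).__name__}")


record = ("LiLei", 15, 88)
print([infer_spark_type(v) for v in record])  # ['string', 'long', 'long']
```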

2. Converting a pandas DataFrame into a pyspark DataFrame with createDataFrame

import pandas as pd

pdf = pd.DataFrame([("LiLei", 18), ("HanMeiMei", 17)], columns=["name", "age"])
df = spark.createDataFrame(pdf)
df.show()
+---------+---+
|     name|age|
+---------+---+
|    LiLei| 18|
|HanMeiMei| 17|
+---------+---+
# A Python list can also be converted directly
values = [("LiLei", 18), ("HanMeiMei", 17)]
df = spark.createDataFrame(values, ["name", "age"])
df.show()
+---------+---+
|     name|age|
+---------+---+
|    LiLei| 18|
|HanMeiMei| 17|
+---------+---+

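3. Creating a DataFrame dynamically with createDataFrame and an explicit schema

You can create a DataFrame by passing an RDD and a schema to createDataFrame.

This approach is more verbose. In exchange, it lets you create a DataFrame dynamically in code when the schema and data types are not known in advance.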

from pyspark.sql.types import *
from pyspark.sql import Row
from datetime import datetime

schema = StructType([StructField("name", StringType(), nullable=False),
                     StructField("score", IntegerType(), nullable=True),
                     StructField("birthday", DateType(), nullable=True)])

rdd = sc.parallelize([Row("LiLei", 87, datetime(2010, 1, 5)),
                      Row("HanMeiMei", 90, datetime(2009, 3, 1)),
                      Row("DaChui", None, datetime(2008, 7, 2))])

dfstudent = spark.createDataFrame(rdd, schema)

dfstudent.show()
+---------+-----+----------+
|     name|score|  birthday|
+---------+-----+----------+
|    LiLei|   87|2010-01-05|
|HanMeiMei|   90|2009-03-01|
|   DaChui| null|2008-07-02|
+---------+-----+----------+
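Column names and types may only be known at runtime. In that case, another option is to assemble a DDL-style schema string, since `createDataFrame` should also accept a datatype string as its `schema` argument. The sketch below only builds the string in plain Python. `columns_meta` and `build_ddl_schema` are hypothetical stand-ins for metadata you might load from a config file or catalog. This simple form leaves every column nullable, so the `nullable=False` constraint on `name` above still needs a `StructType`.

```python
# Hypothetical column metadata, e.g. discovered at runtime from a config file or catalog.
columns_meta = [("name", "string"), ("score", "int"), ("birthday", "date")]


def build_ddl_schema(meta):
    """Join (column, type) pairs into a DDL schema string such as 'name STRING, score INT'."""
    return ", ".join(f"{col_name} {col_type.upper()}" for col_name, col_type in meta)


ddl = build_ddl_schema(columns_meta)
print(ddl)  # name STRING, score INT, birthday DATE
# With a live SparkSession one could then try (not executed here):
#   dfstudent = spark.createDataFrame(rdd, ddl)
```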

4. Creating a DataFrame by reading files

You can get a DataFrame by reading JSON files, CSV files, Parquet files, Hive tables, or MySQL tables.

# Read a JSON file into a DataFrame
df = spark.read.json("data/people.json")
df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
# Read a CSV file
df = spark.read.option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .csv("data/iris.csv")
df.show(5)
df.printSchema()
+-----------+----------+-----------+----------+-----+
|sepallength|sepalwidth|petallength|petalwidth|label|
+-----------+----------+-----------+----------+-----+
|        5.1|       3.5|        1.4|       0.2|    0|
|        4.9|       3.0|        1.4|       0.2|    0|
|        4.7|       3.2|        1.3|       0.2|    0|
|        4.6|       3.1|        1.5|       0.2|    0|
|        5.0|       3.6|        1.4|       0.2|    0|
+-----------+----------+-----------+----------+-----+
only showing top 5 rows

root
 |-- sepallength: double (nullable = true)
 |-- sepalwidth: double (nullable = true)
 |-- petallength: double (nullable = true)
 |-- petalwidth: double (nullable = true)
 |-- label: integer (nullable = true)
# Read a CSV file through the generic format()/load() API
# ("com.databricks.spark.csv" is the legacy package name; Spark 2.x and later map it to the built-in "csv" source)
df = spark.read.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .load("data/iris.csv")
df.show(5)
df.printSchema()
+-----------+----------+-----------+----------+-----+
|sepallength|sepalwidth|petallength|petalwidth|label|
+-----------+----------+-----------+----------+-----+
|        5.1|       3.5|        1.4|       0.2|    0|
|        4.9|       3.0|        1.4|       0.2|    0|
|        4.7|       3.2|        1.3|       0.2|    0|
|        4.6|       3.1|        1.5|       0.2|    0|
|        5.0|       3.6|        1.4|       0.2|    0|
+-----------+----------+-----------+----------+-----+
only showing top 5 rows

root
 |-- sepallength: double (nullable = true)
 |-- sepalwidth: double (nullable = true)
 |-- petallength: double (nullable = true)
 |-- petalwidth: double (nullable = true)
 |-- label: integer (nullable = true)
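With `inferSchema` set to `true`, Spark scans the CSV values to choose a type for each column. The plain-Python sketch below is a deliberately simplified model of that idea: try integer, then double, otherwise fall back to string. Spark's real inference considers more types, such as long, decimal, timestamp, and boolean. `infer_column_type` is a hypothetical helper.

```python
import csv
import io

# A few lines in the same shape as data/iris.csv (values copied from the output above).
sample_csv = """sepallength,sepalwidth,petallength,petalwidth,label
5.1,3.5,1.4,0.2,0
4.9,3.0,1.4,0.2,0
4.7,3.2,1.3,0.2,0
"""


def _parses_as(cast, text):
    """True if `cast(text)` succeeds, e.g. int("0") but not int("5.1")."""
    try:
        cast(text)
        return True
    except ValueError:
        return False


def infer_column_type(values):
    """Simplified inference: integer if every value parses as int, else double, else string."""
    if all(_parses_as(int, v) for v in values):
        return "integer"
    if all(_parses_as(float, v) for v in values):
        return "double"
    return "string"


reader = csv.reader(io.StringIO(sample_csv))
header = next(reader)
data = list(reader)
schema = {col: infer_column_type([row[i] for row in data]) for i, col in enumerate(header)}
print(schema)
```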
# Read a Parquet file
df = spark.read.parquet("data/users.parquet")
df.show()
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

# Read a Hive table into a DataFrame

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'data/kv1.txt' INTO TABLE src")
# Assumed query condition: keys below 10, ordered by key
df = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
df.show(5)

+---+-----+
|key|value|
+---+-----+
|  0|val_0|
|  0|val_0|
|  0|val_0|
|  0|val_0|
|  0|val_0|
+---+-----+
only showing top 5 rows
# Read a MySQL table into a DataFrame (the code is kept inside a string literal, so it does not run as-is)
"""
url = "jdbc:mysql://localhost:3306/test"
df = spark.read.format("jdbc") \
    .option("url", url) \
    .option("dbtable", "runoob_tbl") \
    .option("user", "root") \
    .option("password", "0845") \
    .load()
df.show()
"""

III. Saving a DataFrame to Files

A DataFrame can be saved as a CSV, JSON, or Parquet file, or as a Hive table.

# Save as a CSV file
df = spark.read.format("json").load("data/people.json")
df.write.format("csv").option("header", "true").save("data/people_write.csv")
# Convert to an RDD first, then save as a text file
df.rdd.saveAsTextFile("data/people_rdd.txt")
# Save as a JSON file
df.write.json("data/people_write.json")
# Save as Parquet: a compressed columnar format that takes little storage and is usually the fastest to load
df.write.partitionBy("age").format("parquet").save("data/namesAndAges.parquet")
df.write.parquet("data/people_write.parquet")
# Save as a Hive table (bucketed by name into 42 buckets, sorted by age within each bucket)
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
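`partitionBy("age")` writes one subdirectory per distinct value of the partition column, using Hive-style `column=value` names. Rows whose value is null go to an `age=__HIVE_DEFAULT_PARTITION__` directory. The plain-Python sketch below only computes which directory each row of `data/people.json` would land in. The helper `partition_dir` is hypothetical. Real output directories also contain `part-*` data files and a `_SUCCESS` marker.

```python
from collections import defaultdict

# Rows as read from data/people.json in the example above (age may be null, i.e. None).
people = [{"age": None, "name": "Michael"}, {"age": 30, "name": "Andy"}, {"age": 19, "name": "Justin"}]

DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"


def partition_dir(root, column, value):
    """Hive-style partition directory for one value (hypothetical helper)."""
    text = DEFAULT_PARTITION if value is None else str(value)
    return f"{root}/{column}={text}"


layout = defaultdict(list)
for row in people:
    # The partition value is encoded in the path; only the remaining columns go into the files.
    layout[partition_dir("data/namesAndAges.parquet", "age", row["age"])].append(row["name"])

for directory in sorted(layout):
    print(directory, layout[directory])
```

Because the partition value lives in the directory name, a later read that filters on `age` can skip whole directories.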

IV. Interacting with a DataFrame through the API

from pyspark.sql import Row
from pyspark.sql.functions import *

df = spark.createDataFrame(
    [("LiLei", 15, "male"),
     ("HanMeiMei", 16, "female"),
     ("DaChui", 17, "male")]).toDF("name", "age", "gender")

df.show()
df.printSchema()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
+---------+---+------+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)

1. Action operations

DataFrame actions include show, count, collect, describe, take, head, first, and so on.

# show
df.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
+---------+---+------+
# show(numRows: Int, truncate: Boolean)
# The second argument controls whether values longer than 20 characters are truncated
df.show(2, False)
+---------+---+------+
|name     |age|gender|
+---------+---+------+
|LiLei    |15 |male  |
|HanMeiMei|16 |female|
+---------+---+------+
only showing top 2 rows
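With the default `truncate=True`, `show` cuts any cell longer than 20 characters down to its first 17 characters followed by `...` and right-aligns the cells. With `truncate=False`, values are printed in full and left-aligned, as in the output above. Here is a plain-Python sketch of the truncation rule, using the hypothetical name `truncate_cell`:

```python
def truncate_cell(text, truncate=20):
    """Mimic how DataFrame.show shortens a cell when truncate is a positive width."""
    if truncate > 0 and len(text) > truncate:
        # Very small widths are cut hard; otherwise leave room for the "..." suffix.
        return text[:truncate] if truncate < 4 else text[: truncate - 3] + "..."
    return text


print(truncate_cell("HanMeiMei"))                        # short values are unchanged
print(truncate_cell("a fairly long free-text comment"))  # a fairly long fre...
```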
# count
df.count()
3
# collect
df.collect()
[Row(name='LiLei', age=15, gender='male'),
 Row(name='HanMeiMei', age=16, gender='female'),
 Row(name='DaChui', age=17, gender='male')]
# first
df.first()
Row(name='LiLei', age=15, gender='male')
# take
df.take(2)
[Row(name='LiLei', age=15, gender='male'),
 Row(name='HanMeiMei', age=16, gender='female')]
# head
df.head(2)
[Row(name='LiLei', age=15, gender='male'),
 Row(name='HanMeiMei', age=16, gender='female')]

2. RDD-like operations

DataFrame supports some RDD-style operations such as distinct, cache, sample, foreach, intersect, and except (exceptAll/subtract in PySpark).

You can treat a DataFrame as an RDD whose elements are Row objects, and convert it to an RDD when necessary.

df = spark.createDataFrame([("Hello World",), ("Hello China",), ("Hello Spark",)]).toDF("value")
df.show()
+-----------+
|      value|
+-----------+
|Hello World|
|Hello China|
|Hello Spark|
+-----------+
# map: convert to an RDD first
rdd = df.rdd.map(lambda x: Row(x[0].upper()))
dfmap = rdd.toDF(["value"])
dfmap.show()
+-----------+
|      value|
+-----------+
|HELLO WORLD|
|HELLO CHINA|
|HELLO SPARK|
+-----------+
# flatMap: convert to an RDD first
df_flat = df.rdd.flatMap(lambda x: x[0].split(" ")).map(lambda x: Row(x)).toDF(["value"])
df_flat.show()
+-----+
|value|
+-----+
|Hello|
|World|
|Hello|
|China|
|Hello|
|Spark|
+-----+
# filter
df_filter = df.rdd.filter(lambda s: s[0].endswith("Spark")).toDF(["value"])

df_filter.show()
+-----------+
|      value|
+-----------+
|Hello Spark|
+-----------+
# filter combined with a broadcast variable
broads = sc.broadcast(["Hello", "World"])

df_filter_broad = df_flat.filter(~col("value").isin(broads.value))

df_filter_broad.show()
+-----+
|value|
+-----+
|China|
|Spark|
+-----+
# distinct
df_distinct = df_flat.distinct()
df_distinct.show()

+-----+
|value|
+-----+
|World|
|China|
|Hello|
|Spark|
+-----+
# cache
df.cache()
df.unpersist()
# sample
dfsample = df.sample(False, 0.6, 0)

dfsample.show()
+-----------+
|      value|
+-----------+
|Hello China|
|Hello Spark|
+-----------+
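`sample(False, 0.6, 0)` samples without replacement by keeping each row independently with probability 0.6. The result is therefore *about* 60% of the rows rather than exactly 60%; here it kept 2 of 3. The plain-Python sketch below models that per-row Bernoulli trial with a seeded generator. It will not reproduce Spark's exact choices, since Spark uses its own per-partition random number generator. `bernoulli_sample` is a hypothetical name.

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction` (sampling without replacement)."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = ["Hello World", "Hello China", "Hello Spark"]
picked = bernoulli_sample(rows, 0.6, seed=0)
print(picked)

# Over many rows the kept share approaches the fraction, but any single small sample varies.
big = bernoulli_sample(range(100_000), 0.6, seed=0)
print(len(big) / 100_000)
```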
df2 = spark.createDataFrame([["Hello World"], ["Hello Scala"], ["Hello Spark"]]).toDF("value")
df2.show()
+-----------+
|      value|
+-----------+
|Hello World|
|Hello Scala|
|Hello Spark|
+-----------+
# intersect: rows present in both DataFrames
dfintersect = df.intersect(df2)

dfintersect.show()
+-----------+
|      value|
+-----------+
|Hello Spark|
|Hello World|
+-----------+
# exceptAll: rows of df that are not in df2 (duplicates are kept)

dfexcept = df.exceptAll(df2)
dfexcept.show()

+-----------+
|      value|
+-----------+
|Hello China|
+-----------+
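`intersect` returns distinct rows present in both DataFrames (set semantics), while `exceptAll` keeps duplicates (multiset semantics). PySpark also offers `subtract`, the distinct-row counterpart of `exceptAll`, and `intersectAll`, which keeps duplicates. The plain-Python sketch below models these four semantics with `collections.Counter`. It uses made-up data containing duplicate rows, and the function names are hypothetical; it illustrates the semantics only.

```python
from collections import Counter

left = ["Hello World", "Hello China", "Hello China", "Hello Spark", "Hello Spark"]
right = ["Hello World", "Hello Scala", "Hello Spark", "Hello Spark"]


def intersect(a, b):      # like DataFrame.intersect: distinct rows present in both
    return sorted(set(a) & set(b))


def intersect_all(a, b):  # like DataFrame.intersectAll: min(count_a, count_b) copies of each row
    return sorted((Counter(a) & Counter(b)).elements())


def subtract(a, b):       # like DataFrame.subtract: distinct rows of a that are not in b
    return sorted(set(a) - set(b))


def except_all(a, b):     # like DataFrame.exceptAll: count_a - count_b copies, when positive
    return sorted((Counter(a) - Counter(b)).elements())


print(intersect(left, right))      # ['Hello Spark', 'Hello World']
print(intersect_all(left, right))  # ['Hello Spark', 'Hello Spark', 'Hello World']
print(subtract(left, right))       # ['Hello China']
print(except_all(left, right))     # ['Hello China', 'Hello China']
```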

3. Excel-like operations

You can add, drop, rename, and reorder columns, sort rows, remove duplicate rows, and drop rows containing nulls, much as you would in an Excel spreadsheet.

df = spark.createDataFrame([
    ("LiLei", 15, "male"),
    ("HanMeiMei", 16, "female"),
    ("DaChui", 17, "male"),
    ("RuHua", 16, None)
]).toDF("name", "age", "gender")

df.show()
df.printSchema()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
+---------+---+------+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
# Add a column
dfnew = df.withColumn("birthyear", -df["age"] + 2020)

dfnew.show()
+---------+---+------+---------+
|     name|age|gender|birthyear|
+---------+---+------+---------+
|    LiLei| 15|  male|     2005|
|HanMeiMei| 16|female|     2004|
|   DaChui| 17|  male|     2003|
|    RuHua| 16|  null|     2004|
+---------+---+------+---------+
# Reorder columns
dfupdate = dfnew.select("name", "age", "birthyear", "gender")
dfupdate.show()
# Drop a column
dfdrop = df.drop("gender")
dfdrop.show()
+---------+---+
|     name|age|
+---------+---+
|    LiLei| 15|
|HanMeiMei| 16|
|   DaChui| 17|
|    RuHua| 16|
+---------+---+
# Rename a column
dfrename = df.withColumnRenamed("gender", "sex")
dfrename.show()
+---------+---+------+
|     name|age|   sex|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
+---------+---+------+

# sort: ascending or descending order can be specified
dfsorted = df.sort(df["age"].desc())
dfsorted.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|   DaChui| 17|  male|
|    RuHua| 16|  null|
|HanMeiMei| 16|female|
|    LiLei| 15|  male|
+---------+---+------+
# orderBy: ascending by default; can sort by several columns (with desc(), nulls come last)
dfordered = df.orderBy(df["age"].desc(), df["gender"].desc())

dfordered.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|   DaChui| 17|  male|
|HanMeiMei| 16|female|
|    RuHua| 16|  null|
|    LiLei| 15|  male|
+---------+---+------+
# Drop rows that contain null values
dfnotnan = df.na.drop()

dfnotnan.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
+---------+---+------+
# Fill null values
df_fill = df.na.fill("female")
df_fill.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|female|
+---------+---+------+
# Replace specific values
df_replace = df.na.replace({"": "female", "RuHua": "SiYu"})
df_replace.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|     SiYu| 16|  null|
+---------+---+------+
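The output shows that `na.replace({"": "female", ...})` left RuHua's `null` gender untouched. `replace` matches the empty string `""`, which is a different value from `null`, whereas `na.fill` targets nulls. `na.fill("female")` also only affects string columns, because the fill value is a string. Below is a plain-Python model of the difference, with `None` standing in for `null`. The helper names are hypothetical, and the sketch assumes columns 0 and 2 are the string columns.

```python
rows = [("LiLei", 15, "male"), ("HanMeiMei", 16, "female"), ("DaChui", 17, "male"), ("RuHua", 16, None)]


def fill_strings(rows, value):
    """Like df.na.fill(<str>): replace None only in string-typed positions."""
    string_positions = (0, 2)  # assumption for this sketch: name and gender are the string columns
    return [tuple(value if i in string_positions and v is None else v for i, v in enumerate(row))
            for row in rows]


def replace_values(rows, mapping):
    """Like df.na.replace(dict) with string keys: swap exact matches; None never equals ""."""
    return [tuple(mapping.get(v, v) if isinstance(v, str) else v for v in row) for row in rows]


print(fill_strings(rows, "female")[3])                           # ('RuHua', 16, 'female')
print(replace_values(rows, {"": "female", "RuHua": "SiYu"})[3])  # ('SiYu', 16, None)
```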
# Deduplicate; by default all columns are compared
df2 = df.unionAll(df)
df2.show()
dfunique = df2.dropDuplicates()
dfunique.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
+---------+---+------+

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    RuHua| 16|  null|
|   DaChui| 17|  male|
|HanMeiMei| 16|female|
|    LiLei| 15|  male|
+---------+---+------+
# Deduplicate based on a subset of columns (which row is kept for each key is not guaranteed)
dfunique_part = df.dropDuplicates(["age"])
dfunique_part.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|   DaChui| 17|  male|
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
+---------+---+------+

# Simple aggregation
dfagg = df.agg({"name": "count", "age": "max"})

dfagg.show()
+-----------+--------+
|count(name)|max(age)|
+-----------+--------+
|          4|      17|
+-----------+--------+

# Summary statistics
df_desc = df.describe()
df_desc.show()
+-------+------+-----------------+------+
|summary|  name|              age|gender|
+-------+------+-----------------+------+
|  count|     4|                4|     3|
|   mean|  null|             16.0|  null|
| stddev|  null|0.816496580927726|  null|
|    min|DaChui|               15|female|
|    max| RuHua|               17|  male|
+-------+------+-----------------+------+
# Frequent items in the age and gender columns with support 0.5 (approximate; false positives are possible)
df_freq = df.stat.freqItems(("age", "gender"), 0.5)

df_freq.show()
+-------------+----------------+
|age_freqItems|gender_freqItems|
+-------------+----------------+
|         [16]|          [male]|
+-------------+----------------+
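`freqItems` uses an approximate single-pass algorithm and, as its documentation notes, may return false positives. Computing the exact frequencies on the four rows explains the output. `16` and `male` each occur in exactly 2 of 4 rows, a frequency of 0.5, so neither strictly exceeds the 0.5 support. Here is a plain-Python sketch of the exact computation (the helper `exact_frequencies` is hypothetical):

```python
from collections import Counter

ages = [15, 16, 17, 16]
genders = ["male", "female", "male", None]


def exact_frequencies(values):
    """Fraction of rows holding each value (None counted like any other value)."""
    counts = Counter(values)
    return {value: count / len(values) for value, count in counts.items()}


age_freq = exact_frequencies(ages)
gender_freq = exact_frequencies(genders)
print(age_freq)     # {15: 0.25, 16: 0.5, 17: 0.25}
print(gender_freq)  # {'male': 0.5, 'female': 0.25, None: 0.25}

support = 0.5
print([v for v, f in age_freq.items() if f > support])      # []: nothing strictly above 0.5
print([v for v, f in gender_freq.items() if f >= support])  # ['male']
```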

		

4. SQL-table-like operations

SQL-table-like operations mainly include table queries (select, selectExpr, where), table joins (join, union, unionAll), and table grouping (groupBy, agg, pivot).

df = spark.createDataFrame([
    ("LiLei", 15, "male"),
    ("HanMeiMei", 16, "female"),
    ("DaChui", 17, "male"),
    ("RuHua", 16, None)]).toDF("name", "age", "gender")

df.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
+---------+---+------+
# Table query: select
dftest = df.select("name").limit(2)
dftest.show()
+---------+
|     name|
+---------+
|    LiLei|
|HanMeiMei|
+---------+
dftest = df.select("name", df["age"] + 1)
dftest.show()
+---------+---------+
|     name|(age + 1)|
+---------+---------+
|    LiLei|       16|
|HanMeiMei|       17|
|   DaChui|       18|
|    RuHua|       17|
+---------+---------+
# Table query: select with a computed column, renamed via toDF
dftest = df.select("name", -df["age"] + 2020).toDF("name", "birth_year")
dftest.show()
+---------+----------+
|     name|birth_year|
+---------+----------+
|    LiLei|      2005|
|HanMeiMei|      2004|
|   DaChui|      2003|
|    RuHua|      2004|
+---------+----------+
# Table query: selectExpr, which can call UDFs, set aliases, and so on
import datetime
# Without a returnType, a registered Python UDF returns strings, so "int" is declared explicitly.
# birth_year depends on the year the code runs in; the output below corresponds to 2020.
spark.udf.register("getBirthYear", lambda age: datetime.datetime.now().year - age, "int")
dftest = df.selectExpr("name", "getBirthYear(age) as birth_year", "UPPER(gender) as gender")
dftest.show()
+---------+----------+------+
|     name|birth_year|gender|
+---------+----------+------+
|    LiLei|      2005|  MALE|
|HanMeiMei|      2004|FEMALE|
|   DaChui|      2003|  MALE|
|    RuHua|      2004|  null|
+---------+----------+------+
# Table query: where, taking a SQL WHERE-clause expression
dftest = df.where("gender='male' and age > 15")
dftest.show()
+------+---+------+
|  name|age|gender|
+------+---+------+
|DaChui| 17|  male|
+------+---+------+
# Table query: filter with a Column expression
dftest = df.filter(df["age"] > 16)
dftest.show()
+------+---+------+
|  name|age|gender|
+------+---+------+
|DaChui| 17|  male|
+------+---+------+
# Table query: filter with a SQL expression string
dftest = df.filter("gender='male'")
dftest.show()
+------+---+------+
|  name|age|gender|
+------+---+------+
| LiLei| 15|  male|
|DaChui| 17|  male|
+------+---+------+
# Table joins: join
dfscore = spark.createDataFrame([("LiLei", "male", 88), ("HanMeiMei", "female", 90), ("DaChui", "male", 50)]) \
    .toDF("name", "gender", "score")

dfscore.show()
+---------+------+-----+
|     name|gender|score|
+---------+------+-----+
|    LiLei|  male|   88|
|HanMeiMei|female|   90|
|   DaChui|  male|   50|
+---------+------+-----+
# join on a single column
dfjoin = df.join(dfscore.select("name", "score"), "name")
dfjoin.show()
+---------+---+------+-----+
|     name|age|gender|score|
+---------+---+------+-----+
|    LiLei| 15|  male|   88|
|HanMeiMei| 16|female|   90|
|   DaChui| 17|  male|   50|
+---------+---+------+-----+
# join on multiple columns
dfjoin = df.join(dfscore, ["name", "gender"])
dfjoin.show()
+---------+------+---+-----+
|     name|gender|age|score|
+---------+------+---+-----+
|HanMeiMei|female| 16|   90|
|   DaChui|  male| 17|   50|
|    LiLei|  male| 15|   88|
+---------+------+---+-----+
# join on multiple columns with an explicit join type
# Join types include "inner", "left", "right", "outer", "semi", "full", "leftanti", "anti", and others
dfjoin = df.join(dfscore, ["name", "gender"], "right")
dfjoin.show()
+---------+------+---+-----+
|     name|gender|age|score|
+---------+------+---+-----+
|HanMeiMei|female| 16|   90|
|   DaChui|  male| 17|   50|
|    LiLei|  male| 15|   88|
+---------+------+---+-----+

dfjoin = df.join(dfscore, ["name", "gender"], "outer")
dfjoin.show()
+---------+------+---+-----+
|     name|gender|age|score|
+---------+------+---+-----+
|HanMeiMei|female| 16|   90|
|   DaChui|  male| 17|   50|
|    LiLei|  male| 15|   88|
|    RuHua|  null| 16| null|
+---------+------+---+-----+
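The join types differ only in what happens to rows without a match. Join keys are compared with SQL equality, under which a comparison involving `null` is never true. RuHua therefore survives only in joins that keep unmatched left rows. The plain-Python sketch below models an equi-join on `(name, gender)` with that null rule for the `inner`, `left`, `right`, and `outer` variants. `equi_join` is a hypothetical helper, and Spark's actual row order may differ.

```python
left = [("LiLei", "male", 15), ("HanMeiMei", "female", 16), ("DaChui", "male", 17), ("RuHua", None, 16)]
right = [("LiLei", "male", 88), ("HanMeiMei", "female", 90), ("DaChui", "male", 50)]


def keys_equal(a, b):
    # SQL equality: any comparison involving NULL (None) is never true.
    return all(x is not None and y is not None and x == y for x, y in zip(a, b))


def equi_join(left, right, how="inner"):
    """Join on the first two fields; output rows are (name, gender, age, score)."""
    out, matched_right = [], set()
    for lrow in left:
        hits = [j for j, rrow in enumerate(right) if keys_equal(lrow[:2], rrow[:2])]
        for j in hits:
            matched_right.add(j)
            out.append((lrow[0], lrow[1], lrow[2], right[j][2]))
        if not hits and how in ("left", "outer"):
            out.append((lrow[0], lrow[1], lrow[2], None))  # unmatched left row, score is null
    if how in ("right", "outer"):
        for j, rrow in enumerate(right):
            if j not in matched_right:
                out.append((rrow[0], rrow[1], None, rrow[2]))  # unmatched right row, age is null
    return out


for how in ("inner", "left", "right", "outer"):
    print(how, equi_join(left, right, how))
```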
# join with an explicitly specified join condition
dfmark = dfscore.withColumnRenamed("gender", "sex")
dfmark.show()
+---------+------+-----+
|     name|   sex|score|
+---------+------+-----+
|    LiLei|  male|   88|
|HanMeiMei|female|   90|
|   DaChui|  male|   50|
+---------+------+-----+

dfjoin = df.join(dfmark, (df["name"] == dfmark["name"]) & (df["gender"] == dfmark["sex"]),
                 "inner")
dfjoin.show()
+---------+---+------+---------+------+-----+
|     name|age|gender|     name|   sex|score|
+---------+---+------+---------+------+-----+
|HanMeiMei| 16|female|HanMeiMei|female|   90|
|   DaChui| 17|  male|   DaChui|  male|   50|
|    LiLei| 15|  male|    LiLei|  male|   88|
+---------+---+------+---------+------+-----+

# Combine tables: union
dfstudent = spark.createDataFrame([("Jim", 18, "male"), ("Lily", 16, "female")]).toDF("name", "age", "gender")
dfstudent.show()
+----+---+------+
|name|age|gender|
+----+---+------+
| Jim| 18|  male|
|Lily| 16|female|
+----+---+------+
dfunion = df.union(dfstudent)
dfunion.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
|      Jim| 18|  male|
|     Lily| 16|female|
+---------+---+------+
# Grouping: groupBy
from pyspark.sql import functions as F
dfgroup = df.groupBy("gender").max("age")
dfgroup.show()
+------+--------+
|gender|max(age)|
+------+--------+
|  null|      16|
|female|      16|
|  male|      17|
+------+--------+
# Aggregation after grouping: groupBy, agg
dfagg = df.groupBy("gender").agg(F.mean("age").alias("mean_age"),
                                 F.collect_list("name").alias("names"))
dfagg.show()
+------+--------+---------------+
|gender|mean_age|          names|
+------+--------+---------------+
|  null|    16.0|        [RuHua]|
|female|    16.0|    [HanMeiMei]|
|  male|    16.0|[LiLei, DaChui]|
+------+--------+---------------+

# Grouped aggregation with SQL expressions: groupBy, agg
dfagg = df.groupBy("gender").agg(F.expr("avg(age)"), F.expr("collect_list(name)"))
dfagg.show()

+------+--------+------------------+
|gender|avg(age)|collect_list(name)|
+------+--------+------------------+
|  null|    16.0|           [RuHua]|
|female|    16.0|       [HanMeiMei]|
|  male|    16.0|   [LiLei, DaChui]|
+------+--------+------------------+

# Grouping by several columns: groupBy, agg
df.groupBy("gender", "age").agg(F.collect_list(col("name"))).show()
+------+---+------------------+
|gender|age|collect_list(name)|
+------+---+------------------+
|  male| 15|           [LiLei]|
|  male| 17|          [DaChui]|
|female| 16|       [HanMeiMei]|
|  null| 16|           [RuHua]|
+------+---+------------------+

# Pivot after grouping: groupBy, pivot
dfstudent = spark.createDataFrame([("LiLei", 18, "male", 1), ("HanMeiMei", 16, "female", 1),
                                   ("Jim", 17, "male", 2), ("DaChui", 20, "male", 2)]).toDF("name", "age", "gender", "class")
dfstudent.show()
dfstudent.groupBy("class").pivot("gender").max("age").show()
+---------+---+------+-----+
|     name|age|gender|class|
+---------+---+------+-----+
|    LiLei| 18|  male|    1|
|HanMeiMei| 16|female|    1|
|      Jim| 17|  male|    2|
|   DaChui| 20|  male|    2|
+---------+---+------+-----+

+-----+------+----+
|class|female|male|
+-----+------+----+
|    1|    16|  18|
|    2|  null|  20|
+-----+------+----+
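`groupBy("class").pivot("gender").max("age")` turns each distinct `gender` value into a column. Each cell holds the max age for that (class, gender) pair, or `null` where the pair has no rows. When no pivot values are given, Spark computes the distinct values and sorts them. Here is a plain-Python model of that reshaping on the same four rows (illustration only; `pivot_max` is a hypothetical name):

```python
students = [("LiLei", 18, "male", 1), ("HanMeiMei", 16, "female", 1),
            ("Jim", 17, "male", 2), ("DaChui", 20, "male", 2)]


def pivot_max(rows):
    """Return (pivot_columns, table) where table[class][gender] is the max age or None."""
    genders = sorted({gender for _, _, gender, _ in rows})  # distinct pivot values, sorted
    table = {}
    for _, age, gender, cls in rows:
        row = table.setdefault(cls, dict.fromkeys(genders))  # every pivot column starts as None
        row[gender] = age if row[gender] is None else max(row[gender], age)
    return genders, table


genders, table = pivot_max(students)
print(genders)  # ['female', 'male']
for cls in sorted(table):
    print(cls, [table[cls][g] for g in genders])
# 1 [16, 18]
# 2 [None, 20]
```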
# Window functions

df = spark.createDataFrame([("LiLei", 78, "class1"), ("HanMeiMei", 87, "class1"),
                            ("DaChui", 65, "class2"), ("RuHua", 55, "class2")]) \
    .toDF("name", "score", "class")

df.show()
dforder = df.selectExpr("name", "score", "class",
                        "row_number() over (partition by class order by score desc) as order")

dforder.show()
+---------+-----+------+
|     name|score| class|
+---------+-----+------+
|    LiLei|   78|class1|
|HanMeiMei|   87|class1|
|   DaChui|   65|class2|
|    RuHua|   55|class2|
+---------+-----+------+

+---------+-----+------+-----+
|     name|score| class|order|
+---------+-----+------+-----+
|   DaChui|   65|class2|    1|
|    RuHua|   55|class2|    2|
|HanMeiMei|   87|class1|    1|
|    LiLei|   78|class1|    2|
+---------+-----+------+-----+
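`row_number() over (partition by class order by score desc)` numbers the rows 1, 2, ... within each class, starting from the highest score. Below is a plain-Python model of that window function. The helper `row_number_by` is hypothetical. Spark gives tied rows distinct but arbitrary numbers; this sketch breaks ties by input order.

```python
from itertools import groupby
from operator import itemgetter

scores = [("LiLei", 78, "class1"), ("HanMeiMei", 87, "class1"),
          ("DaChui", 65, "class2"), ("RuHua", 55, "class2")]


def row_number_by(rows, partition_idx, order_idx, descending=True):
    """Append a 1-based row number computed within each partition, ordered by one field."""
    result = []
    by_partition = sorted(rows, key=itemgetter(partition_idx))  # groupby needs grouped input
    for _, group in groupby(by_partition, key=itemgetter(partition_idx)):
        ordered = sorted(group, key=itemgetter(order_idx), reverse=descending)
        result.extend(row + (n,) for n, row in enumerate(ordered, start=1))
    return result


for row in row_number_by(scores, partition_idx=2, order_idx=1):
    print(row)
```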

		

V. Interacting with a DataFrame through SQL

After a DataFrame is registered as a temporary view or a global temporary view, you can interact with it using SQL statements.

Beyond that, SparkSQL can also create, read, update, and delete data in Hive tables directly.

1. SQL interaction after registering a view

# Register as a temporary view; its lifetime is tied to the SparkSession
df = spark.createDataFrame([("LiLei", 18, "male"), ("HanMeiMei", 17, "female"), ("Jim", 16, "male")],
                           ("name", "age", "gender"))

df.show()
df.createOrReplaceTempView("student")
dfmale = spark.sql("select * from student where gender='male'")
dfmale.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 18|  male|
|HanMeiMei| 17|female|
|      Jim| 16|  male|
+---------+---+------+

+-----+---+------+
| name|age|gender|
+-----+---+------+
|LiLei| 18|  male|
|  Jim| 16|  male|
+-----+---+------+
# Register as a global temporary view; its lifetime is tied to the whole Spark application

df.createOrReplaceGlobalTempView("student")
query = """
select t.gender
       , collect_list(t.name) as names
from global_temp.student t
group by t.gender
""".strip("\n")

spark.sql(query).show()
# Global temporary views can also be accessed from a new session
spark.newSession().sql("select * from global_temp.student").show()

+------+------------+
|gender|       names|
+------+------------+
|female| [HanMeiMei]|
|  male|[LiLei, Jim]|
+------+------------+

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 18|  male|
|HanMeiMei| 17|female|
|      Jim| 16|  male|
+---------+---+------+

2. Create, read, update, and delete operations on Hive tables

# Drop the Hive table

query = "DROP TABLE IF EXISTS students"
spark.sql(query)

# Create a partitioned Hive table
# (Note: columns with Chinese names cannot be used as partition columns)

query = """CREATE TABLE IF NOT EXISTS `students`
(`name` STRING COMMENT 'name',
`age` INT COMMENT 'age'
)
PARTITIONED BY (`class` STRING COMMENT 'class', `gender` STRING COMMENT 'gender')
""".replace("\n", " ")
spark.sql(query)
## Write data into the partitioned Hive table dynamically
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")  # note the configuration step here
dfstudents = spark.createDataFrame([("LiLei", 18, "class1", "male"),
                                    ("HanMeimei", 17, "class2", "female"),
                                    ("DaChui", 19, "class2", "male"),
                                    ("Lily", 17, "class1", "female")]).toDF("name", "age", "class", "gender")
dfstudents.show()

# Dynamic partition write
dfstudents.write.mode("overwrite").format("hive") \
    .partitionBy("class", "gender").saveAsTable("students")
# Write into a static partition
dfstudents = spark.createDataFrame([("Jim", 18, "class3", "male"),
                                    ("Tom", 19, "class3", "male")]).toDF("name", "age", "class", "gender")
dfstudents.createOrReplaceTempView("dfclass3")

# INSERT INTO appends to the end; INSERT OVERWRITE TABLE overwrites the partition
query = """
INSERT OVERWRITE TABLE `students`
PARTITION(class='class3', gender='male')
SELECT name, age from dfclass3
""".replace("\n", " ")
spark.sql(query)
# Write into a mixed partition (static class, dynamic gender)
dfstudents = spark.createDataFrame([("David", 18, "class4", "male"),
                                    ("Amy", 17, "class4", "female"),
                                    ("Jerry", 19, "class4", "male"),
                                    ("Ann", 17, "class4", "female")]).toDF("name", "age", "class", "gender")
dfstudents.createOrReplaceTempView("dfclass4")

query = """
INSERT OVERWRITE TABLE `students`
PARTITION(class='class4', gender)
SELECT name, age, gender from dfclass4
""".replace("\n", " ")
spark.sql(query)
# Read all data

dfdata = spark.sql("select * from students")
dfdata.show()
+---------+---+------+------+
|     name|age| class|gender|
+---------+---+------+------+
|      Ann| 17|class4|female|
|      Amy| 17|class4|female|
|HanMeimei| 17|class2|female|
|   DaChui| 19|class2|  male|
|    LiLei| 18|class1|  male|
|     Lily| 17|class1|female|
|    Jerry| 19|class4|  male|
|    David| 18|class4|  male|
|      Jim| 18|class3|  male|
|      Tom| 19|class3|  male|
+---------+---+------+------+
Editor in charge: haq

Original title: 2 小时入门 SparkSQL 编程 (Getting Started with SparkSQL Programming in 2 Hours)

Article source: WeChat ID DBDevs, WeChat official account 数据分析与开发 (Data Analysis and Development). Please credit the source when reposting.
