DataFrame是一种是不可变的分布式数据集,类似于关系数据库中的表。通过在分布式数据集上施加结构,使得用户能够利用Spark SQL或者使用Spark表达式方法来查询结构化的数据(而不是使用lambda表达式)。通过构建数据,使得catalyst优化器显著提高Spark的查询性能。
创建DataFrame
除了可以将本地文件系统、分布式文件系统或者云存储系统导入DataFrame数据集,也可以直接生成DataFrame数据集,或者利用Databricks社区版中现成的数据源。
1 | # 生成json数据 |
创建一个DataFrame
1 | swimmerJSON = spark.read.json(stringJSONRDD) |
创建一个临时表
1 | swimmerJSON.createOrReplaceTempView("swimmerJSON") |
DataFrame查询
API查询
1 | swimmerJSON.show() |
+---+--------+---+-------+
|age|eyeColor| id| name|
+---+--------+---+-------+
| 19| brown|123| Katie|
| 22| green|234|Michael|
| 23| blue|345| Simone|
+---+--------+---+-------+
SQL查询
1 | spark.sql("select * from swimmerJSON").collect() |
[Row(age=19, eyeColor='brown', id='123', name='Katie'),
Row(age=22, eyeColor='green', id='234', name='Michael'),
Row(age=23, eyeColor='blue', id='345', name='Simone')]
RDD的交互操作
使用反射来推断模式
通过查看JSON数据自动决定模式。
1 | swimmerJSON.printSchema() |
root
|-- age: long (nullable = true)
|-- eyeColor: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
编程指定模式
通过这种方式,可以对模式有更加精细的控制,例如,将id指定为long类型。
1 | from pyspark.sql.types import * |
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- eyeColor: string (nullable = true)
DataFrame API查询
行数
1 | swimmers.count() |
3
筛选
1 | swimmers.select("id", "age").filter("age = 22").show() |
+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+
1 | swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age==22).show() |
+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+
SQL查询
行数
1 | spark.sql("select count(1) from swimmers").show() |
+--------+
|count(1)|
+--------+
| 3|
+--------+
利用\(where\)子句筛选
1 | spark.sql("select id, age from swimmers where age=22").show() |
+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+
1 | spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show() |
+------+--------+
| name|eyeColor|
+------+--------+
| Katie| brown|
|Simone| blue|
+------+--------+
DataFrame应用:实时飞行性能
1 | flightPerfFilePath = "../LearningPySpark_Code/Chapter02/flight-data/departuredelays.csv" |
1 | spark.sql("select a.City, f.origin, sum(f.delay) as Delays from FlightPerformance f join airports a on a.IATA = f.origin where a.State = 'WA' group by a.City, f.origin order by sum(f.delay) desc").show() |
+-------+------+--------+
| City|origin| Delays|
+-------+------+--------+
|Seattle| SEA|159086.0|
|Spokane| GEG| 12404.0|
| Pasco| PSC| 949.0|
+-------+------+--------+
小结
- DataFrame通过Catalyst优化器和Tungsten项目,避免了Python子进程和JVM的通信(RDD需要的)开销,从而极大的加快PySpark的查询性能
- DataFrame实际上是通用对象Spark数据集(Dataset[Row])的一个别名,Row是一个通用的非类型化JVM对象。相反,Dataset是一个强类型的JVM对象集合,是通过Scala或者Java定义的案例类决定,由于PySpark缺乏类型增强的优势,因此不支持Dataset API。但是可以通过转化为RDD或者使用UDF来访问。
参考资料
- PySpark实战指南,Drabas and Lee著, 栾云杰等译
- PySpark API Reference
- Spark SQL, DataFrames and Datasets Guide
- PySpark SQL Module: DataFrame
- PySpark SQL Functions Module