DataFrame

A DataFrame is an immutable distributed collection of data, similar to a table in a relational database. By imposing a structure onto the distributed data, it lets users query structured data with Spark SQL or with DataFrame expression methods (instead of lambda expressions). Structuring the data also allows the Catalyst optimizer to significantly improve Spark's query performance.

Creating a DataFrame

Besides importing data into a DataFrame from a local file system, a distributed file system, or cloud storage, you can also generate a DataFrame directly in code, or use the data sources that ship with the Databricks Community Edition.

# Generate an RDD of JSON strings
stringJSONRDD = sc.parallelize(("""
{ "id": "123",
"name": "Katie",
"age": 19,
"eyeColor": "brown"
}""",
"""{
"id": "234",
"name": "Michael",
"age": 22,
"eyeColor": "green"
}""",
"""{
"id": "345",
"name": "Simone",
"age": 23,
"eyeColor": "blue"
}""")
)

Create a DataFrame

swimmerJSON = spark.read.json(stringJSONRDD)
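The same reader can also load JSON directly from storage instead of from an RDD; a minimal sketch, where the path is only a hypothetical placeholder:

# Read JSON from a file; the path below is a hypothetical placeholder
swimmersFromFile = spark.read.json("/path/to/swimmers.json")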

Create a temporary view (note that createOrReplaceTempView creates a view, not a table) so the DataFrame can be queried with SQL

swimmerJSON.createOrReplaceTempView("swimmerJSON")
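A temporary view is scoped to the current SparkSession. On Spark 2.1+ there is also a global variant that can be shared across sessions; a minimal sketch:

swimmerJSON.createGlobalTempView("swimmerJSON_global")
# Global temp views live in the reserved global_temp database
spark.sql("select * from global_temp.swimmerJSON_global").show()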

Querying the DataFrame

DataFrame API query

swimmerJSON.show()
+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+

SQL query

spark.sql("select * from swimmerJSON").collect()
[Row(age=19, eyeColor='brown', id='123', name='Katie'),
 Row(age=22, eyeColor='green', id='234', name='Michael'),
 Row(age=23, eyeColor='blue', id='345', name='Simone')]
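collect() returns the results to the driver as a list of Row objects, whose fields can be read by attribute or by key:

rows = spark.sql("select * from swimmerJSON").collect()
# Row fields support attribute access and key lookup
rows[0].name        # 'Katie'
rows[0]['eyeColor'] # 'brown'

Note that collect() pulls the full result set into driver memory, so prefer show() or take(n) for large tables.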

Interoperating with RDDs

Inferring the schema using reflection

Spark determines the schema automatically by inspecting the JSON data.

swimmerJSON.printSchema()
root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
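Reflection-based inference is not limited to JSON; createDataFrame can also infer a schema from an RDD of Row objects. A minimal sketch (the sample row and variable names are illustrative):

from pyspark.sql import Row

# Build an RDD of Rows; Spark infers column names and types by reflection
rowRDD = sc.parallelize([Row(id='456', name='Amy', age=21, eyeColor='hazel')])
inferredDF = spark.createDataFrame(rowRDD)
inferredDF.printSchema()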

Programmatically specifying the schema

This approach gives finer-grained control over the schema; for example, id can be declared as a long type.

from pyspark.sql.types import *

# The data is an RDD of plain tuples, so column names cannot be inferred
stringCSVRDD = sc.parallelize([
    (123, 'Katie', 19, 'brown'),
    (234, 'Michael', 22, 'green'),
    (345, 'Simone', 23, 'blue')
])

# Declare each field explicitly: name, type, nullable
schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])

swimmers = spark.createDataFrame(stringCSVRDD, schema)

swimmers.createOrReplaceTempView("swimmers")

swimmers.printSchema()
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
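On Spark 2.3 and later, the same schema can also be written more compactly as a DDL-formatted string; a sketch under that version assumption:

# Equivalent schema expressed as a DDL string (Spark 2.3+)
swimmersDDL = spark.createDataFrame(stringCSVRDD, "id LONG, name STRING, age LONG, eyeColor STRING")
swimmersDDL.printSchema()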

DataFrame API queries

Row count

swimmers.count()
3

Filtering

swimmers.select("id", "age").filter("age = 22").show()
+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+
The same filter can also be written with column attribute expressions:
swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age==22).show()
+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+
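A third, commonly used spelling relies on the col helper, which avoids repeating the DataFrame name:

from pyspark.sql.functions import col

# Same filter written with col()
swimmers.select(col("id"), col("age")).filter(col("age") == 22).show()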

SQL queries

Row count

spark.sql("select count(1) from swimmers").show()
+--------+
|count(1)|
+--------+
|       3|
+--------+

Filtering with a where clause

spark.sql("select id, age from swimmers where age=22").show()
+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+
And filtering with a LIKE pattern:
spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show()
+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+
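The same LIKE predicate can be expressed in the DataFrame API via the like column method:

# DataFrame API equivalent of the LIKE query above
swimmers.select("name", "eyeColor").filter(swimmers.eyeColor.like("b%")).show()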

DataFrame application: on-time flight performance

1
2
3
4
5
6
7
8
9
10
11
12
flightPerfFilePath = "../LearningPySpark_Code/Chapter02/flight-data/departuredelays.csv"
airportsFilePath = "../LearningPySpark_Code/Chapter02/flight-data/airport-codes-na.txt"

# Tab-delimited file
airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t')
airports.createOrReplaceTempView("airports")

# Comma-delimited file
flightPerf = spark.read.csv(flightPerfFilePath, header='true')
flightPerf.createOrReplaceTempView("FlightPerformance")

flightPerf.cache()
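cache() is lazy: nothing is materialized until the first action runs over the data. A common pattern is to force materialization right away:

# cache() only marks the DataFrame; an action materializes the cached data
flightPerf.count()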
spark.sql("select a.City, f.origin, sum(f.delay) as Delays from FlightPerformance f join airports a on a.IATA = f.origin where a.State = 'WA' group by a.City, f.origin order by sum(f.delay) desc").show()
+-------+------+--------+
|   City|origin|  Delays|
+-------+------+--------+
|Seattle|   SEA|159086.0|
|Spokane|   GEG| 12404.0|
|  Pasco|   PSC|   949.0|
+-------+------+--------+
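For comparison, the same aggregation can be written with the DataFrame API instead of SQL; a sketch using join, groupBy, and agg:

from pyspark.sql import functions as F

# DataFrame API version of the Washington-state delay query above
(flightPerf
    .join(airports, airports.IATA == flightPerf.origin)
    .where(airports.State == 'WA')
    .groupBy(airports.City, flightPerf.origin)
    .agg(F.sum(flightPerf.delay).alias('Delays'))
    .orderBy(F.desc('Delays'))
    .show())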

Summary

  1. Through the Catalyst optimizer and Project Tungsten, DataFrames avoid the Python-subprocess-to-JVM communication overhead that RDDs incur, which greatly speeds up PySpark query performance.
  2. A DataFrame is actually an alias for a Dataset of generic Row objects (Dataset[Row]), where Row is a generic untyped JVM object. By contrast, a Dataset is a collection of strongly typed JVM objects, whose type is determined by a case class in Scala or a class in Java. Because Python lacks compile-time type safety, PySpark does not support the typed Dataset API; similar functionality can still be reached by converting to an RDD or by using UDFs, as sketched below.
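Since item 2 mentions UDFs, here is a minimal sketch of defining and applying a Python UDF on the swimmers table; the grouping logic is made up purely for illustration:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Illustrative UDF: classify swimmers by age (hypothetical grouping rule)
ageGroup = udf(lambda age: 'adult' if age >= 20 else 'teen', StringType())
swimmers.select('name', ageGroup(swimmers.age).alias('ageGroup')).show()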
