打包Spark应用程序

本文以计算两点之间的直线距离为例子,介绍从模块化代码、打包成Spark应用程序到提交Spark作业的完整流程。

模块化代码

文件夹结构

本节介绍如何对代码进行模块化,该模块实现的功能是利用Haversine公式计算两点之间的距离,并实现英尺到米的转化。首先将所有代码放入目录additionalCode,在该目录中建立setup.py文件。

1
2
3
4
5
6
7
8
9
10
11
12
from setuptools import setup

setup(
name='PySparkUtilities',
version='0.1dev',
packages=['utilities', 'utilities/converters'],
license='''
Creative Commons
Attribution-Noncommercial-Share Alike license''',
long_description='''
An example of how to package code for PySpark'''
)

其余代码放入utilities目录中,其结构为:

1
2
3
4
5
6
7
utilities/
├── base.py
├── converters
│   ├── distance.py
│   └── __init__.py
├── geoCalc.py
└── __init__.py

计算两点之间的距离

建立目录utilities,并建立__init__.py文件

1
2
3
from .geoCalc import geoCalc

__all__ = ['geoCalc','converters']

文件geoCalc.py实现的是Haversine公式计算两点之间的距离

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import math

class geoCalc(object):
@staticmethod
def calculateDistance(p1, p2):
'''
calculates the distance using Haversine formula
'''
R = 3959 # earth's radius in miles

# get the coordinates
lat1, lon1 = p1[0], p1[1]
lat2, lon2 = p2[0], p2[1]

# convert to radians
deltaLat_radians = math.radians(lat2-lat1)
deltaLon_radians = math.radians(lon2-lon1)

lat1_radians = math.radians(lat1)
lat2_radians = math.radians(lat2)

# apply the formula
hav = math.sin(deltaLat_radians / 2.0) * \
math.sin(deltaLat_radians / 2.0) + \
math.sin(deltaLon_radians / 2.0) * \
math.sin(deltaLon_radians / 2.0) * \
math.cos(lat1_radians) * \
math.cos(lat2_radians)

dist = 2 * R * math.asin(math.sqrt(hav))

return dist

if __name__ == '__main__':
p1 = {'address': '301 S Jackson St, Seattle, WA 98104',
'lat': 47.599200,
'long': -122.329841}

p2 = {'address': 'Thunderbird Films Inc 533, Smithe St #401, Vancouver, BC V6B 6H1, Canada',
'lat': 49.279688,
'long': -123.119190}

print(geoCalc.calculateDistance((p1['lat'], p1['long']), (p2['lat'], p2['long'])))

距离单位转换

为了使得这个功能更加通用,也将该功能模块化,作为整个程序包的一部分。在utilities目录下建立converters目录,base.py的内容是:

1
2
3
4
5
6
7
8
9
10
from abc import ABCMeta, abstractmethod

class BaseConverter(metaclass=ABCMeta):
@staticmethod
@abstractmethod
def convert(f, t):
raise NotImplementedError

if __name__ == '__main__':
i = BaseConverter()

上面是一个不能实例化的抽象类,它的目的是强制派生类实现convert方法,具体的实现在distance.py中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from ..base import BaseConverter

class metricImperial(BaseConverter):
pass

@staticmethod
def convert(f, t):
conversionTable = {
'in': {
'mm': 25.4, 'cm': 2.54, 'm': 0.0254,
'km': 0.0000254
}, 'ft': {
'mm': 304.8, 'cm': 30.48, 'm': 0.3048,
'km': 0.0003048
}, 'yd': {
'mm': 914.4, 'cm': 91.44, 'm': 0.9144,
'km': 0.0009144
}, 'mile': {
'mm': 1609344, 'cm': 160934.4, 'm': 1609.344,
'km': 1.609344
}
}

f_val, f_unit = f.split(' ')
f_val = float(f_val)

if f_unit in conversionTable.keys():
if t in conversionTable[f_unit].keys():
conv = 1 / conversionTable[f_unit][t]
else:
raise KeyError('Key {0} not found...' \
.format(t))
elif t in conversionTable.keys():
if f_unit in conversionTable[t].keys():
conv = conversionTable[t][f_unit]
else:
raise KeyError('Key {0} not found...' \
.format(f_unit))
else:
raise KeyError('Neither {0} nor {1} key found'\
.format(t, f_unit))

return f_val / conv

if __name__ == '__main__':
f = metricImperial()
print(f.convert('10 mile', 'km')) 39,1 All

打包成egg文件

虽然可以使用--py-files,然后用逗号分割传递.py文件给spark-submit提交作业,但是更方便的做法是打包成一个egg或者zip文件,当setup.py文件可用时,可以直接调用additionalCode里面的内容。

1
python setup.py bdist_egg

运行成功后会出现三个文件夹build,dist,PySparkUtilities.egg-info。最令人感兴趣的是dist文件夹,可以看到里面的文件PySparkUtilities-0.1.dev0-py3.6.egg,则表示打包成功。

调用程序

为了在spark中使用前面打包好的程序,也就是要这个程序能够处理RDD和DataFrame数据集,有两种办法:使用内置函数处理数据;创建自定义的函数。本节介绍第二种方法,将python函数封装在.udf方法中,并定义返回值类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import utilities.geoCalc as geo
from utilities.converters import metricImperial

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as func

def geoEncode(spark):
# read the data in
uber = spark.read.csv(
'uber_data_nyc_2016-06_3m_partitioned.csv',
header=True,
inferSchema=True
)\
.repartition(4) \
# .select('VendorID','tpep_pickup_datetime', 'pickup_longitude', 'pickup_latitude','dropoff_longitude','dropoff_latitude','total_amount')

# prepare the UDFs
getDistance = func.udf(
lambda lat1, long1, lat2, long2:
geo.calculateDistance(
(lat1, long1),
(lat2, long2)
)
)

convertMiles = func.udf(lambda m:
metricImperial.convert(str(m) + ' mile', 'km'))

# create new columns
uber = uber.withColumn(
'miles',
getDistance(
func.col('pickup_latitude'),
func.col('pickup_longitude'),
func.col('dropoff_latitude'),
func.col('dropoff_longitude')
)
)

uber = uber.withColumn(
'kilometers',
convertMiles(func.col('miles')))

# print 10 rows
# uber.show(10)

# save to csv (partitioned)
uber.write.csv(
'uber_data_nyc_2016-06_new.csv',
mode='overwrite',
header=True,
compression='gzip'
)

if __name__ == '__main__':
spark = SparkSession \
.builder \
.appName('CalculatingGeoDistances') \
.getOrCreate()

print('Session created')

try:
geoEncode(spark)

finally:
spark.stop()

提交作业

使用spark-submit提交作业。为了避免前面的配置影响作业的提交,将该命令封装为一个脚本

1
2
3
4
#!/bin/bash
unset PYSPARK_DRIVER_PYTHON
spark-submit $*
export PYSPARK_DRIVER_PYTHON=jupyter

提交作业

1
2
3
4
./launch_spark_submit.sh \
--master local[4] \
--py-files additionalCode/dist/PySparkUtilities-0.1.dev0-py3.6.egg \
calculatingGeoDistance.py

小结

  • 本文介绍了从程序的模块化封装、spark调用以及并行作业提交整个流程

参考资料

  • PySpark实战指南,Drabas and Lee著, 栾云杰等译