本文以计算两点之间的直线距离为例子,介绍从模块化代码、打包成Spark应用程序到提交Spark作业的完整流程。
模块化代码
文件夹结构
本节介绍如何对代码进行模块化,该模块实现的功能是利用Haversine公式计算两点之间的距离,并实现英尺到米的转化。首先将所有代码放入目录additionalCode,在该目录中建立setup.py文件。
1 | from setuptools import setup |
其余代码放入utilities目录中,其结构为:
1 | utilities/ |
计算两点之间的距离
建立目录utilities,并建立__init__.py文件
1 | from .geoCalc import geoCalc |
文件geoCalc.py实现的是Haversine公式计算两点之间的距离
1 | import math |
距离单位转换
为了使得这个功能更加通用,也将该功能模块化,作为整个程序包的一部分。在utilities目录下建立converters目录,base.py的内容是:
1 | from abc import ABCMeta, abstractmethod |
上面是一个不能实例化的抽象类,它的目的是强制派生类实现convert方法,具体的实现在distance.py中
1 | from ..base import BaseConverter |
打包成egg文件
虽然可以使用--py-files,然后用逗号分割传递.py文件给spark-submit提交作业,但是更方便的做法是打包成一个egg或者zip文件,当setup.py文件可用时,可以直接调用additionalCode里面的内容。
1 | python setup.py bdist_egg |
运行成功后会出现三个文件夹build,dist,PySparkUtilities.egg-info。最令人感兴趣的是dist文件夹,可以看到里面的文件PySparkUtilities-0.1.dev0-py3.6.egg,则表示打包成功。
调用程序
为了在spark中使用前面打包好的程序,也就是要这个程序能够处理RDD和DataFrame数据集,有两种办法:使用内置函数处理数据;创建自定义的函数。本节介绍第二种方法,将python函数封装在.udf方法中,并定义返回值类型。
1 | import utilities.geoCalc as geo |
提交作业
使用spark-submit提交作业。为了避免前面的配置影响作业的提交,将该命令封装为一个脚本
1 | #!/bin/bash |
提交作业
1 | ./launch_spark_submit.sh \ |
小结
- 本文介绍了从程序的模块化封装、spark调用以及并行作业提交整个流程
参考资料
- PySpark实战指南,Drabas and Lee著, 栾云杰等译