Spark 笔记|教程 —— Python API

Spark 简介

简单地列一下自己觉得比较重点的,具体可以参考Spark文档或相关教程。

Spark是基于内存进行计算,比如进行数据计算的时候,产生的中间数据会存放在内存中,不需要经过硬盘的写入写出过程。这也就是Spark比Hadoop快很多的原因。

Spark提供了Python API,所以可以用Python去写Spark程序。而且Spark和Hadoop,kafka等其他大数据工具整合也很好。

Spark有许多紧密集成的组件(库),如下图:

Spark本身(Spark core)有任务调度,内存管理,容错机制等基本功能。内部定义了RDDs(Resilient Distributed Dataset, 弹性分布式数据集)并提供很多相应的APIs来创建和操作这些RDDs,为其他组件提供底层服务, RDD的immutability(产生之后不可更改)避免了一次性多线程更新所带来的潜在问题。若果某个node崩溃了,spark会从输入数据中找到相应的RDD并且从崩溃的地方恢复(fault tolerant)。

SparkSQL: 处理结构化数据的库,企业应用:报表统计

Spark Streaming: 实时数据流处理,企业应用:从Kafka等接收数据进行实时统计

MLib: 包含通用的机器学习功能的库。可以在集群(多台机器)上应用

Graphx: 图处理库,并进行图的并行计算。它提供了各种图的操作,和常用的图算法,如PageRank.

Cluster Manager: 集群管理(master and slave architecture),Spark自带的集群管理单独调度器(standalone),常见的还有Hadoop YARN, Apache Mesos.

DAG:Direct Acyclic Graph, 当运行一个应用(application)的时候,它会自动生成一个由nodes和edges构成的图(即方向性的非循环的图),显示了计算的顺序,即edge的方向。(sequence of computation/execution flow, which will be mapped to RDD)

Driver program: e.g. Spark shell,还可以是IPython,Jupyter等,后文会看到。driver program的一次执行叫做一个job,job被cluster master分成不同的task给不同的worker node(executor)。计算结果返回给driver。并且Driver负责把一个application或者程序转换成DAG(即是任务分配的一种抽象)。

SparkContext (sc): It represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster, e.g. what you Python application/script creates to communicate and coordinate with worker nodes. 由Driver program产生SparkContext,并通过SparkContext连结spark cluster。

Pyspark shell

以下操作是在terminal的spark根目录下,如我这里是在文件夹spark2.2里面

cd bin/
./pyspark
exit();

exit(); 用于退出Python shell

在上面的操作过程中,会发现有很多INFO日志弹出。通过修改spark文件夹中

cd conf/
cp log4j.properties.template  log4j.properties
vi log4j.properties

把原来的INFO改成WARN

log4j.rootCategory=WARN, console

然后按Esc, :wq 保存退出,其他具体的vi命令可以参考我的另外一篇文章 常用vi命令总结

改变pyspark默认选择的Python版本

cd conf/
cp spark-env.sh.template  spark-env.sh
vi spark-env.sh

添加如下命令到打开的文件中(放在第二行就可以),

export PYSPARK_PYTHON=/usr/local/bin/python3.5
export PYSPARK_DRIVER_PYTHON=/usr/local/bin/python3.5

然后按Esc, :wq 保存退出。重新运行pyspark,看到Python版本已经改变了。

我这里使用的是Mac系统,如果是Linux需要改变下路径。还有你要改变的Python版本要根据你电脑中安装的来看,我这里是Python3.5。

如果想使用jupyter可以在spark-env.sh文件中加入下面语句,删掉之前的PYSPARK_DRIVER_PYTHON

export PYSPARK_DRIVER_PYTHON="jupyter"
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"

之后每次启动pyspark就会自动从浏览器中打开jupyter。

除了这个,还有另外一种方法

重新打开terminal 或者 cd + 空格,然后运行命令

nano .bash_profile
在.bash_profile中,添加如下语句
export SPARK_HOME=/usr/local/spark2.2
export PYSPARK_PYTHON="python3"
export PYSPARK_DRIVER_PYTHON="jupyter" 
export PYSPARK_DRIVER_PYTHON_OPTS="notebook" 
alias snotebook='$SPARK_HOME/bin/pyspark --master local[2]'
export PATH="$SPARK_HOME:$PATH"
control + o保存,按回车保存文件名,control + x 退出,然后重启terminal或者用
source .bash_profile
更新.bash_profile文件使之生效,然后再在terminal中运行下面命令就可以打开jupyter了
snotebook
.bash_profile中添加语句的倒数第二行是占用多少个电脑的核心,这里是2个,根据自身电脑的情况来。有一点需要注意的是:此时Python的工作路径变了,不是原来的pyspark的路径了,比如对于Spark官网上的例子

>>> textFile = sc.textFile("README.md")

需要更改README.md的路径,可以用全路径,比如我这里可以改成

>>> textFile = sc.textFile("/usr/local/spark2.2/README.md")

后面就可以进行具体的Python操作了。


补充一个更加灵活的办法使用IPython或jupyter,也是spark官网上使用的办法。即在spark目录下 运行下列命令:

PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
或者
PYSPARK_DRIVER_PYTHON=jupyter 
PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
如过想使用多个CPU核心,把上面语句改成,如
PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark --master local[2]


如果不想改变Pyspark shell的配置,而又想使用jupyter交互的操作spark的话,还有另外一种方法。因为jupyter可以改变它的kernel,所以我们可以自己写一个pyspark kernel,之后只要在就jupyter里面换下kernel就进入了pyspark的环境。因为我用的是anaconda,所以我的kernel路径为/anaconda3/share/jupyter/kernels,在这个路径内添加pyspark/kernel.json,即在pyspark文件夹下创建json文件,然后在json文件中添加下面内容,保存。

{
    "argv": [
        "/anaconda3/bin/python",
        "-m",
        "ipykernel_launcher",
        "-f",
        "{connection_file}"
    ],
    "display_name": "PySpark",
    "language": "python",
    "env": {
        "CAPTURE_STANDARD_OUT": "true",
        "CAPTURE_STANDARD_ERR": "true",
        "SEND_EMPTY_OUTPUT": "false",
        "SPARK_HOME": "/usr/local/spark2.2/"
    }
}

启动jupyter,在下图位置,切换kernel

注意:1. 我并没有尝试非anaconda安装的jupyter,所以不知道这种情况下该方法是否可行,而且要注意非anaconda安装的jupyter存储kernel的路径可能不同。2. 还有一点是上面json文件中的SPARK_HOME变量是要在~/.bash_profile中提前设定好的。

登录注册后参与评论