PySpark简介
发展背景
-
2009年 - Spark诞生,使用Scala语言开发。
-
2010年 - Spark成为Apache孵化器项目。
-
2013年 - PySpark模块首次引入,可以使用Python开发Spark应用。
-
2014年 - Spark成为Apache顶级项目。
-
2015年 - Databricks公司成立。
-
2016年 - PySpark成为Spark的官方子项目之一。
-
2017年 - PySpark开始支持DataFrame API。
-
2019年 - PySpark开始支持Python 3.7及更高版本。
-
2023年6月23日 - PySpark发布3.4.1版本。
版本特性
PySpark 3.0
-
升级至Spark 3.0 -
支持Python 3.6+ -
引入 Pandas UDF -
Dataframe API性能提升
PySpark 3.1
-
Spark 3.1版本特性 -
改进Pandas UDF -
新增StructTypeInference -
支持图运算和程序解析
PySpark 3.2
-
Spark 3.2版本特性 -
支持R language绑定 -
进一步扩展ML能力 -
改进Streaming连接性
PySpark 3.3
-
Spark 3.3版本特性 -
改进DataFrame性能 -
优化内存管理 -
加强Pipeline API
PySpark 3.4
-
Spark 3.4版本特性 -
改进数据加载性能 -
Broadcast Join优化 -
支持Apple M系列芯片
PySpark通过与Spark版本同步,不断吸收新特性,并优化性能、扩展功能,已经发展成为非常成熟可靠的Spark Python API。
使用场景
PySpark是一个非常流行和强大的Python API,主要应用场景包括:
-
大数据处理
利用Spark的内存计算和分布式引擎进行PB级数据的高效处理。
-
流式计算
基于Spark Streaming构建低延迟、高吞吐的流式应用。
-
机器学习
使用MLlib等组件进行分布式机器学习,如分类、聚类等。
-
数据分析
利于Python的生态进行交互式数据分析和可视化。
-
ETL处理
大规模数据抽取转化加载处理。
-
科学计算
与SciPy, NumPy等集成进行科学、工程计算。
-
实时数据仓库
构建实时分析数据仓库。
-
图分析
利用Spark GraphX进行复杂网路与图分析。
-
深度学习
结合TensorFlowOnSpark进行分布式深度学习。
PySpark可以应用到很多需要利用Spark做大规模数据处理和分析的场景,是构建数据密集型应用的不二之选。
环境配置
安装Spark和PySpark
-
安装JDK
Spark基于JVM,需要首先安装JDK,如OpenJDK 8。
-
下载Spark
可以在官网下载预编译的Spark安装包。也可以自己从源码构建。
-
配置环境变量
主要配置SPARK_HOME指向Spark根目录,并将bin目录加入PATH。
-
安装Python
确保已经安装了Python 3.6+版本。
-
安装PySpark
使用pip安装PySpark:
pip install pyspark
也可以直接从源码安装。
-
初始化SparkContext
可以在Python交互 shell中创建SparkContext测试:
from pyspark import SparkContext
sc = SparkContext()
初始化SparkContext
在PySpark程序中初始化SparkContext的方法:
1. 设置SparkConf
SparkConf用于配置Spark应用的运行参数:
conf = SparkConf()
conf.setAppName("appName")
conf.setMaster("spark://...")
2. 创建SparkContext
传入SparkConf来创建context:
sc = SparkContext(conf=conf)
3. 从SparkSession创建
如果是通过SparkSession启动,可以直接获取SparkContext:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
4. 重要参数
-
appName:应用程序名称 -
master:集群管理器URL -
deploy-mode:部署模式(client|cluster) -
executor-memory:executor内存大小
5. 关闭context
应用完成后关闭context:
sc.stop()
初始化SparkContext是编写PySpark应用的基础,正确配置对性能和资源调度非常重要。
RDD编程
RDD概念
在PySpark中,RDD(弹性分布式数据集)是一个核心概念,主要涵义如下:
-
RDD表示一个不可变、可分区、里面的元素可并行运算的集合。
-
RDD具有确定性、容错性和依赖性追踪等特性。
-
RDD拥有丰富的转换操作(transformation)和行动操作(action)。
-
RDD之间存在血缘关系,构成DAG有向无环图。
-
RDD支持持久化将数据缓存到内存或磁盘。
-
RDD可运行在集群节点上,进行并行、分布式计算。
-
RDD是PySpark编程模型的基础抽象。
总结起来,RDD是Spark/PySpark实现分布式计算的关键抽象,也是我们构建应用的基本工具。
RDD创建
PySpark中常见的RDD创建方式包括:
-
从集合创建
data = [1, 2, 3, 4]
rdd = sc.parallelize(data)
-
从外部存储创建
rdd = sc.textFile("/tmp/data.txt")
-
转换现有RDD
new_rdd = rdd.filter(lambda x: x > 10)
-
空RDD
rdd = sc.emptyRDD()
-
外部数据集
rdd = sc.pickleFile(file)
rdd = sc.sequenceFile(file)
-
自定义创建
通过继承RDD类并实现相应方法。
这些是PySpark中常见的RDD创建方式。转换现有RDD是构建DAG依赖的关键。
RDD转换
PySpark RDD的常见转换操作包括:
-
map:对RDD中每个元素应用函数
-
filter:过滤RDD中元素
-
flatMap:将每个元素映射到多个输出
-
groupByKey:按照键值分组
-
reduceByKey:按键聚合
-
sortByKey:按键排序
-
join:不同RDD进行join连接
-
distinct:去除重复元素
-
cartesian:笛卡尔乘积
-
sample:随机采样
-
union:合并多个RDD
-
intersection:取多个RDD的交集 这些转换操作可以链式组合,构建RDD之间的依赖关系,并进行并行计算。
掌握RDD转换是进行分布式数据处理的关键。
RDD操作
PySpark RDD常见的行动操作包括:
-
reduce:聚合RDD元素
-
collect:将RDD数据收集到本地
-
count:返回RDD元素个数
-
first:返回RDD第一个元素
-
take:返回RDD前N个元素
-
takeSample:随机抽样N个元素
-
takeOrdered:返回RDD排序后前N个元素
-
saveAsTextFile:保存为文本文件
-
saveAsSequenceFile:保存为Sequence文件
-
saveAsObjectFile:保存为序列化对象文件
-
countByKey:计数按键的频率
-
foreach:遍历RDD执行函数 这些行动操作会触发RDD的实际计算,返回结果给Driver程序或者保存到存储系统。
掌握RDD的行动操作可以让我们利用并行运算的结果。
RDD持久化
PySpark中可以通过持久化(Persistence)来缓存RDD,主要方式包括:
-
persist():默认持久化,缓存到内存
-
persist(StorageLevel.DISK_ONLY):只缓存到磁盘
-
persist(StorageLevel.MEMORY_ONLY):只缓存到内存
-
persist(StorageLevel.MEMORY_AND_DISK):既缓存到内存也缓存到磁盘
-
cache():默认持久化到内存
-
unpersist():移除持久化,释放缓存
持久化的主要目的是为了在多个行动操作中重用RDD,避免重复计算。
需要注意的是,持久化占用额外内存/磁盘资源,需要根据实际情况选择最佳策略。
例如对于一次性使用的RDD,不需要持久化。
DataFrame和Dataset
DataFrame介绍
PySpark 中 DataFrame的主要特征:
-
DataFrame是在RDD的基础上引入的高层数据抽象。
-
DataFrame类似关系型数据库中的表格,带有列名和行索引。
-
DataFrame中的数据被组织成名为Schema的列构成。
-
每一列都有名称和类型,可以看作关系模型中的“字段”。
-
使用DataFrame可以更方便地进行关系型操作,如SELECT,WHERE,GROUP BY等。
-
DataFrame建立在Spark SQL执行引擎之上,可以高效地运行SQL查询。
-
与RDD相比,DataFrame的执行效率更高,并且代码更简洁。
-
DataFrame使得Spark也支持传统的批处理流程和算法。
-
用户可以很方便地与Pandas DataFrame进行转换和互操作。
PySpark DataFrame在RDD之上提供了更高层的关系型抽象,使得数据处理和分析更加简单高效。它是Spark Core API和SQL API的桥梁。
构建DataFrame
在PySpark中,常见的构建DataFrame的方法包括:
-
从RDD转换得到
df = spark.createDataFrame(rdd)
-
从Hive表查询结果构建
df = spark.sql("SELECT * FROM hive_table")
-
从结构化数据文件读取
df = spark.read.json("/path/to/json_file")
-
从CSV、Parquet等文件读取
df = spark.read.load("users.parquet")
-
从外部数据库读取
df = spark.read
.format("jdbc")
.option("url", jdbc_url)
.option("dbtable", table_name)
.load()
-
从字典列表创建
data = [{"a": 1}, {"a": 2}]
df = spark.createDataFrame(data)
-
从Pandas DataFrame转换
pdf = pd.DataFrame(data)
df = spark.createDataFrame(pdf)
这些方法构建的DataFrame可以用于后续的分析操作。
DataFrame转换
PySpark 中 DataFrame 的常见转换操作包括:
-
select:选择某些列
df.select("col1", "col2")
-
filter:过滤行
df.filter(df.age > 20)
-
orderBy:排序
df.orderBy("salary", ascending=False)
-
dropDuplicates:删除重复行
df.dropDuplicates(["id"])
-
union:合并DataFrame
df1.union(df2)
-
withColumn:添加列
df.withColumn("age", df.age + 10)
-
groupBy:分组聚合
df.groupBy("dep").avg("salary")
-
join:连接两个DataFrame
df1.join(df2, "id")
这些转换可以灵活组合,对DataFrame进行各种 shuffle 和计算,从而实现复杂的数据处理流程。
DataFrame操作
PySpark DataFrame的常见操作包括:
-
show:展示DataFrame内容
df.show()
-
printSchema:打印DataFrame的Schema
df.printSchema()
-
describe:查看列的统计信息
df.describe()
-
head/tail:查看首尾几行
df.head(3)
-
take:获取几行
df.take(5)
-
count:获取行数
df.count()
-
columns:获取所有的列名
df.columns
-
dtypes:查看每列的数据类型
df.dtypes
这些操作可以让我们直观了解DataFrame的结构和数据,进行必要的验证。
与RDD转换
在PySpark中,DataFrame和RDD可以进行互相转换:
DataFrame 转 RDD
rdd = df.rdd
-
将DataFrame转为对应的RDD表示
RDD 转 DataFrame
df = spark.createDataFrame(rdd)
-
从RDD创建DataFrame,需要指定schema
-
可以使用StructType定义schema
DataFrame 转 Pandas DataFrame
pdf = df.toPandas()
-
将Spark DataFrame转为Pandas DataFrame
Pandas DataFrame 转 Spark DataFrame
df = spark.createDataFrame(pdf)
-
从Pandas DataFrame创建Spark DataFrame
-
需要指定spark session环境
RDD和DataFrame都可以表示分布式数据,转换之间可以利用两者的优势进行不同的计算。
Spark SQL
Spark SQL概述
Spark SQL的主要功能和特点:
-
Spark SQL是Spark用于结构化数据(structured data)处理的模块。它提供了一个编程抽象叫Dataset,是DataFrame的扩展。
-
Spark SQL可以连接Hive metastore,直接对Hive表进行SQL查询。查询的执行是由Spark Catalyst优化器负责,可以重用已经缓存的数据。
-
Spark SQL支持标准的ANSI SQL语法,还提供了Dataframe API用于构建查询。
-
Spark SQL是 Spark 处理结构化数据的统一入口,支持多种数据源比如 Parquet, JSON, Hive等。
-
Spark SQL提供了高级的组件Spark MLlib来进行机器学习和图计算。
-
通过Catalyst优化器,Spark SQL可以智能地优化SQL查询计划。
-
Spark SQL支持流处理和批处理两种模式,基于同一套API和运行时。
Spark SQL为结构化数据处理提供了非常友好、统一的接口。它结合了SQL和程序性API的优点,可以应对交互式数据分析和企业级分析任务。
DataFrame API
-
查询语法层面
-
DataFrame API
df.select("name").filter(df["age"] > 20).orderBy("salary")
-
SQL
SELECT name FROM table WHERE age > 20 ORDER BY salary
-
查询构建过程
-
DataFrame API
df = spark.read.json("data.json")
df.createOrReplaceTempView("table")
-
SQL
CREATE TEMPORARY VIEW table USING json OPTIONS (path 'data.json')
-
查询执行过程
-
DataFrame API和SQL的执行计划生成过程相同
-
性能区别
-
DataFrame API
df = df.repartition(100) //调整分区数
-
编程环境
-
DataFrame API - 编程和脚本
-
SQL - Shell和Notebook
-
用户对象
-
DataFrame API
def my_func(col):
...
df.withColumn("newCol", my_func(df["col"]))
-
SQL
CREATE TEMP FUNCTION my_func AS ...
SELECT my_func(col) FROM table
SQL查询
PySpark中使用SQL查询的常见方式:
-
通过SparkSession执行SQL
spark.sql("SELECT * FROM table")
-
在DataFrame上调用sql方法
df.sql("SELECT col FROM table WHERE col > 10")
-
创建临时视图后查询
df.createOrReplaceTempView("table")
spark.sql("SELECT col FROM table")
-
创建全局视图后查询
df.createGlobalTempView("table")
spark.sql("SELECT col FROM global_temp.table")
-
从Hive表直接查询
spark.sql("SELECT * FROM default.my_hive_table")
-
用SQLContext查询
from pyspark.sql import SQLContext
sqlContext.sql("SELECT * FROM table")
这些方式可以混合使用,通过Spark SQL执行查询操作,并与DataFrame API结合,构建数据处理流程。
与Hive集成
PySpark与Hive的集成方式:
-
配置Hive metastore
Spark需要访问metastore信息,以连接Hive表
-
使用SparkSession初始化
spark = SparkSession.builder
.appName("app")
.config("hive.metastore.uris", metastore_uri)
.enableHiveSupport()
.getOrCreate()
-
通过SQL查询Hive表
SELECT * FROM default.my_hive_table
-
查询Hive UDF
SELECT my_hive_udf(col) FROM my_table
-
在HiveQL中查询Spark表
将Spark表注册为临时表后,Hive可以查询
-
共享数据格式
Spark SQL支持Parquet、ORC等Hive数据格式
-
访问Hive优化功能
比如Predicate Pushdown,Partition Pruning等
结合Spark SQL和Hive可以利用二者的优势,构建高效的大数据分析流程。
机器学习
MLlib工具包
PySpark MLlib的主要功能和特点:
-
MLlib是Spark提供的分布式机器学习库,包含了常见的机器学习算法。
-
MLlib包含分类、回归、聚类、协同过滤等多种算法,覆盖了机器学习的主要任务。
-
MLlib利用RDD进行数据加载和分布式计算,可以在大数据环境进行 scalable learning。
-
MLlib提供了基于DataFrame的pipeline API,可以简化机器学习流程的构建。
-
MLlib内置了特征提取、转换、维度缩减、模型评估等功能。
-
用户可以方便地自定义变量,扩展现有算法,或者实现自己的算法。
-
MLlib可以结合Spark Streaming和SQL,构建实时预测分析管道。
-
MLlib还提供了图运算和挖掘算法GraphX。
利用PySpark MLlib可以便捷地在大数据环境构建机器学习和数据挖掘应用。
分类、回归、聚类算法
PySpark MLlib中的主要分类、回归和聚类算法:
分类算法
-
LogisticRegression:逻辑回归分类 -
DecisionTree:决策树分类 -
NaiveBayes:朴素贝叶斯分类 -
SVM:支持向量机 -
RandomForestClassifier:随机森林分类
回归算法
-
LinearRegression:线性回归 -
LassoRegression:Lasso回归 -
RidgeRegression:岭回归 -
RandomForestRegressor:随机森林回归
聚类算法
-
KMeans:K-Means聚类 -
BisectingKMeans:二分K-Means聚类 -
GaussianMixture:高斯混合聚类 -
LDA:潜在狄利克雷分配
这些算法覆盖了机器学习中最常见的三大类别,可以处理分类预测、预测建模以及客户分群等任务。
管道模型构建
在PySpark MLlib中使用Pipeline API构建机器学习管道的主要步骤:
-
导入所需算法和Pipeline组件
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
-
构建Pipeline的各个Stage
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
-
按顺序组装Pipeline
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
-
指定Pipeline的输入输出
pipelineModel = pipeline.fit(trainingData)
prediction = pipelineModel.transform(testingData)
-
对Pipeline进行调优
paramGrid = ParamGridBuilder().addGrid(...).build()
通过这种方式,我们可以方便地构建一个包含数据处理、特征提取、模型训练的端到端机器学习流水线。
Streaming
DStream基本操作
PySpark Streaming中DStream的基本操作:
-
创建DStream
从数据源创建DStream,比如Kafka, socket等
lines = spark.readStream.format("socket").load()
-
转换操作
类似RDD转换,比如map, filter, reduceByKey等
words = lines.flatMap(lambda line: line.split(" "))
-
输出操作
指定DStream输出的结果sink,比如console,文件等
words.writeStream.outputMode("append").format("console").start()
-
触发执行
启动时执行,收到数据时触发,或者通过定时触发
streamingQuery.awaitTermination()
-
查询状态
获取StreamingQuery状态和信息
streamingQuery.status["message"]
这些是DStream的核心操作,可以构建实时流应用。
常见转换
PySpark Streaming 的常见转换操作:
-
map:返回一个新DStream,其中该函数应用于每个元素
words = lines.map(lambda line: line.split(" "))
-
flatMap:类似map,但每个输入项可映射到0个或多个输出项
words = lines.flatMap(lambda line: line.split(" "))
-
filter:返回一个新DStream,其中谓词函数为true的元素保留下来
even_numbers = numbers.filter(lambda x: x % 2 == 0)
-
reduceByKey:在键值对DStream上调用,聚合每个键的值
word_counts = words.reduceByKey(lambda acc, value: acc + value)
-
join:在两个DStream的键上执行内连接操作
stream1.join(stream2)
-
union:将两个或多个DStream合并在一起
stream1.union(stream2)
这些转换操作可以用来实现DStream上各种实时分析业务逻辑。
输出操作
PySpark Streaming中DStream的常见输出操作:
-
print(): 在driver节点上打印DStream中每一批次数据
lines.print()
-
saveAsTextFiles(): 将Text文件写入HDFS目录
words.saveAsTextFiles("output_dir")
-
saveAsObjectFiles(): 将对象序列化后写入文件
parsed.saveAsObjectFiles("output_dir")
-
saveAsHadoopFiles(): 输出格式由文件扩展名决定,写入HDFS
counts.saveAsHadoopFiles("output_dir")
-
foreach(): 对每一批次数据应用函数
words.foreach(send_to_dashboard)
-
foreachPartition(): 对每一个分区应用函数
words.foreachPartition(batch_insert_db)
-
writeStream(): 输出到表或console等外部sink
words.writeStream.format("console").start()
这些输出操作定义了DStream处理结果的存储目的地和计算。
部署运维
Standalone模式
PySpark在Standalone模式下的运行方式:
-
启动Standalone集群
在所有Worker节点上启动Spark Standalone集群
./sbin/start-master.sh
./sbin/start-slave.sh
-
提交PySpark应用
在Driver程序中指定Master URL,然后用spark-submit提交应用
spark = SparkSession.builder.appName("app").master("spark://master:7077").getOrCreate()
# 提交应用
./bin/spark-submit my_script.py
-
并行运行
Spark Standalone集群将Driver和Executor进程分配到Work节点上并行运行
-
检查执行状态
通过http://master:8080观察作业执行情况
-
停止集群
./sbin/stop-master.sh
./sbin/stop-slave.sh
Standalone模式让Spark应用可以不依赖YARN等外部集群管理器,直接在自身管理的资源上运行。
YARN集群
PySpark在YARN集群上的部署运行方式:
-
启动YARN集群
启动YARN的ResourceManager和NodeManager
-
配置Spark on YARN
spark-env.sh中配置Hadoop环境
-
提交PySpark应用
在Driver中指定YARN集群,用spark-submit提交
spark = SparkSession.builder.master("yarn").getOrCreate()
./bin/spark-submit --master yarn my_script.py
-
YARN负责调度
YARN ResourceManager将任务分配给NodeManager执行
-
查看执行状态
通过YARN UI http://resourcemanager:8088观察作业
-
配置历史服务器
在yarn-site.xml中添加历史服务器地址
-
收集日志
从YARN中获取Spark作业的日志
YARN模式利用了YARN的资源管理功能,可以在大规模YARN集群上高效运行Spark应用。
Mesos集群
PySpark在Mesos集群上的部署运行方式:
-
启动Mesos集群
包括Mesos Master和Slave节点
-
安装Spark Mesos包
在所有节点安装spark-mesos包
-
启动Driver
Driver需要初始化Mesos依赖
from pyspark import MesosExecutorBackend
spark = SparkSession.builder.appName("app").mesosExecutorBackend("MesosBackend").getOrCreate()
-
提交PySpark应用
./bin/spark-submit
--master mesos://master:5050
--deploy-mode cluster
--conf spark.mesos.coarse=true
--conf spark.cores.max=5
my_script.py
-
Mesos Master进行资源分配
将Spark作业分配给Mesos Slave执行
-
查看执行状态
通过Mesos UI http://master:5050观察Spark作业
-
收集日志
从Mesos日志中收集应用日志
Mesos模式可以让Spark应用和其他基于Mesos的应用共享资源池,实现更高效的资源利用。
Kubernetes集群
PySpark在Kubernetes集群部署运行方式:
-
准备Kubernetes集群
可以使用Minikube或AWS EKS等快速搭建
-
构建Docker镜像
包含PySpark及依赖
-
准备Python脚本
编写PySpark应用程序脚本,例如app.py
-
提交Spark应用
使用spark-submit,指明Kubernetes集群
./bin/spark-submit
--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port>
--deploy-mode cluster
--name spark-app
--conf spark.kubernetes.container.image=<spark-image>
--conf spark.kubernetes.python.mainAppFile=app.py
local:///path/to/app.py
-
Driver Pod启动应用
Kubernetes会启动Executor Pods来运行任务
-
查看应用状态
通过kubectl查看Pod状态和日志
-
访问Dashboard:查看应用运行情况
PySpark 常见问题及解决方案
PySpark使用中常见的问题及解决方案:
1. OutOfMemoryError
-
原因:executor或driver内存不足 -
解决:增加executor/driver内存
2. 任务执行缓慢
-
原因:并行任务数过低,数据倾斜 -
解决:增加 parallelism 参数,重新分区
3. 读取文件失败
-
原因:文件路径错误,权限不足 -
解决:检查文件路径,修改文件权限
4. 输出文件为空
-
原因:输出模式错误,如overwrite -
解决:设置正确的输出模式,如append
5. Serialization错误
-
原因:Java和Python对象序列化错误 -
解决:增加Kryo序列化,调整相关配置
6. 部署失败
-
原因:依赖版本冲突,模式配置错误 -
解决:统一依赖版本,检查部署模式配置
7. 连接超时失败
-
原因:网络不稳定,长时间等待 -
解决:优化网络,增加超时时间
https://github.com/huanglaoxie0503/pyspark-basic-introduction
发表评论