Comparing PySpark DataFrames and Pandas DataFrames
Both PySpark and Pandas provide a DataFrame data structure, but the two are used quite differently.
References: "pyspark 系列 -- pandas 与 pyspark 对比"; "Pandas 和 PySpark 中的 DataFrame 比较"; PySpark API; Pandas API
Execution model
PySpark
A distributed, parallel computing framework with built-in parallelism: data and operations are automatically distributed across the cluster nodes, and distributed data is processed much like in-memory data. It integrates with Hadoop and can handle very large datasets.
import pyspark.sql.functions as F
Imports the built-in SQL functions module (used as F throughout this post).
Pandas
A single-machine tool with no built-in parallelism and no Hadoop integration; it becomes a bottleneck on very large datasets.
Lazy evaluation
PySpark
lazy-evaluated
Pandas
not lazy-evaluated
Note: in programming language theory, lazy evaluation (also known as call-by-need) is a concept whose goal is to minimize the work the computer has to do. It covers two related but distinct ideas: delayed evaluation and minimal evaluation. With delayed evaluation, an expression is not evaluated as soon as it is bound to a variable, but only when its value is actually needed.
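A minimal sketch of the difference, assuming a running SparkSession named spark: the PySpark filter only records a transformation in the plan, while the Pandas filter executes immediately.
import pandas as pd
import pyspark.sql.functions as F

pdf = pd.DataFrame({'xx': [1, 2, 3, 4]})
small = pdf[pdf['xx'] > 2]           # Pandas: computed right here

sdf = spark.createDataFrame(pdf)     # assumes an existing SparkSession `spark`
plan = sdf.filter(F.col('xx') > 2)   # PySpark: nothing executed yet, only a plan
plan.count()                         # the action triggers the actual computation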
In-memory caching
PySpark
persist() or cache() keeps the transformed RDDs/DataFrames in memory
Pandas
Caching only within the single machine's memory
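A minimal caching sketch for a DataFrame df that is reused several times (df is a placeholder):
from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK)   # or simply df.cache()
df.count()                                 # the first action materializes the cache
# ... reuse df in further computations ...
df.unpersist()                             # release the cached data when it is no longer needed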
DataFrame mutability
PySpark
RDDs in Spark are immutable, so Spark DataFrames are immutable as well
Pandas
Pandas DataFrames are mutable
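A short illustration, reusing the pdf/sdf names from the sketch above:
sdf2 = sdf.withColumn('xx', F.col('xx') + 1)   # returns a new DataFrame; sdf itself is unchanged
pdf['xx'] = pdf['xx'] + 1                      # mutates the existing Pandas DataFrame in place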
Creation
- PySpark
Create directly: spark_df = sc.parallelize([(1, 2), (3, 4)]).toDF(['xx', 'yy'])
Convert from a pandas_df: spark_df = spark.createDataFrame(pandas_df) (sqlContext.createDataFrame on older versions)
createDataFrame also accepts a list (of tuples, dicts, or Rows) or an RDD as input (a sketch follows at the end of this section)
Read a CSV file: spark_df = spark.read.csv(csv_path, header=True)
If the CSV file has a header row, header=True reads it in as the column names
Read a Parquet file: spark_df = spark.read.parquet(parquet_path)
Read a JSON file: spark_df = spark.read.json(json_path)
Read a text file: spark_df = spark.read.text(txt_path) (with sc.textFile(txt_path), map each line into a tuple or Row before calling toDF())
Note: all of the paths above are HDFS paths. - Pandas
Create directly: pandas_df = pd.DataFrame({'xx': {0: 1, 1: 3}, 'yy': {0: 2, 1: 4}})
Convert from a spark_df: pandas_df = spark_df.toPandas()
Or read from other data sources:
Read a CSV file: pd.read_csv(csv_path)
Read a Parquet file: pd.read_parquet(parquet_path)
If parquet_path is an HDFS path it needs the 'hdfs://' prefix, e.g. 'hdfs:///projects/path/to/parquet/'
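A minimal sketch of the list-based creation mentioned above (assuming a SparkSession named spark and a SparkContext named sc):
from pyspark.sql import Row

spark_df = spark.createDataFrame([(1, 2), (3, 4)], ['xx', 'yy'])                  # list of tuples plus column names
spark_df = spark.createDataFrame([Row(xx=1, yy=2), Row(xx=3, yy=4)])              # list of Rows
spark_df = spark.createDataFrame(sc.parallelize([(1, 2), (3, 4)]), ['xx', 'yy'])  # from an RDD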
Converting between the two
PySpark
pandas_df = spark_df.toPandas()
Columns of ArrayType(), StructType(), or MapType() need to be converted to string beforehand, since pandas does not support these types (a sketch follows at the end of this section).
Pandas
spark_df = spark.createDataFrame(pandas_df)
The conversion may fail with: TypeError: field xx: Can not merge type A and B
The cause is null values in that column; the fix is to cast it to string first:
pandas_df.xx = pandas_df.xx.astype(str)
You also need to make sure the Python version the program runs with matches the Spark driver's, i.e.:
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
Otherwise you get: Exception: Python in worker has different version x.y than that in driver m.n, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.
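A sketch of stringifying complex columns before toPandas, as mentioned above; the column names arr and info are hypothetical examples of an ArrayType and a StructType column:
flat = (spark_df
        .withColumn('arr', F.col('arr').cast('string'))    # plain cast to string
        .withColumn('info', F.to_json(F.col('info'))))     # or serialize to a JSON string
pandas_df = flat.toPandas()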
Writing
PySpark
df.repartition(partition_num).write.mode('overwrite').partitionBy(col_name).parquet(parquet_path)  # mode: 'overwrite' or 'append'
Pandas
df.to_csv(csv_path, index=False)  # index: whether to keep the index
df.to_pickle(csv_pickle)
df.to_parquet(parquet_path)
df.to_json(json_path, orient='records')  # orient: 'split', 'records', 'index', 'columns', 'values', or 'table'
# 'split': dict like {'index' -> [index], 'columns' -> [columns], 'data' -> [values]}
# 'records': list like [{column -> value}, ..., {column -> value}]
# 'index': dict like {index -> {column -> value}}
# 'columns': dict like {column -> {index -> value}}
# 'values': just the values array
# 'table': dict like {'schema': {schema}, 'data': {data}}
Index
PySpark
There is no index; if one is needed, create it explicitly as a column:
from pyspark.sql.window import Window
df.withColumn('index', F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))
df.rdd.zipWithIndex().toDF().select(F.col('_1').getItem('col_name_1').alias('col_name_1'), ..., F.col('_1').getItem('col_name_n').alias('col_name_n'), F.col('_2').alias('row_index'))  # this approach is faster
Pandas
Created automatically
Note: to keep the index when converting a pandas_df to a spark_df, use:
spark_df = spark.createDataFrame(pandas_df.reset_index())
Row structure
PySpark
Row objects, part of the Spark DataFrame structure
Pandas
Series objects, part of the Pandas DataFrame structure
Column structure
PySpark
Column objects, part of the Spark DataFrame structure, e.g. DataFrame[name: string]
Pandas
Series objects, part of the Pandas DataFrame structure
Column names
- PySpark
Duplicate column names are allowed; use alias (or the methods below) to rename
Rename a column: df.withColumnRenamed('old_name', 'new_name')
df.select(F.col('old_name').alias('new_name'), ...)
df.selectExpr('old_name as new_name', ...)
- Pandas
Duplicate column names are not allowed
Rename a column: df.rename(columns={'old_name': 'new_name'})
Modifying columns
- PySpark
Given an existing column df['xx']: df.withColumn('xx', F.lit(1)) overwrites it with a literal
Conditional logic: df.withColumn('xx', F.when(condition, value_if_true).otherwise(value_if_false))
Concatenation: df.withColumn('xx', F.concat(F.col('yy'), F.lit('-'), F.col('zz')))
Columns yy and zz must be of string type; cast them first if they are not.
Take a value from the input file path: df.withColumn('xx', F.input_file_name().substr(start_index, length))
- Pandas
Given an existing column df['xx']: df['xx'] = 1 overwrites it
Conditional logic: df.loc[condition, 'xx'] = value_if_true
df.loc[~condition, 'xx'] = value_if_false
df['xx'] = np.where(condition, value_if_true, value_if_false)
df['xx'] = df.apply(lambda x: value_if_true if condition else value_if_false, axis=1)
Concatenation: df['xx'] = df.yy + '-' + df.zz
Columns yy and zz must be of string type; cast them first if they are not.
Display
- PySpark
Evaluating df does not print its contents; use show: df.show(5, truncate=100)
By default 20 rows are shown; the display width of each cell is set with the truncate parameter
Print the schema as a tree: df.printSchema()
df.columns lists the column names
- Pandas
Evaluating df prints its contents
df.columns lists the column names
pd.set_option('display.max_columns', None)  # show all columns
pd.set_option('display.max_colwidth', 100)  # display width per cell
pd.set_option('display.max_rows', None)  # show all rows
Sorting
PySpark
df.sort(df.xx.asc(),df.yy.desc())
df.sort(F.asc('xx'),F.desc('yy'))
df.sort(F.col("xx").asc(), F.col("yy").desc())
df.orderBy(F.col("xx").asc(), F.col("yy").desc())
Sorts by column values in order: ascending on xx, then descending on yy
Pandas
df.sort_index() sorts by the index, ascending
df.sort_values(['xx', 'yy'], ascending=[True, False]) sorts by column values in order: ascending on xx, then descending on yy
df.sort_values(['xx', 'yy'], axis=0) sorts rows by the values of columns xx and yy; df.sort_values([1, 2], axis=1) sorts columns by the values of the rows labelled 1 and 2 (ascending)
Selection and slicing
PySpark
df.select('xx', 'yy') selects one or more columns
df.first() returns the first row as a Row (a Row looks like Row(col_name1=value1, col_name2=value2, ...))
df.head(n) and df.take(n) return the first n rows as a list of Rows; df.tail(n) returns the last n rows
df.collect() returns all rows as a list of Rows
Pandas
df.xx or df['xx'] selects the column named xx; df.loc[k] selects the row whose index label is k
df.iloc[:, k] selects the k-th column; df.iloc[k] selects the k-th row; df.iat[i, j] accesses the scalar at row i, column j
Filtering
PySpark
df.filter(df['xx'] > k) or df.where(df['xx'] > k)
Membership: df.filter(F.col('xx').isin(filter_list))
Null handling:
- Value is null: df.filter(F.col('xx').isNull()) and df.filter(F.col('xx').isNotNull())
- Value is an empty string: df.filter(F.col('xx') == '')
- Value is np.nan: df.filter(F.isnan(F.col('xx'))) (comparing with == np.nan never matches)
Pandas
df[df['xx'] > k] or, for a Series, s[s > k]
Membership: df[df.xx.isin(filter_list)]
Null handling: covers null, np.NaN, pd.NaT, and None, but not empty strings
df[df.xx.isnull()] or df[df.xx.isna()], and df[df.xx.notnull()] or df[df.xx.notna()]
Examples:
>> pdf = pd.DataFrame(dict(numpy=[np.NaN], pandas=[pd.NaT], empty=[''], none=[None]))
>> pdf
numpy pandas empty none
0 NaN NaT None
>> pdf.isnull()
numpy pandas empty none
0 True True False True
>> from pyspark.sql.types import *
>> sdf = spark.createDataFrame(pdf, StructType([StructField('numpy', DoubleType(), True), StructField('pandas', StringType(), True), StructField('empty', StringType(), True), StructField('none', StringType(), True)])) # the schema must be given explicitly, otherwise: ValueError: Some of types cannot be determined after inferring
>> sdf.show()
+-----+------+-----+----+
|numpy|pandas|empty|none|
+-----+------+-----+----+
|  NaN|  null|     |null|
+-----+------+-----+----+
Grouping and aggregation
PySpark
df.groupBy(cols_to_group); apply a single function with df.groupBy(cols_to_group).avg('xx').show()
Apply several functions: df.groupBy(cols_to_group).agg(F.avg('xx'), F.min('xx'), F.max('xx')).show()
Pandas
df.groupby(cols_to_group); apply a single function with df.groupby(cols_to_group)['xx'].mean()
Filter whole groups with a function: df.groupby(cols_to_group).filter(function) (see the sketch below)
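A minimal sketch of group filtering, keeping only the groups whose mean of xx exceeds a threshold; the column names key and xx are placeholders, and the PySpark variant is a common window-based workaround rather than a built-in equivalent:
kept_pdf = pdf.groupby('key').filter(lambda g: g['xx'].mean() > 10)   # Pandas drops whole groups

from pyspark.sql.window import Window
w = Window.partitionBy('key')
kept_sdf = (sdf.withColumn('_mean', F.avg('xx').over(w))
               .filter(F.col('_mean') > 10)
               .drop('_mean'))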
Statistics
PySpark
df.count() returns the total number of rows
df.describe() reports count, mean, stddev, min, and max for the selected columns
Pandas
df.count() returns the number of non-null values in each column
df.shape gives (number of rows, number of columns)
df.describe() reports count, mean, std, min, 25%, 50%, 75%, and max for the numeric columns
Merging
PySpark
Adding columns (joins)
df.join(): columns with the same name do not get automatic suffixes; keys that match exactly are kept as a single copy
Supported join types: 'inner', 'outer', 'full', 'fullouter', 'full_outer', 'leftouter', 'left', 'left_outer', 'rightouter', 'right', 'right_outer', 'leftsemi', 'left_semi', 'leftanti', 'left_anti', 'cross'.
>> df_left = spark.createDataFrame(pd.DataFrame({'Keys': ['key1', 'key2', 'key3', 'key4', 'key5'], 'Values_Left': ['value1', 'value2', 'value3', 'value4', 'value5']}, index=[0, 1, 2, 3, 4]))
>> df_left.show()
+----+-----------+
|Keys|Values_Left|
+----+-----------+
|key1| value1|
|key2| value2|
|key3| value3|
|key4| value4|
|key5| value5|
+----+-----------+
>> df_right = spark.createDataFrame(pd.DataFrame({'Keys': ['key1', 'key2', 'key2', 'key3', 'key6'], 'Values_Right': ['value6', 'value7', 'value8', 'value9', 'value10']}, index=[0, 1, 2, 3, 4]))
>> df_right.show()
+----+------------+
|Keys|Values_Right|
+----+------------+
|key1| value6|
|key2| value7|
|key2| value8|
|key3| value9|
|key6| value10|
+----+------------+
>> df_left.join(df_right, ['Keys'], 'left').sort('Keys').show() # left=leftouter=left_outer
+----+-----------+------------+
|Keys|Values_Left|Values_Right|
+----+-----------+------------+
|key1| value1| value6|
|key2| value2| value7|
|key2| value2| value8|
|key3| value3| value9|
|key4| value4| null|
|key5| value5| null|
+----+-----------+------------+
>> df_right.join(df_left, ['Keys'], 'right').sort('Keys').show() # right=rightouter=right_outer
+----+------------+-----------+
|Keys|Values_Right|Values_Left|
+----+------------+-----------+
|key1| value6| value1|
|key2| value7| value2|
|key2| value8| value2|
|key3| value9| value3|
|key4| null| value4|
|key5| null| value5|
+----+------------+-----------+
>> df_left.join(df_right, ['Keys'], 'left_semi').sort('Keys').show() # left_semi=leftsemi
+----+-----------+
|Keys|Values_Left|
+----+-----------+
|key1| value1|
|key2| value2|
|key3| value3|
+----+-----------+
>> df_left.join(df_right, ['Keys'], 'left_anti').sort('Keys').show() # leftanti=left_anti
+----+-----------+
|Keys|Values_Left|
+----+-----------+
|key4| value4|
|key5| value5|
+----+-----------+
>> df_left.join(df_right, ['Keys'], 'inner').sort('Keys').show()
+----+-----------+------------+
|Keys|Values_Left|Values_Right|
+----+-----------+------------+
|key1| value1| value6|
|key2| value2| value8|
|key2| value2| value7|
|key3| value3| value9|
+----+-----------+------------+
>> df_left.join(df_right, ['Keys'], 'outer').sort('Keys').show() # outer=full=fullouter=full_outer
+----+-----------+------------+
|Keys|Values_Left|Values_Right|
+----+-----------+------------+
|key1| value1| value6|
|key2| value2| value8|
|key2| value2| value7|
|key3| value3| value9|
|key4| value4| null|
|key5| value5| null|
|key6| null| value10|
+----+-----------+------------+
>> df_left = df_left.withColumnRenamed('Keys', 'Keys_Left')
>> df_right = df_right.withColumnRenamed('Keys', 'Keys_Right')
>> df_left.join(df_right, df_left.Keys_Left==df_right.Keys_Right, 'cross').show()
+---------+-----------+----------+------------+
|Keys_Left|Values_Left|Keys_Right|Values_Right|
+---------+-----------+----------+------------+
| key1| value1| key1| value6|
| key2| value2| key2| value8|
| key2| value2| key2| value7|
| key3| value3| key3| value9|
+---------+-----------+----------+------------+
>> df_left.crossJoin(df_right).show(25)
+---------+-----------+----------+------------+
|Keys_Left|Values_Left|Keys_Right|Values_Right|
+---------+-----------+----------+------------+
| key1| value1| key1| value6|
| key1| value1| key2| value7|
| key1| value1| key2| value8|
| key1| value1| key3| value9|
| key1| value1| key6| value10|
| key2| value2| key1| value6|
| key2| value2| key2| value7|
| key2| value2| key2| value8|
| key2| value2| key3| value9|
| key2| value2| key6| value10|
| key3| value3| key1| value6|
| key3| value3| key2| value7|
| key3| value3| key2| value8|
| key3| value3| key3| value9|
| key3| value3| key6| value10|
| key4| value4| key1| value6|
| key4| value4| key2| value7|
| key4| value4| key2| value8|
| key4| value4| key3| value9|
| key4| value4| key6| value10|
| key5| value5| key1| value6|
| key5| value5| key2| value7|
| key5| value5| key2| value8|
| key5| value5| key3| value9|
| key5| value5| key6| value10|
+---------+-----------+----------+------------+
>> from pyspark.sql.window import Window
>> df1 = spark.createDataFrame(pd.DataFrame({'Keys_On': [1, 5, 10], 'Keys_By': ['a', 'a', 'b'], 'Values_Left': [1, 5, 10]}))
>> df2 = spark.createDataFrame(pd.DataFrame({'Keys_On': [1, 2, 3, 6, 7], 'Keys_By': ['a', 'a', 'a', 'b', 'b'], 'Values_Right': [1, 2, 3, 6, 7]}))
>> df1 = df1.withColumn('Values_Right', F.lit(None))
>> df2 = df2.withColumn('Values_Left', F.lit(None))
>> df3 = df1.unionByName(df2)
>> w = Window.partitionBy('Keys_By').orderBy('Keys_On', 'Values_Left').rowsBetween(Window.unboundedPreceding, -1)
>> df3.withColumn('Values_Right', F.last('Values_Right', True).over(w)).filter(~F.isnull('Values_Left')).show()
+-------+-------+-----------+------------+
|Keys_On|Keys_By|Values_Left|Values_Right|
+-------+-------+-----------+------------+
| 1| a| 1| 1|
| 5| a| 5| 3|
| 10| b| 10| 7|
+-------+-------+-----------+------------+
>> def asof_join(l, r):
>>     return pd.merge_asof(l, r, on='Keys_On', by='Keys_By')
>> df1 = spark.createDataFrame(pd.DataFrame({'Keys_On': [1, 5, 10], 'Keys_By': ['a', 'a', 'b'], 'Values_Left': [1, 5, 10]}))
>> df2 = spark.createDataFrame(pd.DataFrame({'Keys_On': [1, 2, 3, 6, 7], 'Keys_By': ['a', 'a', 'a', 'b', 'b'], 'Values_Right': [1, 2, 3, 6, 7]}))
>> df1.groupby('Keys_By').cogroup(df2.groupby('Keys_By')).applyInPandas(
>>     asof_join, schema="Keys_On int, Keys_By string, Values_Left float, Values_Right float"
>> ).show()
+-------+-------+-----------+------------+
|Keys_On|Keys_By|Values_Left|Values_Right|
+-------+-------+-----------+------------+
| 10| b| 10.0| 7.0|
| 1| a| 1.0| 1.0|
| 5| a| 5.0| 3.0|
+-------+-------+-----------+------------+
Adding rows (unions)
df.union(): concatenates two DataFrames by column position; the result keeps the first DataFrame's column names (a.union(b) uses a's column order)
df.unionAll(): same as union
df.unionByName(): concatenates two DataFrames by matching column names
df1.unionByName(df2).unionByName(df3)
Pandas
Pandas has concat, which concatenates along an axis:
pd.concat([df1, df2, df3], ignore_index=True, sort=False)
df1.append([df2, df3], ignore_index=True, sort=False)  # append is deprecated in newer pandas; prefer pd.concat
df1.join([df2, df3])
Pandas has merge, which joins on one or more key columns: columns with the same name get automatic suffixes, and matching keys keep a single copy
df.join() also supports joining on multiple columns (or on the index)
df.append() appends rows
>> df_left = pd.DataFrame({'Keys': ['key1', 'key2', 'key3', 'key4', 'key5'], 'Values_Left': ['value1', 'value2', 'value3', 'value4', 'value5']}, index=[0, 1, 2, 3, 4])
Keys Values_Left
0 key1 value1
1 key2 value2
2 key3 value3
3 key4 value4
4 key5 value5
>> df_right = pd.DataFrame({'Keys': ['key1', 'key2', 'key2', 'key3', 'key6'], 'Values_Right': ['value6', 'value7', 'value8', 'value9', 'value10']}, index=[0, 1, 2, 4, 5])
Keys Values_Right
0 key1 value6
1 key2 value7
2 key2 value8
4 key3 value9
5 key6 value10
>> df_left.append(df_right) <=> pd.concat([df_left, df_right])
Keys Values_Left Values_Right
0 key1 value1 NaN
1 key2 value2 NaN
2 key3 value3 NaN
3 key4 value4 NaN
4 key5 value5 NaN
0 key1 NaN value6
1 key2 NaN value7
2 key2 NaN value8
4 key3 NaN value9
5 key6 NaN value10
>> pd.concat([df_left, df_right], axis=1) # similar to an outer join, keyed on the index
Keys Values_Left Keys Values_Right
0 key1 value1 key1 value6
1 key2 value2 key2 value7
2 key3 value3 key2 value8
3 key4 value4 NaN NaN
4 key5 value5 key3 value9
5 NaN NaN key6 value10
>> pd.concat([df_left, df_right], axis=1, join='inner')
Keys Values_Left Keys Values_Right
0 key1 value1 key1 value6
1 key2 value2 key2 value7
2 key3 value3 key2 value8
4 key5 value5 key3 value9
>> pd.merge(df_left, df_right, on='Keys')
Keys Values_Left Values_Right
0 key1 value1 value6
1 key2 value2 value7
2 key2 value2 value8
3 key3 value3 value9
>> pd.merge(df_left, df_right, how='left', on='Keys')
Keys Values_Left Values_Right
0 key1 value1 value6
1 key2 value2 value7
2 key2 value2 value8
3 key3 value3 value9
4 key4 value4 NaN
5 key5 value5 NaN
>> pd.merge(df_left, df_right, how='right', on='Keys')
Keys Values_Left Values_Right
0 key1 value1 value6
1 key2 value2 value7
2 key2 value2 value8
3 key3 value3 value9
4 key6 NaN value10
>> pd.merge(df_left, df_right, how='outer', on='Keys', validate='one_to_many', indicator='indicator_column')
Keys Values_Left Values_Right indicator_column
0 key1 value1 value6 both
1 key2 value2 value7 both
2 key2 value2 value8 both
3 key3 value3 value9 both
4 key4 value4 NaN left_only
5 key5 value5 NaN left_only
6 key6 NaN value10 right_only
>> pd.merge(df_left, df_right, how='inner', on='Keys')
Keys Values_Left Values_Right
0 key1 value1 value6
1 key2 value2 value7
2 key2 value2 value8
3 key3 value3 value9
>> df_left.join(df_right, how='left', lsuffix='_Left', rsuffix='_Right')
Keys_Left Values_Left Keys_Right Values_Right
0 key1 value1 key1 value6
1 key2 value2 key2 value7
2 key3 value3 key2 value8
3 key4 value4 NaN NaN
4 key5 value5 key3 value9
>> df_left.join(df_right, how='right', lsuffix='_Left', rsuffix='_Right')
Keys_Left Values_Left Keys_Right Values_Right
0 key1 value1 key1 value6
1 key2 value2 key2 value7
2 key3 value3 key2 value8
4 key5 value5 key3 value9
5 NaN NaN key6 value10
>> df_left.join(df_right, how='outer', lsuffix='_Left', rsuffix='_Right')
Keys_Left Values_Left Keys_Right Values_Right
0 key1 value1 key1 value6
1 key2 value2 key2 value7
2 key3 value3 key2 value8
3 key4 value4 NaN NaN
4 key5 value5 key3 value9
5 NaN NaN key6 value10
>> df_left.join(df_right, how='inner', lsuffix='_Left', rsuffix='_Right')
Keys_Left Values_Left Keys_Right Values_Right
0 key1 value1 key1 value6
1 key2 value2 key2 value7
2 key3 value3 key2 value8
4 key5 value5 key3 value9
>> df_left.join(df_right.reindex(['key1', 'key2', 'key2', 'key3', 'key6']), on='Keys', lsuffix='_Left', rsuffix='_Right') <=> pd.merge(df_left, df_right.reindex(['key1', 'key2', 'key2', 'key3', 'key6']), left_on="Keys", right_index=True, how="left", sort=False, suffixes=('_Left', '_Right'))
Keys Keys_Left Values_Left Keys_Right Values_Right
0 key1 key1 value1 NaN NaN
1 key2 key2 value2 NaN NaN
1 key2 key2 value2 NaN NaN
2 key3 key3 value3 NaN NaN
3 key4 key4 value4 NaN NaN
4 key5 key5 value5 NaN NaN
>> df_left.combine_first(df_right) # aligned on the index; where both frames have a value, the left one wins
Keys Values_Left Values_Right
0 key1 value1 value6
1 key2 value2 value7
2 key3 value3 value8
3 key4 value4 NaN
4 key5 value5 value9
5 key6 NaN value10
>> df_left.update(df_right)
>> df_left
Keys Values_Left
0 key1 value1
1 key2 value2
2 key2 value3
3 key4 value4
4 key3 value5
>> pd.merge_ordered(df_left, df_right, on="Keys")
Keys Values_Left Values_Right
0 key1 value1 value6
1 key2 value2 value7
2 key2 value2 value8
3 key2 value3 value7
4 key2 value3 value8
5 key3 value5 value9
6 key4 value4 NaN
7 key6 NaN value10
>> pd.merge_ordered(df_left, df_right, left_by="Keys")
Keys Values_Left Values_Right
0 key1 value1 value6
1 key2 value2 value7
2 key2 value2 value8
3 key2 value3 value7
4 key2 value3 value8
5 key4 value4 NaN
6 key3 value5 value9
>> pd.merge_ordered(df_left, df_right, right_by="Keys")
Keys Values_Left Values_Right
0 key1 value1 value6
1 key2 value2 value7
2 key2 value2 value8
3 key2 value3 value7
4 key2 value3 value8
5 key3 value5 value9
6 key6 NaN value10
>> df_left = df_left.rename(columns={'Values_Left': 'Values'})
>> df_right = df_right.rename(columns={'Values_Right': 'Values'}).reset_index(drop=True)
>> df_left.compare(df_right, align_axis=0)
Keys Values
0 self NaN value1
other NaN value6
1 self NaN value2
other NaN value7
2 self NaN value3
other NaN value8
3 self key4 value4
other key3 value9
4 self key3 value5
other key6 value10
>> df_left.compare(df_right, keep_shape=True)
Keys Values
self other self other
0 NaN NaN value1 value6
1 NaN NaN value2 value7
2 NaN NaN value3 value8
3 key4 key3 value4 value9
4 key3 key6 value5 value10
>> df_left.compare(df_right, keep_shape=True, keep_equal=True)
Keys Values
self other self other
0 key1 key1 value1 value6
1 key2 key2 value2 value7
2 key2 key2 value3 value8
3 key4 key3 value4 value9
4 key3 key6 value5 value10
>> df_left = pd.DataFrame({'Keys_On': [1, 5, 10], 'Keys_By': ['a', 'a', 'b'], 'Values_Left': [1, 5, 10]}, index=[0, 1, 2])
Keys_On Keys_By Values_Left
0 1 a 1
1 5 a 5
2 10 b 10
>> df_right = pd.DataFrame({'Keys_On': [1, 2, 3, 6, 7], 'Keys_By': ['a', 'a', 'a', 'b', 'b'], 'Values_Right': [1, 2, 3, 6, 7]}, index=[0, 1, 2, 4, 5])
Keys_On Keys_By Values_Right
0 1 a 1
1 2 a 2
2 3 a 3
4 6 b 6
5 7 b 7
>> pd.merge_asof(df_left, df_right, on='Keys_On')
Keys_On Keys_By_x Values_Left Keys_By_y Values_Right
0 1 a 1 a 1
1 5 a 5 a 3
2 10 b 10 b 7
>> pd.merge_asof(df_left, df_right, on='Keys_On', allow_exact_matches=False)
Keys_On Keys_By_x Values_Left Keys_By_y Values_Right
0 1 a 1 NaN NaN
1 5 a 5 a 3.0
2 10 b 10 b 7.0
>> pd.merge_asof(df_left, df_right, on='Keys_On', direction='forward')
Keys_On Keys_By_x Values_Left Keys_By_y Values_Right
0 1 a 1 a 1.0
1 5 a 5 b 6.0
2 10 b 10 NaN NaN
>> pd.merge_asof(df_left, df_right, on='Keys_On', direction='nearest')
Keys_On Keys_By_x Values_Left Keys_By_y Values_Right
0 1 a 1 a 1
1 5 a 5 b 6
2 10 b 10 b 7
>> pd.merge_asof(df_left, df_right, on='Keys_On', tolerance=2)
Keys_On Keys_By_x Values_Left Keys_By_y Values_Right
0 1 a 1 a 1.0
1 5 a 5 a 3.0
2 10 b 10 NaN NaN
>> pd.merge_asof(df_left, df_right, on='Keys_On', by='Keys_By')
Keys_On Keys_By Values_Left Values_Right
0 1 a 1 1
1 5 a 5 3
2 10 b 10 7
>> pd.merge_asof(df_left, df_right, on='Keys_On', by='Keys_By', allow_exact_matches=False)
Keys_On Keys_By Values_Left Values_Right
0 1 a 1 NaN
1 5 a 5 3.0
2 10 b 10 7.0
>> pd.merge_asof(df_left, df_right, on='Keys_On', by='Keys_By', direction='forward')
Keys_On Keys_By Values_Left Values_Right
0 1 a 1 1.0
1 5 a 5 NaN
2 10 b 10 NaN
>> pd.merge_asof(df_left, df_right, on='Keys_On', by='Keys_By', direction='nearest')
Keys_On Keys_By Values_Left Values_Right
0 1 a 1 1
1 5 a 5 3
2 10 b 10 7
>> pd.merge_asof(df_left, df_right, on='Keys_On', by='Keys_By', tolerance=2)
Keys_On Keys_By Values_Left Values_Right
0 1 a 1 1.0
1 5 a 5 3.0
2 10 b 10 NaN
Deletion
PySpark
Drop a column: df.drop('xx') or df.drop(F.col('xx'))
Drop several columns: df.drop(*['xx', 'yy', ...])
Drop rows: use filter
Deduplicate: df.dropDuplicates() or df.drop_duplicates(), whose optional argument must be a list of columns; df.distinct() takes no arguments.
Keep the row with the smallest / largest value of xx within each cols_to_group group:
df.groupBy(cols_to_group).agg(F.min('xx').alias('xx'))  # or F.max
df.withColumn('min', F.min('xx').over(Window.partitionBy(cols_to_group))).where(F.col('xx') == F.col('min'))  # analogously with F.max
# example
>> pdf = pd.DataFrame(dict(os=['ANDROID', 'ANDROID', 'IOS', 'IOS', 'IOS', 'IOS'], region=['SG', 'SG', 'SG', 'SG', 'CN', 'CN'], value=[1,2,3,4,5,6]))
>> sdf = spark.createDataFrame(pdf)
>> sdf.show()
+-------+------+-----+
| os|region|value|
+-------+------+-----+
|ANDROID| SG| 1|
|ANDROID| SG| 2|
| IOS| SG| 3|
| IOS| SG| 4|
| IOS| CN| 5|
| IOS| CN| 6|
+-------+------+-----+
>> sdf.groupBy(['os', 'region']).agg(F.min('value')).show()
+-------+------+----------+
| os|region|min(value)|
+-------+------+----------+
| IOS| CN| 5|
| IOS| SG| 3|
|ANDROID| SG| 1|
+-------+------+----------+
>> from pyspark.sql.window import Window
>> sdf.withColumn('min', F.min('value').over(Window.partitionBy(['os', 'region']))).where(F.col('value') == F.col('min')).show()
+-------+------+-----+---+
| os|region|value|min|
+-------+------+-----+---+
| IOS| CN| 5| 5|
| IOS| SG| 3| 3|
|ANDROID| SG| 1| 1|
+-------+------+-----+---+
Pandas
Drop columns: df.drop(['xx', 'yy'], axis=1) or df.drop(columns=['xx', 'yy'])
Drop rows: df.drop([0, 1]), where 0 and 1 are index labels
Deduplicate: df.drop_duplicates(subset=None, keep='first', inplace=False, ignore_index=False), where keep is one of {'first', 'last', False} and keeps the first or the last occurrence. To keep the row with the max/min value of a column, sort first (see the sketch below).
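A minimal sketch of the sort-then-deduplicate pattern, mirroring the PySpark min-per-group example above with the same hypothetical pdf (columns os, region, value):
result = (pdf.sort_values('value', ascending=True)
             .drop_duplicates(subset=['os', 'region'], keep='first')
             .reset_index(drop=True))
# equivalent groupby form:
result = pdf.loc[pdf.groupby(['os', 'region'])['value'].idxmin()]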
Changing data types
PySpark
Change the type of a given column: df = df.withColumn('xx', F.col('xx').cast(Type()))
Pandas
Change the type of all columns: df.astype('type')
Change the type of given columns: df.astype({'xx': 'int32'})
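A concrete sketch of both casts; the column name xx is a placeholder:
from pyspark.sql.types import IntegerType

sdf = sdf.withColumn('xx', F.col('xx').cast(IntegerType()))   # or .cast('int')
pdf['xx'] = pdf['xx'].astype('int32')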
Missing data handling
PySpark
Does not automatically insert NaNs and does not raise errors
fillna: df.na.fill()
dropna: df.na.drop()
Pandas
Automatically inserts NaNs for missing data
fillna: df.fillna()
dropna: df.dropna()
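A short sketch of both APIs; the fill value 0 and the column xx are placeholders:
sdf = sdf.na.fill(0, subset=['xx'])    # PySpark; sdf.fillna(0, subset=['xx']) is equivalent
sdf = sdf.na.drop(how='any')           # PySpark; sdf.dropna(how='any') is equivalent

pdf = pdf.fillna({'xx': 0})            # Pandas
pdf = pdf.dropna(how='any')            # Pandas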
SQL statements
PySpark
Table registration: register a DataFrame so it can be queried with SQL
df.registerTempTable('tt') or sqlContext.registerDataFrameAsTable(df, 'tt') (df.createOrReplaceTempView('tt') on newer versions)
spark.sql('SELECT xx, yy FROM tt WHERE xx >= m AND yy <= n')
Function registration: register a Python function for use inside SQL
spark.udf.register('stringLengthString', lambda x: len(x)) (sqlContext.registerFunction on older versions)
spark.sql("SELECT stringLengthString('test')")
Pandas
import sqlite3
pd.read_sql('SELECT xx, yy FROM tt WHERE xx >= m AND yy <= n', con), where con is a database connection such as sqlite3.connect(db_path)
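A minimal, self-contained sketch of the Pandas side, using an in-memory SQLite database and a hypothetical table tt:
import sqlite3
import pandas as pd

con = sqlite3.connect(':memory:')
pd.DataFrame({'xx': [1, 2, 3], 'yy': [4, 5, 6]}).to_sql('tt', con, index=False)
result = pd.read_sql('SELECT xx, yy FROM tt WHERE xx >= 2 AND yy <= 6', con)
con.close()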
Applying functions
PySpark
df.foreach(f) or df.rdd.foreach(f) applies f to each row of df
df.foreachPartition(f) or df.rdd.foreachPartition(f) applies f to each partition of df
UDF (user-defined function):
# one-line way:
udf_name = F.udf(lambda x, y: expression, ReturnType())
# def way:
@F.udf(returnType=ReturnType())
def udf_name(x, y):
    return expression
df.withColumn('xx', udf_name(F.col('xx'), F.col('yy')))
Pandas
df.apply(f) applies f to each column of df (use axis=1 to apply it to each row)
Pandas UDFs in PySpark
The driver/executors may be missing the required pyarrow package, which causes ModuleNotFoundError: No module named 'pyarrow'. There are several ways to solve this; see Python Package Management.
PySpark can ship Python files (.py), zipped Python packages (.zip), and Egg files (.egg) to the executors by:
setting the spark.submit.pyFiles configuration, passing the --py-files option to the Spark scripts, or calling pyspark.SparkContext.addPyFile() directly in the application.
This is a straightforward way to send extra custom Python code to the cluster: add individual files, or zip whole packages and upload them. With pyspark.SparkContext.addPyFile(), the uploaded code can be used even after the job has started running.
However, it does not support packages built as Wheels, so dependencies with native code cannot be included.
Packaging with Conda
Conda is one of the most widely used Python package managers. PySpark can ship third-party Python packages in a Conda environment by using conda-pack, a command-line tool that creates relocatable Conda environments.
The example below creates a Conda environment to be used on both the driver and the executors and packs it into an archive file. The archive captures the Conda environment, storing the Python interpreter together with all of its dependencies.
conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas conda-pack
conda activate pyspark_conda_env
conda pack -f -o pyspark_conda_env.tar.gz
The archive can then be shipped together with the script or code via the --archives option or the spark.archives configuration (spark.yarn.dist.archives on YARN). It is automatically unpacked on the executors.
With the spark-submit script, it can be used as follows:
export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --archives pyspark_conda_env.tar.gz#environment app.py
Note: PYSPARK_DRIVER_PYTHON should not be set as above in YARN cluster mode.
If you are using a regular Python shell or a notebook, you can try something like this:
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from app import main
os.environ['PYSPARK_PYTHON'] = './environment/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = './environment/bin/python'
spark = SparkSession.builder.config(
"spark.archives", # 'spark.yarn.dist.archives' in YARN.
"pyspark_conda_env.tar.gz#environment").getOrCreate()
main(spark)
# or
SPARK_CONF = SparkConf() \
.set('spark.yarn.dist.archives', 'pyspark_conda_env.tar.gz#environment') \
.set('spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT', '1') \
.set('spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT', '1')
sc = SparkContext(appName=appName, conf=SPARK_CONF)
sc.setLogLevel('ERROR')
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
For the pyspark shell:
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
pyspark --archives pyspark_conda_env.tar.gz#environment
Running locally instead of on a cluster:
SPARK_CONF = SparkConf() \
.setMaster('local') \
.set('spark.submit.deployMode', 'client')
sc = SparkContext(appName=appName, conf=SPARK_CONF)
sc.setLogLevel('ERROR')
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
Map-reduce operations
PySpark
df.rdd.map(func) and df.rdd.reduce(func) (map/reduce are not defined on the DataFrame itself); map returns an RDD
Pandas
Plain Python map-reduce: map(func, list) and functools.reduce(func, list), which return an ordinary sequence / value
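A minimal sketch of both styles; the numeric column xx is a placeholder, and functools.reduce stands in for the plain reduce mentioned above:
from functools import reduce

total = sdf.rdd.map(lambda row: row['xx']).reduce(lambda a, b: a + b)   # PySpark, on the RDD of Rows
total = reduce(lambda a, b: a + b, map(int, pdf['xx'].tolist()))        # plain Python map/reduce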
Diff operation
PySpark
No diff operation (in Spark, adjacent rows are independent of each other and stored in a distributed fashion)
Pandas
Has diff for time-series style data (Pandas compares each row with the previous one)
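Pandas diff in one line, plus the window-based workaround commonly used to get the same effect in PySpark; the ordering column ts and value column xx are placeholders:
pdf['delta'] = pdf['xx'].diff()                               # Pandas: current row minus previous row

from pyspark.sql.window import Window
w = Window.orderBy('ts')                                      # an explicit ordering is required
sdf = sdf.withColumn('delta', F.col('xx') - F.lag('xx', 1).over(w))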
Most-common value counts
PySpark
df.cube(column_name/column_list).count().sort('count', ascending=False)
df.groupBy(column_name/column_list).agg({'*': 'count'}).withColumnRenamed('count(1)', 'new_count')
df.groupBy(column_name/column_list).agg(F.count('*').alias('new_count'))
df.groupBy(column_name/column_list).agg(F.countDistinct('*').alias('new_count'))
Pandas
df.value_counts(ascending=False)
JSON formatting, selection, and parsing
PySpark
Format: df.withColumn('json_string', F.to_json(F.struct('key1', 'key2')))
Select: df.select('json_string.key') (when json_string is a struct column)
Parse: json_schema = spark.read.json(df.rdd.map(lambda row: row.json_string)).schema
F.get_json_object('json_string', '$.key')
F.from_json(F.get_json_object('json_string', '$.key'), schema)
Pandas
Format: df['json_string'] = df[['key1', 'key2']].to_dict(orient='records')
Select: df.json_string.apply(pd.Series).key
Parse:
df.join(pd.concat(list(df['json_string'].apply(lambda x: pd.json_normalize(json.loads(x)))), ignore_index=True))
Explode, Pivot, and Melt
PySpark
Explode <=> Groupby:
Split each row's list/array value into one row per element:
df.withColumn('xx', F.explode(F.col('yy')))  # skips nulls and empty lists/arrays
df.withColumn('xx', F.explode_outer(F.col('yy')))  # keeps nulls and empty lists/arrays
df.groupBy(cols_to_group).agg(F.collect_list('xx').alias('yy'))  # returns a list, duplicates kept
df.groupBy(cols_to_group).agg(F.collect_set('xx').alias('yy'))  # returns a set, duplicates removed
Pivot <=> Melt
Label  Keys    Values
0      'key1'  'value1'
0      'key2'  'value2'
1      'key1'  'value3'
1      'key2'  'value4'
<=>
Label  key1      key2
0      'value1'  'value2'
1      'value3'  'value4'
from itertools import chain
def melt(df, id_vars, value_vars, var_name, value_name):
    _vars_and_vals = F.create_map(list(chain.from_iterable([[F.lit(c), F.col(c)] for c in value_vars])))
    _tmp = df.select(*id_vars, F.explode(_vars_and_vals)).withColumnRenamed('key', var_name).withColumnRenamed('value', value_name)
    return _tmp.select(*id_vars, var_name, value_name)
df_melt = df.selectExpr('Label', 'stack({}, {}) as (Keys, Values)'.format(len(df.columns) - 1, ', '.join("'{}', {}".format(i, i) for i in df.columns[1:]))).where(F.col('Values').isNotNull())
df_melt = melt(df, id_vars=['Label'], value_vars=df.columns[1:], var_name='Keys', value_name='Values')
df_pivot = df_melt.groupBy('Label').pivot('Keys').agg(F.first('Values'))
Pandas
Explode:
>> df = pd.DataFrame({'a': ['A', 'B', 'C'], 'b': [[1], [2, 3], [4, 5, 4]]})
>> df.explode('b')  # the exploded column is still named b
a b
0 A 1
1 B 2
1 B 3
2 C 4
2 C 5
2 C 4
>> df = pd.DataFrame( {'a':['A', 'B', 'B', 'C', 'C', 'C'], 'b':[1, 2, 3, 4, 5, 4]})
>> df
a b
0 A 1
1 B 2
2 B 3
3 C 4
4 C 5
5 C 4
>> df.groupby('a')['b'].apply(list).reset_index()
a b
0 A [1]
1 B [2, 3]
2 C [4, 5, 4]
>> df.groupby('a')['b'].apply(set).apply(list).reset_index()
a b
0 A [1]
1 B [2, 3]
2 C [4, 5]
Pivot <=> Melt:
Label  Keys    Values
0      'key1'  'value1'
0      'key2'  'value2'
1      'key1'  'value3'
1      'key2'  'value4'
<=>
Keys    key1    key2
Label
0      value1  value2
1      value3  value4
>> import numpy as np
>> import pandas as pd
>> def melt(frame):
>> # identifier is index
>> N, K = frame.shape
>> data = {
>> "Values": frame.to_numpy().ravel(),
>> "Keys": np.tile(np.asarray(frame.columns), K),
>> "Label": np.asarray(frame.index).repeat(N),
>> }
>> return pd.DataFrame(data, columns=["Label", "Keys", "Values"])
>> df = pd.DataFrame({'key1': ['value1', 'value3'], 'key2': ['value2', 'value4']}, index=[0, 1])
key1 key2
0 value1 value2
1 value3 value4
>> df_melt = melt(df)
Label Keys Values
0 0 key1 value1
1 0 key2 value2
2 1 key1 value3
3 1 key2 value4
>> df = df.reset_index().rename(columns={'index': 'Label'})
Label key1 key2
0 0 value1 value2
1 1 value3 value4
>> df_melt = df.melt(id_vars=['Label'], var_name='Keys', value_name='Values').sort_values('Label').reset_index(drop=True)
Label Keys Values
0 0 key1 value1
1 0 key2 value2
2 1 key1 value3
3 1 key2 value4
>> df_pivot = df_melt.pivot(index="Label", columns="Keys", values="Values")
Keys key1 key2
Label
0 value1 value2
1 value3 value4