Comparing the PySpark DataFrame with the Pandas DataFrame

Both PySpark and Pandas provide a DataFrame data structure, but the two are used quite differently.
References: the "pyspark series: pandas vs. pyspark comparison" post; "Comparing DataFrames in Pandas and PySpark"; the PySpark API docs; the Pandas API docs

Execution model

  • PySpark
    A distributed, parallel computing framework with built-in parallelism: data and operations are automatically distributed in parallel across the cluster nodes. Distributed data is processed as if it were in-memory data. It supports Hadoop and can handle very large amounts of data (a minimal setup sketch follows this list).
    import pyspark.sql.functions as F imports the built-in function library

  • Pandas
    A single-machine tool with no built-in parallelism; it does not support Hadoop and hits bottlenecks on large datasets
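
Note: the PySpark snippets in the rest of this post assume a SparkSession named spark, a SparkContext named sc, and the F / Window imports shown below. This is only a minimal local sketch for trying the examples; on a real cluster the session is usually provided by spark-submit or the platform, and the local[*] master here is just an assumption.

    import pandas as pd
    import pyspark.sql.functions as F
    from pyspark.sql import SparkSession
    from pyspark.sql.window import Window

    spark = (SparkSession.builder
             .master('local[*]')            # assumption: run locally just to try the examples
             .appName('pyspark-vs-pandas')
             .getOrCreate())
    sc = spark.sparkContext                 # used by the sc.parallelize / sc.textFile examples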

Lazy evaluation

  • PySpark
    lazy-evaluated

  • Pandas
    not lazy-evaluated

Note: in programming language theory, lazy evaluation (also called call-by-need) is a concept whose goal is to minimize the work the computer has to do. It has two related but distinct meanings, "delayed evaluation" and "minimal evaluation". With delayed evaluation, an expression is not evaluated as soon as it is bound to a variable, but only when its value is actually used.
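
A tiny sketch of the difference (the column name xx is illustrative): in PySpark a transformation only records the operation, while Pandas evaluates immediately.

    filtered = spark_df.filter(F.col('xx') > 0)       # lazy: nothing is computed yet
    filtered.count()                                  # action: triggers the actual computation

    pandas_filtered = pandas_df[pandas_df['xx'] > 0]  # eager: evaluated as soon as the line runs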

In-memory caching

  • PySpark
    persist() or cache() keeps the transformed RDDs/DataFrames in memory (see the sketch after this list)

  • Pandas
    Single-machine, in-memory by design
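
For example, a DataFrame that several actions reuse can be cached so it is not recomputed each time; a minimal sketch (the storage level is optional):

    from pyspark import StorageLevel

    cached = spark_df.persist(StorageLevel.MEMORY_ONLY)  # or simply spark_df.cache()
    cached.count()                                       # the first action materializes the cache
    cached.unpersist()                                   # release the memory when no longer needed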

DataFrame mutability

  • PySpark
    RDDs in Spark are immutable, so DataFrames are immutable as well (see the sketch after this list)

  • Pandas
    Pandas DataFrames are mutable
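
In practice this means every PySpark "modification" returns a new DataFrame, while Pandas can modify in place (xx is an illustrative column name):

    spark_df2 = spark_df.withColumn('xx', F.col('xx') + 1)  # spark_df itself is unchanged
    pandas_df['xx'] = pandas_df['xx'] + 1                   # pandas_df is modified in place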

Creation

  • PySpark
    Create directly: spark_df = sc.parallelize([(1, 2), (3, 4)]).toDF(['xx', 'yy'])
    Convert from a pandas_df: spark_df = spark.createDataFrame(pandas_df)
    In addition, createDataFrame can build a spark_df from a list (whose elements may be tuples or dicts) or from an RDD
    Read a CSV file: spark_df = spark.read.csv(csv_path, header=True); with header=True the file's header row is used for the column names
    Read a Parquet file: spark_df = spark.read.parquet(parquet_path)
    Read a JSON file: spark_df = spark.read.json(json_path)
    Read a text file: spark_df = sc.textFile(txt_path).toDF()
    Note: the paths above are all HDFS paths
  • Pandas
    Create directly: pandas_df = pd.DataFrame({'xx': {0: 1, 1: 3}, 'yy': {0: 2, 1: 4}})
    Convert from a spark_df: pandas_df = spark_df.toPandas(), or read data from another source
    Read a CSV file: pd.read_csv(csv_path)
    Read a Parquet file: pd.read_parquet(parquet_path); if parquet_path is an HDFS path it needs the 'hdfs://' prefix, e.g. 'hdfs:///projects/path/to/parquet/'

Converting between the two

  • PySpark
    pandas_df = spark_df.toPandas()
    Columns of type ArrayType(), StructType(), or MapType() need to be cast to string beforehand; pandas does not support these types

  • Pandas
    spark_df = spark.createDataFrame(pandas_df)
    The conversion may fail with: TypeError: field xx: Can not merge type A and B
    The cause is null values in that column; the workaround is to cast it to string:
    pandas_df.xx = pandas_df.xx.astype(str)
    You also need to make sure the Python version the program runs with matches the Spark driver's, i.e.

    import os
    import sys
    os.environ['PYSPARK_PYTHON'] = sys.executable
    os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

    Otherwise you will get: Exception: Python in worker has different version x.y than that in driver m.n, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

Writing data

  • PySpark

    df.repartition(partition_num).write.mode('overwrite'/'append').partitionBy(col_name).parquet(parquet_path)

  • Pandas

    df.to_csv(csv_path, index=True/False) # whether to keep the index
    df.to_pickle(csv_pickle)
    df.to_parquet(parquet_path)
    df.to_json(json_path, orient='split'/'records'/'index'/'columns'/'values'/'table')
    # ‘split’ : dict like {‘index’ -> [index], ‘columns’ -> [columns], ‘data’ -> [values]}
    # ‘records’ : list like [{column -> value}, … , {column -> value}]
    # ‘index’ : dict like {index -> {column -> value}}
    # ‘columns’ : dict like {column -> {index -> value}}
    # ‘values’ : just the values array
    # ‘table’ : dict like {‘schema’: {schema}, ‘data’: {data}}

Index

  • PySpark
    There is no index; if one is needed, create it as an extra column

    df.withColumn('index', F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))
    df.rdd.zipWithIndex().toDF().select(F.col('_1').getItem('col_name_1').alias('col_name_1'), F.col('_1').getItem('col_name_2').alias('col_name_2'), ..., F.col('_1').getItem('col_name_n').alias('col_name_n'), F.col('_2').alias('row_index')) # this approach is faster; zipWithIndex pairs each row with its index, so '_1' holds the original row and '_2' the index

  • Pandas
    Created automatically
    Note: to keep the index when converting a pandas_df to a spark_df, use

    spark_df = spark.createDataFrame(pandas_df.reset_index())

Row structure

  • PySpark
    Rows are Row objects, part of the Spark DataFrame structure

  • Pandas
    Rows are Series objects, part of the Pandas DataFrame structure

Column structure

  • PySpark
    Columns are Column objects, part of the Spark DataFrame structure, e.g. DataFrame[name: string]

  • Pandas
    Columns are Series objects, part of the Pandas DataFrame structure

Column names

  • PySpark
    Duplicate column names are allowed; use the alias method to rename a column
    Rename a column: df.withColumnRenamed('old_name', 'new_name')
    df.select(F.col('old_name').alias('new_name'), ...)
    df.selectExpr('old_name as new_name', ...)
  • Pandas
    Duplicate column names are not allowed in normal usage
    Rename a column: df.rename(columns={'old_name': 'new_name'})

Column modification

  • PySpark
    Given an existing column df['xx']: df.withColumn('xx', F.lit(1))
    With conditional logic: df.withColumn('xx', F.when(condition expression, true expression).otherwise(false expression))
    For concatenation: df.withColumn('xx', F.concat(F.col('yy'), F.lit('-'), F.col('zz'))), where columns yy and zz must be of string type; cast them first if they are not.
    Take a value from the file path: df.withColumn('xx', F.input_file_name().substr(start_index, length))
  • Pandas
    Given an existing column df['xx']: df['xx'] = 1
    With conditional logic: df.loc[condition expression, 'xx'] = true expression
    df.loc[~condition expression, 'xx'] = false expression
    df['xx'] = np.where(condition expression, true expression, false expression)
    df['xx'] = df.apply(lambda x: true expression if condition expression else false expression, axis=1)
    For concatenation: df['xx'] = df.yy + '-' + df.zz, where columns yy and zz must be of string type; cast them first if they are not.

Display

  • PySpark
    df alone does not print its contents; use the show method: df.show(5, truncate=100). By default 20 rows are shown, and the display width per column is set with the truncate parameter
    Print the schema as a tree: df.printSchema()
    df.columns returns the column names
  • Pandas
    df prints its contents
    df.columns returns the column names
    pd.set_option('display.max_columns', None) # show all columns
    pd.set_option('display.max_colwidth', 100) # set the display width per column
    pd.set_option('display.max_rows', None) # show all rows

Sorting

  • PySpark
    df.sort(df.xx.asc(),df.yy.desc())
    df.sort(F.asc('xx'),F.desc('yy'))
    df.sort(F.col("xx").asc(), F.col("yy").desc())
    df.orderBy(F.col("xx").asc(), F.col("yy").desc())
    Sort by column values, here ascending on xx and then descending on yy

  • Pandas
    df.sort_index() sorts by the index in ascending order
    df.sort_values(['xx', 'yy'], ascending=[True, False]) sorts by column values, ascending on xx and then descending on yy
    df.sort_values(['xx', 'yy'], axis=0) or df.sort_values([1, 2], axis=1) sort by values along columns or rows, in ascending order

Selection and slicing

  • PySpark
    df.select('xx', 'yy') selects one or more columns
    df.first() returns the first row as a Row object. (Note: a Row looks like Row(col_name1=value1, col_name2=value2, ...))
    df.head(n) or df.take(n) return the first n rows as a list of Rows; df.tail(n) returns the last n rows as a list of Rows
    df.collect() returns all rows as a list of Rows

  • Pandas
    df.xx or df['xx'] selects the column named xx; df.loc[k] selects the row whose index label is k
    df.iloc[:, k] selects the k-th column and df.iloc[k] selects the k-th row (df.iat[i, j] accesses a single scalar value)

Filtering

  • PySpark
    df.filter(df['xx'] > k) or df.where(df['xx'] > k)

    Value in a given set: df.filter(F.col('xx').isin(filter_list))

    Handling empty / missing values:

    • Value is null: df.filter(F.col('xx').isNull()) or df.filter(F.col('xx').isNotNull())
    • Value is the empty string: df.filter(F.col('xx') == '')
    • Value is np.nan: df.filter(F.isnan('xx')) or df.filter(F.col('xx') == np.nan)
  • Pandas
    df[df['xx'] > k] or s[s > k]

    Value in a given set: df[df.xx.isin(filter_list)]

    Handling missing values (covers null, np.NaN, pd.NaT, and None, but not the empty string): df[df.xx.isnull()], df[df.xx.isna()], df[df.xx.notnull()], df[df.xx.notna()]

  • Examples:

    >> pdf = pd.DataFrame(dict(numpy=[np.NaN], pandas=[pd.NaT], empty=[''], none=[None]))
    >> pdf
    numpy pandas empty none
    0 NaN NaT None
    >> pdf.isnull()
    numpy pandas empty none
    0 True True False True
    >> from pyspark.sql.types import *
    >> sdf = spark.createDataFrame(pdf, StructType([StructField('numpy', DoubleType(), True), StructField('pandas', StringType(), True), StructField('empty', StringType(), True), StructField('none', StringType(), True)])) # the schema must be given explicitly, otherwise: ValueError: Some of types cannot be determined after inferring
    >> sdf.show()
    +-----+------+-----+------+
    |numpy|pandas|empty|  none|
    +-----+------+-----+------+
    | NaN| null| | null|
    +-----+------+-----+------+

Grouping and aggregation

  • PySpark
    df.groupBy(cols_to_group), or df.groupBy(cols_to_group).avg('xx').show() to apply a single function
    df.groupBy(cols_to_group).agg(F.avg('xx'), F.min('xx'), F.max('xx')).show() to apply multiple functions

  • Pandas
    df.groupby(cols_to_group)
    df.groupby(cols_to_group)['xx'].mean()

    group filter by function: df.groupby(cols_to_group).filter(function) (see the sketch below)
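
    For example, groupby(...).filter keeps only the groups for which the function returns True; a small illustrative sketch:

    df = pd.DataFrame({'grp': ['a', 'a', 'b'], 'xx': [1, 2, 10]})
    df.groupby('grp').filter(lambda g: g['xx'].mean() > 5)  # keeps only the rows of group 'b'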

Statistics

  • PySpark
    df.count() returns the total number of rows
    df.describe() reports count, mean, stddev, min, max for the selected columns

  • Pandas
    df.count() returns the number of non-null values in each column
    df.shape returns (number of rows, number of columns)
    df.describe() reports count, mean, std, min, 25%, 50%, 75%, max for the numeric columns

Merging

  • PySpark
    Adding columns (joins)
    df.join(): columns with the same name do not get suffixes automatically; when joining on column names, only one copy of the matching key column is kept
    'inner', 'outer', 'full', 'fullouter', 'full_outer', 'leftouter', 'left', 'left_outer', 'rightouter', 'right', 'right_outer', 'leftsemi', 'left_semi', 'leftanti', 'left_anti', 'cross'.

    >> df_left = spark.createDataFrame(pd.DataFrame({'Keys': ['key1', 'key2', 'key3', 'key4', 'key5'], 'Values_Left': ['value1', 'value2', 'value3', 'value4', 'value5']}, index=[0, 1, 2, 3, 4]))
    >> df_left.show()
    +----+-----------+
    |Keys|Values_Left|
    +----+-----------+
    |key1| value1|
    |key2| value2|
    |key3| value3|
    |key4| value4|
    |key5| value5|
    +----+-----------+

    >> df_right = spark.createDataFrame(pd.DataFrame({'Keys': ['key1', 'key2', 'key2', 'key3', 'key6'], 'Values_Right': ['value6', 'value7', 'value8', 'value9', 'value10']}, index=[0, 1, 2, 3, 4]))
    >> df_right.show()
    +----+------------+
    |Keys|Values_Right|
    +----+------------+
    |key1| value6|
    |key2| value7|
    |key2| value8|
    |key3| value9|
    |key6| value10|
    +----+------------+

    >> df_left.join(df_right, ['Keys'], 'left').sort('Keys').show() # left=leftouter=left_outer
    +----+-----------+------------+
    |Keys|Values_Left|Values_Right|
    +----+-----------+------------+
    |key1| value1| value6|
    |key2| value2| value7|
    |key2| value2| value8|
    |key3| value3| value9|
    |key4| value4| null|
    |key5| value5| null|
    +----+-----------+------------+

    >> df_right.join(df_left, ['Keys'], 'right').sort('Keys').show() # right=rightouter=right_outer
    +----+------------+-----------+
    |Keys|Values_Right|Values_Left|
    +----+------------+-----------+
    |key1| value6| value1|
    |key2| value7| value2|
    |key2| value8| value2|
    |key3| value9| value3|
    |key4| null| value4|
    |key5| null| value5|
    +----+------------+-----------+

    >> df_left.join(df_right, ['Keys'], 'left_semi').sort('Keys').show() # left_semi=leftsemi
    +----+-----------+
    |Keys|Values_Left|
    +----+-----------+
    |key1| value1|
    |key2| value2|
    |key3| value3|
    +----+-----------+

    >> df_left.join(df_right, ['Keys'], 'left_anti').sort('Keys').show() # leftanti=left_anti
    +----+-----------+
    |Keys|Values_Left|
    +----+-----------+
    |key4| value4|
    |key5| value5|
    +----+-----------+

    >> df_left.join(df_right, ['Keys'], 'inner').sort('Keys').show()
    +----+-----------+------------+
    |Keys|Values_Left|Values_Right|
    +----+-----------+------------+
    |key1| value1| value6|
    |key2| value2| value8|
    |key2| value2| value7|
    |key3| value3| value9|
    +----+-----------+------------+

    >> df_left.join(df_right, ['Keys'], 'outer').sort('Keys').show() # outer=full=fullouter=full_outer
    +----+-----------+------------+
    |Keys|Values_Left|Values_Right|
    +----+-----------+------------+
    |key1| value1| value6|
    |key2| value2| value8|
    |key2| value2| value7|
    |key3| value3| value9|
    |key4| value4| null|
    |key5| value5| null|
    |key6| null| value10|
    +----+-----------+------------+

    >> df_left = df_left.withColumnRenamed('Keys', 'Keys_Left')
    >> df_right = df_right.withColumnRenamed('Keys', 'Keys_Right')
    >> df_left.join(df_right, df_left.Keys_Left==df_right.Keys_Right, 'cross').show()
    +---------+-----------+----------+------------+
    |Keys_Left|Values_Left|Keys_Right|Values_Right|
    +---------+-----------+----------+------------+
    | key1| value1| key1| value6|
    | key2| value2| key2| value8|
    | key2| value2| key2| value7|
    | key3| value3| key3| value9|
    +---------+-----------+----------+------------+

    >> df_left.crossJoin(df_right).show(25)
    +---------+-----------+----------+------------+
    |Keys_Left|Values_Left|Keys_Right|Values_Right|
    +---------+-----------+----------+------------+
    | key1| value1| key1| value6|
    | key1| value1| key2| value7|
    | key1| value1| key2| value8|
    | key1| value1| key3| value9|
    | key1| value1| key6| value10|
    | key2| value2| key1| value6|
    | key2| value2| key2| value7|
    | key2| value2| key2| value8|
    | key2| value2| key3| value9|
    | key2| value2| key6| value10|
    | key3| value3| key1| value6|
    | key3| value3| key2| value7|
    | key3| value3| key2| value8|
    | key3| value3| key3| value9|
    | key3| value3| key6| value10|
    | key4| value4| key1| value6|
    | key4| value4| key2| value7|
    | key4| value4| key2| value8|
    | key4| value4| key3| value9|
    | key4| value4| key6| value10|
    | key5| value5| key1| value6|
    | key5| value5| key2| value7|
    | key5| value5| key2| value8|
    | key5| value5| key3| value9|
    | key5| value5| key6| value10|
    +---------+-----------+----------+------------+

    >> from pyspark.sql.window import Window

    >> df1 = spark.createDataFrame(pd.DataFrame({'Keys_On': [1, 5, 10], 'Keys_By': ['a', 'a', 'b'], 'Values_Left': [1, 5, 10]}, index=[0, 1, 2]))
    >> df2 = spark.createDataFrame(pd.DataFrame({'Keys_On': [1, 2, 3, 6, 7], 'Keys_By': ['a', 'a', 'a', 'b', 'b'], 'Values_Right': [1, 2, 3, 6, 7]}, index=[0, 1, 2, 4, 5]))
    >> df3 = df1.withColumn('Values_Right', F.lit(None)).unionByName(df2.withColumn('Values_Left', F.lit(None)))
    >> w = Window.partitionBy('Keys_By').orderBy('Keys_On', 'Values_Left').rowsBetween(Window.unboundedPreceding, -1)
    >> df3.withColumn('Values_Right', F.last('Values_Right', True).over(w)).filter(~F.isnull('Values_Left')).show()
    +-------+-------+-----------+------------+
    |Keys_On|Keys_By|Values_Left|Values_Right|
    +-------+-------+-----------+------------+
    | 1| a| 1| 1|
    | 5| a| 5| 3|
    | 10| b| 10| 7|
    +-------+-------+-----------+------------+
    >> def asof_join(l, r):
    >>     return pd.merge_asof(l, r, on='Keys_On', by='Keys_By')
    >> df1.groupby('Keys_By').cogroup(df2.groupby('Keys_By')).applyInPandas(
    >>     asof_join, schema="Keys_On int, Keys_By string, Values_Left float, Values_Right float"
    >> ).show()
    +-------+-------+-----------+------------+
    |Keys_On|Keys_By|Values_Left|Values_Right|
    +-------+-------+-----------+------------+
    | 10| b| 10.0| 7.0|
    | 1| a| 1.0| 1.0|
    | 5| a| 5.0| 3.0|
    +-------+-------+-----------+------------+

    Adding rows (unions)
    df.union(): merges two DataFrames by column position; the column names and order of the first DataFrame are used (a.union(b) follows a's column order)
    df.unionAll(): same as union
    df.unionByName(): merges two DataFrames by column name
    df1.unionByName(df2).unionByName(df3)

  • Pandas
    Pandas has a concat method that merges along an axis
    pd.concat([df1, df2, df3], ignore_index=True, sort=False)
    df1.append([df2, df3], ignore_index=True, sort=False)
    df1.join([df2, df3])
    Pandas has a merge method that supports joining on multiple columns
    Columns with the same name automatically get suffixes, and only one copy of the join key is kept
    df.join() also supports column-wise joins
    df.append() supports row-wise appends (deprecated in recent pandas; prefer pd.concat)

    >> df_left = pd.DataFrame({'Keys': ['key1', 'key2', 'key3', 'key4', 'key5'], 'Values_Left': ['value1', 'value2', 'value3', 'value4', 'value5']}, index=[0, 1, 2, 3, 4])
    Keys Values_Left
    0 key1 value1
    1 key2 value2
    2 key3 value3
    3 key4 value4
    4 key5 value5
    >> df_right = pd.DataFrame({'Keys': ['key1', 'key2', 'key2', 'key3', 'key6'], 'Values_Right': ['value6', 'value7', 'value8', 'value9', 'value10']}, index=[0, 1, 2, 4, 5])
    Keys Values_Right
    0 key1 value6
    1 key2 value7
    2 key2 value8
    4 key3 value9
    5 key6 value10
    >> df_left.append(df_right) <=> pd.concat([df_left, df_right])
    Keys Values_Left Values_Right
    0 key1 value1 NaN
    1 key2 value2 NaN
    2 key3 value3 NaN
    3 key4 value4 NaN
    4 key5 value5 NaN
    0 key1 NaN value6
    1 key2 NaN value7
    2 key2 NaN value8
    4 key3 NaN value9
    5 key6 NaN value10
    >> pd.concat([df_left, df_right], axis=1) # similar to an outer join with the index as the key
    Keys Values_Left Keys Values_Right
    0 key1 value1 key1 value6
    1 key2 value2 key2 value7
    2 key3 value3 key2 value8
    3 key4 value4 NaN NaN
    4 key5 value5 key3 value9
    5 NaN NaN key6 value10
    >> pd.concat([df_left, df_right], axis=1, join='inner')
    Keys Values_Left Keys Values_Right
    0 key1 value1 key1 value6
    1 key2 value2 key2 value7
    2 key3 value3 key2 value8
    4 key5 value5 key3 value9
    >> pd.merge(df_left, df_right, on='Keys')
    Keys Values_Left Values_Right
    0 key1 value1 value6
    1 key2 value2 value7
    2 key2 value2 value8
    3 key3 value3 value9
    >> pd.merge(df_left, df_right, how='left', on='Keys')
    Keys Values_Left Values_Right
    0 key1 value1 value6
    1 key2 value2 value7
    2 key2 value2 value8
    3 key3 value3 value9
    4 key4 value4 NaN
    5 key5 value5 NaN
    >> pd.merge(df_left, df_right, how='right', on='Keys')
    Keys Values_Left Values_Right
    0 key1 value1 value6
    1 key2 value2 value7
    2 key2 value2 value8
    3 key3 value3 value9
    4 key6 NaN value10
    >> pd.merge(df_left, df_right, how='outer', on='Keys', validate='one_to_many', indicator='indicator_column')
    Keys Values_Left Values_Right indicator_column
    0 key1 value1 value6 both
    1 key2 value2 value7 both
    2 key2 value2 value8 both
    3 key3 value3 value9 both
    4 key4 value4 NaN left_only
    5 key5 value5 NaN left_only
    6 key6 NaN value10 right_only
    >> pd.merge(df_left, df_right, how='inner', on='Keys')
    Keys Values_Left Values_Right
    0 key1 value1 value6
    1 key2 value2 value7
    2 key2 value2 value8
    3 key3 value3 value9
    >> df_left.join(df_right, how='left', lsuffix='_Left', rsuffix='_Right')
    Keys_Left Values_Left Keys_Right Values_Right
    0 key1 value1 key1 value6
    1 key2 value2 key2 value7
    2 key3 value3 key2 value8
    3 key4 value4 NaN NaN
    4 key5 value5 key3 value9
    >> df_left.join(df_right, how='right', lsuffix='_Left', rsuffix='_Right')
    Keys_Left Values_Left Keys_Right Values_Right
    0 key1 value1 key1 value6
    1 key2 value2 key2 value7
    2 key3 value3 key2 value8
    4 key5 value5 key3 value9
    5 NaN NaN key6 value10
    >> df_left.join(df_right, how='outer', lsuffix='_Left', rsuffix='_Right')
    Keys_Left Values_Left Keys_Right Values_Right
    0 key1 value1 key1 value6
    1 key2 value2 key2 value7
    2 key3 value3 key2 value8
    3 key4 value4 NaN NaN
    4 key5 value5 key3 value9
    5 NaN NaN key6 value10
    >> df_left.join(df_right, how='inner', lsuffix='_Left', rsuffix='_Right')
    Keys_Left Values_Left Keys_Right Values_Right
    0 key1 value1 key1 value6
    1 key2 value2 key2 value7
    2 key3 value3 key2 value8
    4 key5 value5 key3 value9
    >> df_left.join(df_right.reindex(['key1', 'key2', 'key2', 'key3', 'key6']), on='Keys', lsuffix='_Left', rsuffix='_Right') <=> pd.merge(df_left, df_right.reindex(['key1', 'key2', 'key2', 'key3', 'key6']), left_on="Keys", right_index=True, how="left", sort=False, suffixes=('_Left', '_Right'))
    Keys Keys_Left Values_Left Keys_Right Values_Right
    0 key1 key1 value1 NaN NaN
    1 key2 key2 value2 NaN NaN
    1 key2 key2 value2 NaN NaN
    2 key3 key3 value3 NaN NaN
    3 key4 key4 value4 NaN NaN
    4 key5 key5 value5 NaN NaN
    >> df_left.combine_first(df_right) # where both frames have a value at the same position, the left one is kept
    Keys Values_Left Values_Right
    0 key1 value1 value6
    1 key2 value2 value7
    2 key3 value3 value8
    3 key4 value4 NaN
    4 key5 value5 value9
    5 key6 NaN value10
    >> df_left.update(df_right)
    >> df_left
    Keys Values_Left
    0 key1 value1
    1 key2 value2
    2 key2 value3
    3 key4 value4
    4 key3 value5
    >> pd.merge_ordered(df_left, df_right, on="Keys")
    Keys Values_Left Values_Right
    0 key1 value1 value6
    1 key2 value2 value7
    2 key2 value2 value8
    3 key2 value3 value7
    4 key2 value3 value8
    5 key3 value5 value9
    6 key4 value4 NaN
    7 key6 NaN value10
    >> pd.merge_ordered(df_left, df_right, left_by="Keys")
    Keys Values_Left Values_Right
    0 key1 value1 value6
    1 key2 value2 value7
    2 key2 value2 value8
    3 key2 value3 value7
    4 key2 value3 value8
    5 key4 value4 NaN
    6 key3 value5 value9
    >> pd.merge_ordered(df_left, df_right, right_by="Keys")
    Keys Values_Left Values_Right
    0 key1 value1 value6
    1 key2 value2 value7
    2 key2 value2 value8
    3 key2 value3 value7
    4 key2 value3 value8
    5 key3 value5 value9
    6 key6 NaN value10
    >> df_left = df_left.rename(columns={'Values_Left': 'Values'})
    >> df_right = df_right.rename(columns={'Values_Right': 'Values'}).reset_index(drop=True)
    >> df_left.compare(df_right, align_axis=0)
    Keys Values
    0 self NaN value1
    other NaN value6
    1 self NaN value2
    other NaN value7
    2 self NaN value3
    other NaN value8
    3 self key4 value4
    other key3 value9
    4 self key3 value5
    other key6 value10
    >> df_left.compare(df_right, keep_shape=True)
    Keys Values
    self other self other
    0 NaN NaN value1 value6
    1 NaN NaN value2 value7
    2 NaN NaN value3 value8
    3 key4 key3 value4 value9
    4 key3 key6 value5 value10
    >> df_left.compare(df_right, keep_shape=True, keep_equal=True)
    Keys Values
    self other self other
    0 key1 key1 value1 value6
    1 key2 key2 value2 value7
    2 key2 key2 value3 value8
    3 key4 key3 value4 value9
    4 key3 key6 value5 value10
    >> df_left = pd.DataFrame({'Keys_On': [1, 5, 10], 'Keys_By': ['a', 'a', 'b'], 'Values_Left': [1, 5, 10]}, index=[0, 1, 2])
    Keys_On Keys_By Values_Left
    0 1 a 1
    1 5 a 5
    2 10 b 10
    >> df_right = pd.DataFrame({'Keys_On': [1, 2, 3, 6, 7], 'Keys_By': ['a', 'a', 'a', 'b', 'b'], 'Values_Right': [1, 2, 3, 6, 7]}, index=[0, 1, 2, 4, 5])
    Keys_On Keys_By Values_Right
    0 1 a 1
    1 2 a 2
    2 3 a 3
    4 6 b 6
    5 7 b 7
    >> pd.merge_asof(df_left, df_right, on='Keys_On')
    Keys_On Keys_By_x Values_Left Keys_By_y Values_Right
    0 1 a 1 a 1
    1 5 a 5 a 3
    2 10 b 10 b 7
    >> pd.merge_asof(df_left, df_right, on='Keys_On', allow_exact_matches=False)
    Keys_On Keys_By_x Values_Left Keys_By_y Values_Right
    0 1 a 1 NaN NaN
    1 5 a 5 a 3.0
    2 10 b 10 b 7.0
    >> pd.merge_asof(df_left, df_right, on='Keys_On', direction='forward')
    Keys_On Keys_By_x Values_Left Keys_By_y Values_Right
    0 1 a 1 a 1.0
    1 5 a 5 b 6.0
    2 10 b 10 NaN NaN
    >> pd.merge_asof(df_left, df_right, on='Keys_On', direction='nearest')
    Keys_On Keys_By_x Values_Left Keys_By_y Values_Right
    0 1 a 1 a 1
    1 5 a 5 b 6
    2 10 b 10 b 7
    >> pd.merge_asof(df_left, df_right, on='Keys_On', tolerance=2)
    Keys_On Keys_By_x Values_Left Keys_By_y Values_Right
    0 1 a 1 a 1.0
    1 5 a 5 a 3.0
    2 10 b 10 NaN NaN
    >> pd.merge_asof(df_left, df_right, on='Keys_On', by='Keys_By')
    Keys_On Keys_By Values_Left Values_Right
    0 1 a 1 1
    1 5 a 5 3
    2 10 b 10 7
    >> pd.merge_asof(df_left, df_right, on='Keys_On', by='Keys_By', allow_exact_matches=False)
    Keys_On Keys_By Values_Left Values_Right
    0 1 a 1 NaN
    1 5 a 5 3.0
    2 10 b 10 7.0
    >> pd.merge_asof(df_left, df_right, on='Keys_On', by='Keys_By', direction='forward')
    Keys_On Keys_By Values_Left Values_Right
    0 1 a 1 1.0
    1 5 a 5 NaN
    2 10 b 10 NaN
    >> pd.merge_asof(df_left, df_right, on='Keys_On', by='Keys_By', direction='nearest')
    Keys_On Keys_By Values_Left Values_Right
    0 1 a 1 1
    1 5 a 5 3
    2 10 b 10 7
    >> pd.merge_asof(df_left, df_right, on='Keys_On', by='Keys_By', tolerance=2)
    Keys_On Keys_By Values_Left Values_Right
    0 1 a 1 1.0
    1 5 a 5 3.0
    2 10 b 10 NaN

Deletion

  • PySpark
    Drop one column: df.drop('xx') or df.drop(F.col('xx'))
    Drop multiple columns: df.drop(*['xx', 'yy', ...])
    Drop rows: use the filter method
    Deduplicate: df.dropDuplicates() or df.drop_duplicates(), whose optional argument must be a list of column names; df.distinct() takes no arguments.
    Keep the row with the minimum / maximum value of column xx within each cols_to_group group:

    df.groupBy(cols_to_group).agg(F.min/max('xx').alias('xx'))
    df.withColumn('min/max', F.min/max('xx').over(Window.partitionBy(cols_to_group))).where(F.col('xx') == F.col('min/max'))
    # example
    >> pdf = pd.DataFrame(dict(os=['ANDROID', 'ANDROID', 'IOS', 'IOS', 'IOS', 'IOS'], region=['SG', 'SG', 'SG', 'SG', 'CN', 'CN'], value=[1,2,3,4,5,6]))
    >> sdf = spark.createDataFrame(pdf)
    >> sdf.show()
    +-------+------+-----+
    | os|region|value|
    +-------+------+-----+
    |ANDROID| SG| 1|
    |ANDROID| SG| 2|
    | IOS| SG| 3|
    | IOS| SG| 4|
    | IOS| CN| 5|
    | IOS| CN| 6|
    +-------+------+-----+
    >> sdf.groupBy(['os', 'region']).agg(F.min('value')).show()
    +-------+------+----------+
    | os|region|min(value)|
    +-------+------+----------+
    | IOS| CN| 5|
    | IOS| SG| 3|
    |ANDROID| SG| 1|
    +-------+------+----------+
    >> from pyspark.sql.window import Window
    >> sdf.withColumn('min', F.min('value').over(Window.partitionBy(['os', 'region']))).where(F.col('value') == F.col('min')).show()
    +-------+------+-----+---+
    | os|region|value|min|
    +-------+------+-----+---+
    | IOS| CN| 5| 5|
    | IOS| SG| 3| 3|
    |ANDROID| SG| 1| 1|
    +-------+------+-----+---+

  • Pandas
    Drop columns: df.drop(['xx', 'yy'], axis=1) or df.drop(columns=['xx', 'yy'])
    Drop rows: df.drop([0, 1]), where 0 and 1 are index labels
    Deduplicate: drop_duplicates(subset=None, keep='first', inplace=False, ignore_index=False), where keep is one of {'first', 'last', False} and keeps the first or last occurrence. To keep the row with the minimum / maximum value of some column, sort first (see the sketch below)
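
    A small sketch of "keep the row with the minimum value per group" in Pandas, mirroring the PySpark example above (it reuses the pdf with os, region, value columns from that example):

    pdf.sort_values('value').drop_duplicates(subset=['os', 'region'], keep='first')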

Changing data types

  • PySpark
    Change the type of a given column: df = df.withColumn('xx', F.col('xx').cast(Type()))

  • Pandas
    Change the type of all columns: df.astype('type')
    Change the type of a given column: df.astype({'xx': 'int32'}) (see the sketch below)
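
    A concrete sketch of both, using an illustrative column xx (the PySpark type comes from pyspark.sql.types):

    from pyspark.sql.types import IntegerType
    spark_df = spark_df.withColumn('xx', F.col('xx').cast(IntegerType()))  # or .cast('int')

    pandas_df = pandas_df.astype({'xx': 'int32'})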

Missing data handling

  • PySpark
    Does not automatically insert NaNs and does not raise errors
    fillna: df.na.fill()
    dropna: df.na.drop()

  • Pandas
    Missing data is automatically represented as NaN
    fillna: df.fillna()
    dropna: df.dropna() (see the sketch below)
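
    A small sketch of both sides (the fill value 0 and the column xx are illustrative):

    spark_df.na.fill(0, subset=['xx'])   # replace nulls in xx with 0
    spark_df.na.drop(subset=['xx'])      # drop rows where xx is null

    pandas_df.fillna({'xx': 0})          # replace NaNs in xx with 0
    pandas_df.dropna(subset=['xx'])      # drop rows where xx is NaN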

SQL statements

  • PySpark
    Register a table: register the DataFrame so it can be queried with SQL
    df.registerTempTable('tt') or sqlContext.registerDataFrameAsTable(df, 'tt') (in newer Spark versions, df.createOrReplaceTempView('tt'))
    spark.sql('SELECT xx, yy FROM tt WHERE xx >= m AND yy <= n')
    Register a function: register a Python function so it can be used in SQL
    spark.udf.register('stringLengthString', lambda x: len(x))
    spark.sql("SELECT stringLengthString('test')")

  • Pandas
    import sqlite3
    pd.read_sql('SELECT xx, yy FROM tt WHERE xx >= m AND yy <= n', con) (a connection object is required; see the sketch below)
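
    A minimal sketch with an in-memory SQLite database (the table tt and the columns xx, yy are illustrative):

    import sqlite3
    import pandas as pd

    con = sqlite3.connect(':memory:')
    pd.DataFrame({'xx': [1, 2, 3], 'yy': [4, 5, 6]}).to_sql('tt', con, index=False)
    pd.read_sql('SELECT xx, yy FROM tt WHERE xx >= 2 AND yy <= 6', con)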

Applying functions

  • PySpark
    df.foreach(f) or df.rdd.foreach(f) applies the function f to each row of df
    df.foreachPartition(f) or df.rdd.foreachPartition(f) applies the function f to each partition of df
    UDF (User-defined Function):

    # one-line way:
    udf_name = F.udf(lambda x, y: expression, ReturnType())

    # def way:
    @F.udf(returnType=ReturnType())
    def udf_name(x):
        result = expression
        return result

    df.withColumn('xx', udf_name(F.col('xx'), F.col('yy')))
  • Pandas
    df.apply(f) applies the function f to each column of df (use axis=1 to apply it to each row)

  • Pandas udf in PySpark
    The driver (or the executors) may be missing a required package such as pyarrow, which causes ModuleNotFoundError: No module named 'pyarrow'. There are several ways to solve this:

    1. See Python Package Management
      PySpark can ship Python files (.py), zipped Python packages (.zip), and Egg files (.egg) to the executors in the following ways:
      Set the spark.submit.pyFiles configuration, use the --py-files option of the Spark scripts, or call pyspark.SparkContext.addPyFile() directly in the application.
      This is a straightforward way to ship extra custom Python code to the cluster. You can add single files, or zip up whole packages and upload them. With pyspark.SparkContext.addPyFile(), uploaded code can be used even after the job has started running (a minimal sketch follows below).
      However, it does not allow adding packages built as wheels, so dependencies with native code cannot be included this way.
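
      A minimal sketch of these variants (deps.zip, helper.py and app.py are placeholder names):

      # with spark-submit (shell):
      #   spark-submit --py-files deps.zip,helper.py app.py
      # or from Python:
      from pyspark.sql import SparkSession

      spark = (SparkSession.builder
               .config('spark.submit.pyFiles', 'deps.zip')
               .getOrCreate())
      spark.sparkContext.addPyFile('helper.py')  # can also be called after the job has started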

    2. Package with Conda
      Conda is one of the most widely used Python package management systems. PySpark can use a Conda environment directly to ship third-party Python packages by leveraging conda-pack, a command-line tool that creates relocatable Conda environments.
      The example below creates a Conda environment to be used on both the driver and the executors and packs it into an archive file. The archive captures the Conda environment for Python and stores the Python interpreter along with all of its dependencies.

      conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas conda-pack
      conda activate pyspark_conda_env
      conda pack -f -o pyspark_conda_env.tar.gz

      Afterwards, ship it together with your script or code via the --archives option or the spark.archives configuration (spark.yarn.dist.archives on YARN). The archive is automatically unpacked on the executors.
      With the spark-submit script you can use it as follows:

      export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
      export PYSPARK_PYTHON=./environment/bin/python
      spark-submit --archives pyspark_conda_env.tar.gz#environment app.py

      Note that PYSPARK_DRIVER_PYTHON above should not be set for YARN cluster mode.
      If you are using a regular Python shell or a notebook, you can try it as shown below:

      import os
      from pyspark.sql import SparkSession
      from app import main

      os.environ['PYSPARK_PYTHON'] = './environment/bin/python'
      os.environ['PYSPARK_DRIVER_PYTHON'] = './environment/bin/python'

      spark = SparkSession.builder.config(
      "spark.archives", # 'spark.yarn.dist.archives' in YARN.
      "pyspark_conda_env.tar.gz#environment").getOrCreate()
      main(spark)

      # or, configuring the archive via SparkConf
      SPARK_CONF = SparkConf() \
      .set('spark.yarn.dist.archives', 'pyspark_conda_env.tar.gz#environment') \
      .set('spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT', '1') \
      .set('spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT', '1')
      sc = SparkContext(appName=appName, conf=SPARK_CONF)
      sc.setLogLevel('ERROR')
      spark = SparkSession.builder.enableHiveSupport().getOrCreate()

      For the pyspark shell:

      export PYSPARK_DRIVER_PYTHON=python
      export PYSPARK_PYTHON=./environment/bin/python
      pyspark --archives pyspark_conda_env.tar.gz#environment

    3. Run locally instead of on the cluster

      SPARK_CONF = SparkConf() \
      .setMaster('local') \
      .set('spark.submit.deployMode', 'client')
      sc = SparkContext(appName=appName, conf=SPARK_CONF)
      sc.setLogLevel('ERROR')
      spark = SparkSession.builder.enableHiveSupport().getOrCreate()

Map-Reduce operations

  • PySpark
    df.rdd.map(func) and df.rdd.reduce(func); map returns a new RDD and reduce returns a single value (see the sketch after this list)

  • Pandas
    Python's built-in map(func, list) and functools.reduce(func, list); the result is a sequence or a single value
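
    A minimal sketch of both sides (the lambdas and the column xx are illustrative):

    doubled_rdd = spark_df.rdd.map(lambda row: row['xx'] * 2)                   # returns an RDD (lazy)
    total = spark_df.rdd.map(lambda row: row['xx']).reduce(lambda a, b: a + b)  # returns a single value

    from functools import reduce
    doubled = list(map(lambda v: v * 2, [1, 2, 3]))   # [2, 4, 6]
    total = reduce(lambda a, b: a + b, [1, 2, 3])     # 6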

Diff operations

  • PySpark
    No built-in diff operation (rows in Spark are independent of each other and stored in a distributed fashion); a lag window function can serve the same purpose (see the sketch after this list)

  • Pandas
    Has a diff operation for time-series style data (Pandas compares the current row with the previous one)
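
    A sketch of both: Pandas diff is a one-liner, and in PySpark the same effect comes from a lag window (order_col, the ordering column, is an assumption):

    pandas_df['xx_diff'] = pandas_df['xx'].diff()     # current row minus previous row

    from pyspark.sql.window import Window
    w = Window.orderBy('order_col')                   # Spark needs an explicit ordering
    spark_df.withColumn('xx_diff', F.col('xx') - F.lag('xx', 1).over(w))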

Most-common value counts

  • PySpark

    df.cube(column_name/column_list).count().sort('count', ascending=False)
    df.groupBy(column_name/column_list).agg({'*': 'count'}).withColumnRenamed('count(1)', 'new_count')
    df.groupBy(column_name/column_list).agg(F.count('*').alias('new_count'))
    df.groupBy(column_name/column_list).agg(F.countDistinct('*').alias('new_count'))

  • Pandas
    df.value_counts(ascending=False)

JSON formatting, selection, and parsing

  • PySpark
    Format: df.withColumn('json_string', F.to_json(F.struct('key1', 'key2')))
    Select: df.select('json_string.key')
    Parse: json_schema = spark.read.json(df.rdd.map(lambda row: row.json_string)).schema
    F.get_json_object('json_string', '$.key')
    F.from_json(F.get_json_object('json_string', '$.key'), schema)

  • Pandas
    Format: df['json_string'] = df[['key1', 'key2']].to_dict(orient='records')
    Select: df.json_string.apply(pd.Series).key
    Parse:

    df.join(pd.concat(list(df['json_string'].apply(lambda x: pd.json_normalize(json.loads(x)))), ignore_index=True))

Explode, Pivot, and Melt operations

  • PySpark
    Explode <=> Groupby:
    Split the list / array value in each row into separate rows

    df.withColumn('xx', F.explode(F.col('yy'))) # skips null values and empty lists/arrays
    df.withColumn('xx', F.explode_outer(F.col('yy'))) # keeps null values and empty lists/arrays (they produce a null row)
    df.groupBy(cols_to_group).agg(F.collect_list('xx').alias('yy')) # returns a list, duplicates kept
    df.groupBy(cols_to_group).agg(F.collect_set('xx').alias('yy')) # returns a set, duplicates removed

    Pivot <=> Melt

    Label Keys Values
    0 'key1' 'value1'
    0 'key2' 'value2'
    1 'key1' 'value3'
    1 'key2' 'value4'

    <=>

    Label key1 key2
    0 'value1' 'value2'
    1 'value3' 'value4'
    def melt(df, id_vars, value_vars, var_name, value_name):
        _vars_and_vals = F.create_map(list(chain.from_iterable([[F.lit(c), F.col(c)] for c in value_vars])))  # requires: from itertools import chain
        _tmp = df.select(*id_vars, F.explode(_vars_and_vals)).withColumnRenamed('key', var_name).withColumnRenamed('value', value_name)
        return _tmp.select(*id_vars, var_name, value_name)

    df_melt = df.selectExpr('Label', 'stack({}, {}) as (Keys, Values)'.format(len(df.columns) - 1, ', '.join(("'{}', {}".format(i, i) for i in df.columns[1:])))).where(F.col('Values').isNotNull())
    df_melt = melt(df, id_vars=['Label'], value_vars=df.columns[1:], var_name='Keys', value_name='Values')
    df_pivot = df_melt.groupBy('Label').pivot('Keys').agg(F.first('Values'))
  • Pandas
    Explode:

    >> df = pd.DataFrame( {'a':['A', 'B', 'C'], 'b':[[1], [2, 3], [4, 5, 4]]})
    >> df.explode('b') # the exploded column keeps the name b
    a b
    0 A 1
    1 B 2
    1 B 3
    2 C 4
    2 C 5
    2 C 4
    >> df = pd.DataFrame( {'a':['A', 'B', 'B', 'C', 'C', 'C'], 'b':[1, 2, 3, 4, 5, 4]})
    >> df
    a b
    0 A 1
    1 B 2
    2 B 3
    3 C 4
    4 C 5
    5 C 4
    >> df.groupby('a')['b'].apply(list).reset_index()
    a b
    0 A [1]
    1 B [2, 3]
    2 C [4, 5, 4]
    >> df.groupby('a')['b'].apply(set).apply(list).reset_index()
    a b
    0 A [1]
    1 B [2, 3]
    2 C [4, 5]

    Pivot <=> Melt:

    Label Keys Values
    0 'key1' 'value1'
    0 'key2' 'value2'
    1 'key1' 'value3'
    1 'key2' 'value4'

    <=>

           Values
    Keys     key1    key2
    Label
    0      value1  value2
    1      value3  value4

    >> import numpy as np
    >> import pandas as pd

    >> def melt(frame):
    >>     # the identifier is the index
    >>     N, K = frame.shape
    >>     data = {
    >>         "Values": frame.to_numpy().ravel(),
    >>         "Keys": np.tile(np.asarray(frame.columns), N),
    >>         "Label": np.asarray(frame.index).repeat(K),
    >>     }
    >>     return pd.DataFrame(data, columns=["Label", "Keys", "Values"])

    >> df = pd.DataFrame({'key1': ['value1', 'value3'], 'key2': ['value2', 'value4']}, index=[0, 1])
    key1 key2
    0 value1 value2
    1 value3 value4
    >> df_melt = melt(df)
    Label Keys Values
    0 0 key1 value1
    1 0 key2 value2
    2 1 key1 value3
    3 1 key2 value4
    >> df = df.reset_index().rename(columns={'index': 'Label'})
    Label key1 key2
    0 0 value1 value2
    1 1 value3 value4
    >> df_melt = df.melt(id_vars=['Label'], var_name='Keys', value_name='Values').sort_values('Label').reset_index(drop=True)
    Label Keys Values
    0 0 key1 value1
    1 0 key2 value2
    2 1 key1 value3
    3 1 key2 value4
    >> df_pivot = df_melt.pivot(index="Label", columns="Keys", values="Values")
    Keys key1 key2
    Label
    0 value1 value2
    1 value3 value4