Spark Notes | Tutorial: FP-Growth & Association Rules


Applying FP-growth to obtain association rules

After importing the data into Cassandra (see the earlier article "Python 批量导入数据(.csv)到 Cassandra 的方法" on bulk-importing CSV data into Cassandra with Python), the goal is to run an association analysis of the transaction data in Spark using the FP-growth algorithm.

In Cassandra, each item of an order is stored as its own row, for example:

+---+-----+-----+
| id|order|items|
+---+-----+-----+
|  0|    a|    1|
|  1|    a|    2|
|  2|    a|    5|
|  3|    b|    1|
|  4|    b|    2|
|  5|    b|    3|
|  6|    b|    5|
|  7|    c|    1|
|  8|    c|    2|
+---+-----+-----+

The article "Spark 连接 Cassandra" (Connecting Spark to Cassandra) already shows how to turn a Cassandra table into a Spark DataFrame. All of the code below works on Spark DataFrames.
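
For reference, reading the Cassandra table into a DataFrame looks roughly like this. This is only a minimal sketch assuming the spark-cassandra-connector package is on the classpath; the keyspace/table names shop / transactions and the connection host are placeholders:

from pyspark.sql import SparkSession

# Placeholder host, keyspace and table names -- adjust to your cluster.
spark = SparkSession.builder \
    .appName("fp-growth-demo") \
    .config("spark.cassandra.connection.host", "127.0.0.1") \
    .getOrCreate()

df_item = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(keyspace="shop", table="transactions") \
    .load()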

To use Spark's FP-growth module, the data first has to be reshaped into one row per order, with the items collected into a list:

+-----+-------------------+---+
|order|collect_list(items)| id|
+-----+-------------------+---+
|    a|          [1, 2, 5]|  1|
|    b|       [1, 2, 3, 5]|  2|
|    c|             [1, 2]|  3|
+-----+-------------------+---+

The code for this step:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# One row per order: collect the order's items into a list and add a running id.
df_order = df_item.groupBy("order") \
    .agg(F.collect_list("items")) \
    .withColumn("id", F.row_number().over(Window.orderBy("order")))

Then apply FP-growth:

from pyspark.ml.fpm import FPGrowth

# The aggregated column is named "collect_list(items)", so itemsCol has to point at it.
fpGrowth = FPGrowth(itemsCol="collect_list(items)", minSupport=0.5, minConfidence=0.6)
model = fpGrowth.fit(df_order)

Display the results:

model.freqItemsets.show()

+---------+----+
|    items|freq|
+---------+----+
|      [1]|   3|
|      [2]|   3|
|   [2, 1]|   3|
|      [5]|   2|
|   [5, 2]|   2|
|[5, 2, 1]|   2|
|   [5, 1]|   2|
+---------+----+

model.associationRules.show()

+----------+----------+------------------+
|antecedent|consequent|        confidence|
+----------+----------+------------------+
|    [2, 1]|       [5]|0.6666666666666666|
|    [5, 1]|       [2]|               1.0|
|       [2]|       [1]|               1.0|
|       [2]|       [5]|0.6666666666666666|
|       [5]|       [2]|               1.0|
|       [5]|       [1]|               1.0|
|    [5, 2]|       [1]|               1.0|
|       [1]|       [2]|               1.0|
|       [1]|       [5]|0.6666666666666666|
+----------+----------+------------------+


Computing support, confidence and lift

Lift is mainly used to filter out items that are so frequent that they distort the bundle analysis. It is defined as follows:

lift(X => Y) = confidence(X => Y) / support(Y)

For the other definitions, see the Wikipedia article Association rule learning.
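
As a sanity check, here is a small worked example in plain Python using the toy transactions from the beginning of this post (orders a, b, c); the transactions list and the support helper are only illustrative names, and the numbers match the rule [2, 1] => [5] in the association-rules output above:

# Toy transactions from the example above.
transactions = [
    {1, 2, 5},      # order a
    {1, 2, 3, 5},   # order b
    {1, 2},         # order c
]
total = len(transactions)

def support(itemset):
    """Fraction of transactions that contain every item of the itemset."""
    return sum(itemset <= t for t in transactions) / total

antecedent, consequent = {2, 1}, {5}
conf = support(antecedent | consequent) / support(antecedent)  # 0.666...
lift = conf / support(consequent)                              # 0.666... / 0.666... = 1.0
print(conf, lift)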

The implementation:

Here I use ten weeks of data. For each week I compute the frequent itemsets and association rules, then derive support and lift from the confidence values already obtained, and finally write each week's results to separate CSV files.

from tqdm import tqdm
from pyspark.sql.functions import concat_ws

# "weeks" is a list of week-boundary timestamps; adjust the range to the
# number of weekly intervals it defines.
for week in tqdm(range(0, 9)):

    # Market basket analysis for this specific week.
    df_week = df.filter((df["time_event"] >= weeks[week]) & (df["time_event"] < weeks[week+1]))

    # One row per order with the set of items bought in that order.
    df_week_fp = df_week.groupBy("order_id").agg(F.collect_set("item_id")).withColumn("id", F.row_number().over(Window.orderBy("order_id")))

    # Total number of orders in the week, needed for the support values.
    total = df_week_fp.count()

    fpGrowth = FPGrowth(itemsCol="collect_set(item_id)", minSupport=0.00001, minConfidence=0.00001)
    model = fpGrowth.fit(df_week_fp)

    df_week_freq = model.freqItemsets
    df_week_ar = model.associationRules

    # support(itemset) = freq / number of orders in the week
    df_week_supp = df_week_freq.withColumn("support", df_week_freq.freq/total)

    # Attach the support of the consequent to each rule ...
    df_week_supp_ar = df_week_ar.join(df_week_supp, df_week_supp.items == df_week_ar.consequent, 'left').drop('items')

    # ... so that lift = confidence / support(consequent).
    df_week_lift = df_week_supp_ar.withColumn("lift", df_week_supp_ar.confidence/df_week_supp_ar.support)

    # Flatten the array columns into comma-separated strings so they can be written to CSV.
    df_week_freq_new = df_week_freq.withColumn('items', concat_ws(',', 'items'))
    df_week_lift_new = df_week_lift.withColumn('antecedent', concat_ws(',', 'antecedent')).withColumn('consequent', concat_ws(',', 'consequent'))

    numb = week+1

    df_week_freq_new.coalesce(1).write.option("header", "true").csv("/lib/analysis/result_week_freq{}".format(numb))
    df_week_lift_new.coalesce(1).write.option("header", "true").csv("/lib/analysis/result_week_ar{}".format(numb))

The result looks as follows:

+----------+----------+----------+-----+----+-------------------+------------------+
|antecedent|consequent|confidence|items|freq|            support|              lift|
+----------+----------+----------+-----+----+-------------------+------------------+
|    [5, 2]|       [3]|       0.5|  [3]|   2| 0.2857142857142857|              1.75|
| [5, 2, 1]|       [3]|       0.5|  [3]|   2| 0.2857142857142857|              1.75|
|       [2]|       [3]|       0.4|  [3]|   2| 0.2857142857142857|1.4000000000000001|
|    [5, 1]|       [3]|       0.5|  [3]|   2| 0.2857142857142857|              1.75|
|       [5]|       [3]|       0.4|  [3]|   2| 0.2857142857142857|1.4000000000000001|
|    [2, 1]|       [3]|       0.5|  [3]|   2| 0.2857142857142857|              1.75|
|       [1]|       [3]|       0.4|  [3]|   2| 0.2857142857142857|1.4000000000000001|
|    [3, 1]|       [5]|       1.0|  [5]|   5| 0.7142857142857143|               1.4|
| [3, 2, 1]|       [5]|       1.0|  [5]|   5| 0.7142857142857143|               1.4|
|    [3, 2]|       [5]|       1.0|  [5]|   5| 0.7142857142857143|               1.4|
|       [2]|       [5]|       0.8|  [5]|   5| 0.7142857142857143|              1.12|
|       [3]|       [5]|       1.0|  [5]|   5| 0.7142857142857143|               1.4|
|    [2, 1]|       [5]|       1.0|  [5]|   5| 0.7142857142857143|               1.4|
|       [1]|       [5]|       0.8|  [5]|   5| 0.7142857142857143|              1.12|
|       [6]|       [5]|       1.0|  [5]|   5| 0.7142857142857143|               1.4|
|       [5]|       [6]|       0.2|  [6]|   1|0.14285714285714285|1.4000000000000001|
|    [5, 2]|       [1]|       1.0|  [1]|   5| 0.7142857142857143|               1.4|
|    [3, 2]|       [1]|       1.0|  [1]|   5| 0.7142857142857143|               1.4|
|       [2]|       [1]|       0.8|  [1]|   5| 0.7142857142857143|              1.12|
|       [3]|       [1]|       1.0|  [1]|   5| 0.7142857142857143|               1.4|
+----------+----------+----------+-----+----+-------------------+------------------+

The coalesce(1) in the code above reduces the number of partitions to 1, so that only a single CSV file is written per week; otherwise each week's output is split into as many CSV files as there are partitions. The drawback is that with fewer partitions each partition has to process more data, which uses a lot of memory and can lead to a java.lang.OutOfMemoryError.

So you can simply drop coalesce(1). Alternatively, if your machine has enough RAM, you can increase the spark.driver.memory value in the spark-defaults.conf file in the conf folder of your Spark installation to avoid running out of memory.
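
For example, a line like the following in conf/spark-defaults.conf raises the driver memory (the 8g value is only an illustrative choice, tune it to your machine):

# conf/spark-defaults.conf -- example value only, adjust to the available RAM
spark.driver.memory    8g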
