2015/10/18

Apache Spark 測試 FPGrowth(傳統C語言與Spark 的簡易測試)


因為剛剛把 Apache 的 Spark 設定好,順便驗證測試看看效能如何?
剛好發現 Spark 的 MLLIB 內有內建的 FPGrowth 演算法,剛好跟它有點熟所以就用這個演算法加上網路上的資料來測試看看。

先講結論:資料量小,不利於使用巨量資料工具。


關於 fpgrowth 演算法請先參考這篇:
http://blog.jangmt.com/2015/10/fpgrowth-algorithm.html

底下紀錄測試的過程,及使用的參數。

測試運算資料來源:http://fimi.ua.ac.be/data/ Frequent Itemset Mining Dataset Repository---

LAB (1)
Christian Borgelt 寫的 C 語言程式 FPGrowth 對上 Spark scala fpgrowth 程式
---
# FPGrowth 先把 -m1 同時出現的SET設為1個,支持度 5 ,信任度為 80%(default)
# 這個案例花費了 0.09s 運算了 0.01s
[hadoop@hnamenode FrequentItemset]$ ./fpgrowth -m1 -s5 T10I4D100K.dat T10I4D100K.out.txt
./fpgrowth - find frequent item sets with the fpgrowth algorithm
version 6.7 (2015.08.18)         (c) 2004-2015   Christian Borgelt
reading T10I4D100K.dat ... [870 item(s), 100000 transaction(s)] done [0.09s].
filtering, sorting and recoding items ... [10 item(s)] done [0.00s].
sorting and reducing transactions ... [281/100000 transaction(s)] done [0.01s].
writing T10I4D100K.out.txt ... [10 set(s)] done [0.00s].
[hadoop@hnamenode FrequentItemset]$ cat T10I4D100K.out.txt
368 (7.828)
529 (7.057)
829 (6.81)
766 (6.265)
722 (5.845)
354 (5.835)
684 (5.408)
217 (5.375)
494 (5.102)
419 (5.057)

----
上面這個 fp-growth 程式是以 同一台機器的 masternode 跑的,但是沒有 20 台 SlaveNode。
----
底下這個案例參考官方網站的範例:scala 語言的 fp-growth 程式
http://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.html#fp-growth 
FPgrowth MLIB
https://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.html

使用 spark 用 standalone 模式,啟動 spark-shell
spark-shell  --master spark://192.168.1.100:7077
這套 spark 有1台 masternode 以及 20 台 slavenode,

首先把檔案放到 hadoop HDFS 內
[hadoop@hnamenode FrequentItemset]$ hdfs dfs -ls /public/FrequentItemset
Found 4 items
-rw-r--r--   3 hadoop supergroup    4022055 2015-10-17 21:39 /public/FrequentItemset/T10I4D100K.dat
-rw-r--r--   3 hadoop supergroup   15478113 2015-10-17 21:39 /public/FrequentItemset/T40I10D100K.dat
-rw-r--r--   3 hadoop supergroup         68 2015-10-17 21:42 /public/FrequentItemset/sample_fpgrowth.txt
-rw-r--r--   3 hadoop supergroup 1481890176 2015-10-17 23:13 /public/FrequentItemset/webdocs.dat

---
Apache Spark scala FPGrowth 並將支持度、出現數量設定為一樣。
---
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.fpm.FPGrowth

val data = sc.textFile("/public/FrequentItemset/T10I4D100K.dat")

val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))

# setMinSupport(0.05) 為./fpgrowth程式的 -s5
val fpg = new FPGrowth().setMinSupport(0.05).setNumPartitions(10)
val model = fpg.run(transactions)

model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}


# 這是信任度 80%,可以忽略不計
val minConfidence = 0.8
model.generateAssociationRules(minConfidence).collect().foreach { rule =>
  println(
    rule.antecedent.mkString("[", ",", "]")
      + " => " + rule.consequent .mkString("[", ",", "]")
      + ", " + rule.confidence)
}
----
時間:約 40s
----
[368], 7828
[529], 7057
[829], 6810
[766], 6265
[722], 5845
[354], 5835
[684], 5408
[217], 5375
[494], 5102
[419], 5057

----
LAB(1)結果:
不用比了,Christian Borgelt 寫的 C 語言程式 FPGrowth  狂勝。
Spark 跑了快約 40 - 50 秒因為在分散資料及 I/O 的時間浪費太多了...
但這例子先確認正確的這樣產生的結果是一樣的,後續的比對才可以繼續下去。
----


----
LAB (2) 資料檔案更換為  http://fimi.ua.ac.be/data/webdocs.dat.gz 來測試
webdocs.dat 檔案約 1.4GB 
----
# 花費 39.75 + 2.02 + 1.57 = 43.34 秒
[hadoop@hnamenode FrequentItemset]$ ./fpgrowth -m1 -s50 webdocs.dat webdocs.out
./fpgrowth - find frequent item sets with the fpgrowth algorithm
version 6.7 (2015.08.18)         (c) 2004-2015   Christian Borgelt
reading webdocs.dat ... [5267656 item(s), 1692082 transaction(s)] done [39.75s].
filtering, sorting and recoding items ... [5 item(s)] done [2.02s].
sorting and reducing transactions ... [32/1692082 transaction(s)] done [1.57s].
writing webdocs.out ... [10 set(s)] done [0.00s].

[hadoop@hnamenode FrequentItemset]$ more webdocs.out
122 (84.4832)
8 122 (72.566)
8 (77.4013)
49 122 (65.9012)
49 8 122 (61.1858)
49 8 (63.1131)
49 (69.6349)
124 122 (50.2531)
124 (52.4418)
516 (50.7154)

---
scala FPGrowth
---
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.fpm.FPGrowth

# 檔案放到 HDFS 內
val data = sc.textFile("/public/FrequentItemset/webdocs.dat")

val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))

val fpg = new FPGrowth().setMinSupport(0.5).setNumPartitions(10)
val model = fpg.run(transactions)

model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}

val minConfidence = 0.8
model.generateAssociationRules(minConfidence).collect().foreach { rule =>
  println(
    rule.antecedent.mkString("[", ",", "]")
      + " => " + rule.consequent .mkString("[", ",", "]")
      + ", " + rule.confidence)
}

----
15/10/18 02:48:47
15/10/18 02:49:34  共花費 47秒
----
[122], 1429525
[8], 1309694
[8,122], 1227876
[49], 1178279
[49,8], 1067926
[49,8,122], 1035314
[49,122], 1115102
[124], 887358
[124,122], 850323
[516], 858146
----
LAB(2) 結果:
47 > 43 仍是傳統作法獲勝,但是時間上已經相差不多。資料量確實影響了計算的速度。
----


----
LAB (3) 
----
接著繼續把檔案放大,這個 webdocs.dat 檔案放大 *2 , *4 , * 10 的倍數來測試。
-rw-rw-r--. 1 hadoop hadoop  14G Oct 18 00:29 webdocs10.dat
-rw-rw-r--. 1 hadoop hadoop 2.8G Oct 18 00:29 webdocs2.dat
-rw-rw-r--. 1 hadoop hadoop 5.6G Oct 18 00:29 webdocs4.dat
-rw-rw-r--. 1 hadoop hadoop 1.4G Oct 14  2010 webdocs.dat

-----
此處以 webdocs10.dat 這個 14G 的檔案來說,如果以傳統的方法會遇到巨大的困難
光是 load 資料到記憶體就花了 436.84s ,運算 8.30s + 15.03s 約計 460 秒
-----
[hadoop@hnamenode FrequentItemset]$ ./fpgrowth -m1 -s50 webdocs10.dat webdocs10.out
./fpgrowth - find frequent item sets with the fpgrowth algorithm
version 6.7 (2015.08.18)         (c) 2004-2015   Christian Borgelt
reading webdocs10.dat ... [5267656 item(s), 16920820 transaction(s)] done [436.84s].
filtering, sorting and recoding items ... [5 item(s)] done [8.30s].
sorting and reducing transactions ... [32/16920820 transaction(s)] done [15.03s].
writing webdocs10.out ... [10 set(s)] done [0.00s].
----
[hadoop@hnamenode FrequentItemset]$ cat webdocs10.out 
122 (84.4832)
8 122 (72.566)
8 (77.4013)
49 122 (65.9012)
49 8 122 (61.1858)
49 8 (63.1131)
49 (69.6349)
124 122 (50.2531)
124 (52.4418)
516 (50.7154)

-----
如果以 Spark Scala FPGrowth 運算,它透過分散運算到 20 台 pc 配合 hdfs 分散存取
spark-shell  --master spark://192.168.1.100:7077
----
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.fpm.FPGrowth

# 更換檔案為 /public/FrequentItemset/webdocs10.dat (在 HDFS內)
val data = sc.textFile("/public/FrequentItemset/webdocs10.dat")

val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))

# 條件一樣
val fpg = new FPGrowth().setMinSupport(0.5).setNumPartitions(10)
val model = fpg.run(transactions)

model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}
----
15/10/18 02:18:29 開始
15/10/18 02:20:28 結束,約 119 秒
[122], 14295250
[8], 13096940
[8,122], 12278760
[49], 11782790
[49,8], 10679260
[49,8,122], 10353140
[49,122], 11151020
[124], 8873580
[124,122], 8503230
[516], 8581460
----

Spark 工作流程
Spark 運算流程 https://spark.apache.org/docs/1.5.1/img/cluster-overview.png 

----
心得:
* 使用 Hadoop or Spark 這類的 big data 運算工具,初期消耗的運算成本很高,當資料量大到超過一般機器無法處理的程度時,才能顯示它和傳統運算的差異。

* 就這陣子測試的心得,如果你的資料量大於 1GB 後才有使用巨量運算的價值。

* 不過,兩種方式本來就是不一樣,硬是拿來比較不是很妥當。在適當的資料量,選擇適合的工具,才是比較正確的作法。


張貼留言