2018年1月27日 星期六

[Spark] Spark效能優化(未完成)

Spark效能優化

Accumulators
count細節數目時使用 EX: 讀出來的文字當發現有錯誤就使用 wrong Accumulators + 1 , 正確就用right Accumulators


right_count = accumulator(0)
wrong_count = accumulator(0)

def calulate(line):

    global wrong_count

    if line == “:
        wrong_count += 1

    print wrong_count.value

Broadcast

每個節點都要去查一張共用的 read only表, 或者join時可將小張表廣播 EX: 電話資料一直要去查地址表, 將地址表廣播 broadcast join和普通join: 在大數據分布式系統中,大量數據的移動對性能的影響也是巨大的。基於這個思想,在兩個RDD進行join操作的時候,如果其中一個RDD相對小很多,可以將小的RDD進行collect操作然後設置為broadcast變量,這樣做之後,另一個RDD就可以使用map操作進行join,這樣能夠有效的減少相對大很多的那個RDD的數據移動。 原文網址:https://read01.com/KEB55B.html


# Create dummy data and register as table
df = sc.parallelize([
    (1,"a"),
    (2,"b"),
    (3,"c")]).toDF(["num","let"])
df.registerTempTable('table')

# Create broadcast variable from local dictionary
myDict = {1: "y", 2: "x", 3: "z"}
broadcastVar = sc.broadcast(myDict) 
# Alternatively, if your dict is a key-value rdd, 
# you can do sc.broadcast(rddDict.collectAsMap())

# Create lookup function and apply it
sqlContext.registerFunction("lookup", lambda x: broadcastVar.value.get(x))
sqlContext.sql('select num, let, lookup(num) as test from table').show()
+---+---+----+
|num|let|test|
+---+---+----+
|  1|  a|   y|
|  2|  b|   x|
|  3|  c|   z|
+---+---+----+

cache

使用時機 : 當同一張表需要被查詢好幾次, 或拿它做不同事情, 可以將其cache
Spark SQL可以通過調用SQLContext.cacheTable(“tableName”)或者DataFrame.cache()把tables以列存儲格式緩存到內存中。隨後,Spark SQL將會掃描必要的列,並自動調整壓縮比例,以減少內存佔用和GC壓力。你也可以用SQLContext.uncacheTable(“tableName”)來刪除內存中的table。

存儲DataFrame

Spark DataFrame可以使用persist() API存儲到Spark緩存中。persist()可以緩存DataFrame數據到不同的存儲媒介。
本次實驗使用瞭以下Spark緩存存儲級別(StorageLevel):
  • MEMORY_ONLY:在Spark JVM內存中存儲DataFrame對象
  • MEMORY_ONLY_SER:在Spark JVM內存中存儲序列化後的DataFrame對象
  • DISK_ONLY: 將DataFrame數據存儲在在地磁盤
下面是一個如何使用persist() API緩存DataFrame的例子:
df.persist(MEMORY_ONLY)

standalone模式下資源分配不均勻導致內存溢出

在standalone的模式下如果配置了–total-executor-cores 和 –executor-memory 這兩個參數,但是沒有配置–executor-cores這個參數的話,就有可能導致,每個Executor的memory是一樣的,但是cores的數量不同,那麼在cores數量多的Executor中,由於能夠同時執行多個Task,就容易導致內存溢出的情況。這種情況的解決方法就是同時配置–executor-cores或者spark.executor.cores參數,確保Executor資源分配均勻。

效能調優

基於Alluxio系統的Spark DataFrame高效存儲管理技術

沒有留言:

張貼留言