2018年3月14日 星期三

[Spark] Dataframe資料前處理實作(使用Python)

Spark Dataframe資料前處理語法整理(使用Python)

資料前處理的範圍過大, 本篇假設資料已收集或是已經可以接收streaming來的資料後, 相關的處理語法整理, 如何保存或與其他系統串聯會在其他篇說明

Spark本身在Mllib當中已經有相當多特徵前處理的方法, 有一部分也可以使用在資料的前處理
可以參考這篇 >> 
[Spark]Feature Transformers 特徵轉換器

Spark dataframe在資料前處理的觀念也跟Pandas dataframe很類似, 只是兩者語法上有差異
可以參考這篇 >>
[Pandas] 資料探索與前處理


想了解Spark RDD.Dataframe.SparkSQL這三種資料結構差別可以參考此篇 >>
Spark RDD & dataframe & SQL範例

很有耐心看所有API的請看此
可以參考此篇 >>

Spark API

這裡再補充一些常用方法

 
# pandas與spark dataframe互轉
pandas_df = spark_df.toPandas()
spark_df = sqlContext.createDataframe(pandas_df)
## convert from JSON to dataframe,當Key 只有一個層級,轉換不用特別處理
df = self.spark.createDataFrame(data.json())

# spark dataframe與RDD互轉
rdd_df = df.rdd
df = rdd.df.toDF()

# add & column
from pyspark.sql.functions import lit
df = df.withColumn("new", lit(0))

df.drop('nwe')

# 空值
df = df.na.fill()
df = df.na.drop()
df = df.dropna(subset=['col01','col02'])# 有na就移除

# 修改值
df = df.withColumn("new", 1)
df = df.withColumn("year2", df["year1"].cast("Int"))
df_join = df_left.join(df_right, df_left.key == df_right.key, "inner")# `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`

# group by
GroupedData = df.groupBy("age")
df.groupBy("A").avg("B")

from pyspark.sql import functions
df.groupBy("A").agg(functions.avg("B"), functions.min("B"), functions.max("B"))
整合後GroupedData類型可用的方法(均返回DataFrame類型):
avg(*cols)     ——   計算每組中一列或多列的平均值
count()          ——   計算每組中一共有多少行,返回DataFrame有2列,一列為分組的組名,另一列為行總數
max(*cols)    ——   計算每組中一列或多列的最大值
mean(*cols)  ——  計算每組中一列或多列的平均值
min(*cols)     ——  計算每組中一列或多列的最小值
sum(*cols)    ——   計算每組中一列或多列的總和

# function
df.foreach(f) 或者 df.rdd.foreach(f)
df.foreachPartition(f) 或者 df.rdd.foreachPartition(f)
df.map(func)# 回傳RDD
df.reduce(func)

# query
df.show()
df.show(30)
df.printSchema()
list = df.head(3)   # Example: [Row(a=1, b=1), Row(a=2, b=2), ... ...]
list = df.take(5)   # Example: [Row(a=1, b=1), Row(a=2, b=2), ... ...]
list = df.collect() # 注意效能

from pyspark.sql.functions import isnull
df = df.filter(isnull("col_a")) # 查詢null

r = Row(age=11, name='Alice')
print r.__fields__    #  ['age', 'name']

df.select(“name”)
df.select(df[‘name’], df[‘age’]+1)
df.select(df.a, df.b, df.c)    # 選擇a、b、c三列
df.select(df["a"], df["b"], df["c"])    # 選擇a、b、c三列

#sort
df = df.sort("age", ascending=False)

# filter
df = df.filter(df['age']>21)
df = df.where(df['age']>21)

# 對null或nan數據進行過濾:
from pyspark.sql.functions import isnan, isnull
df = df.filter(isnull("a"))  # 把a列裏面數據為null的篩選出來(代表python的None類型)
df = df.filter(isnan("a"))  # 把a列裏面數據為nan的篩選出來(Not a Number,非數字數據)

#SparkSQL
df.createOrReplaceTempView("table")
df = sqlContext.sql(“SELECT name,age FROM tableWHERE age> = 13 AND age <= 19“)

# time series
win_monday = window("col1","1週", startTime =“4天”)
GroupedData = df.groupBy([df.col2, df.col3, df.col4, win_monday])

# 再補上一些會使用到的技巧

def twoListToDictionary(self, array_1, array_2):
    """
    two list combine to a dictionary
    """

    return dict(zip(array_1, array_1))

 def removeHeader(self, rawData):
    """
    remove header from raw data
    """
   
    header = rawData.first()
    return rawData.filter(lambda x:x != header)
 
 def pdToDf(self, sqlContext, pd):
    """
    pandas dataframe To spark dataframe
    """

    if pd.empty:
        schema = StructType([])
        return sqlContext.createDataFrame([],schema)
    else:   
        return sqlContext.createDataFrame(pd)
   
 def dfToList(self, dataFrame):
    """
    dataframe to list
    """       

    return dataFrame.rdd.map(lambda r:r.name).collect()   
     

 def dfCast(self, dataFrame, castType="double"):
    """
    cast dataframe
    """       

    dataFrame = dataFrame.select([dataFrame[column].cast(castType) for column in dataFrame.columns])
#   data_df=data_df.select([col(column).cast("double").alias(column) for column in data_df.columns])
#   print dataFrame.show()
    return dataFrame   

def dfCastColumns(self, dataFrame, columns, castType="double"):
    """
    cast dataframe with specific columns
    """           

    dfNames = dataFrame.schema.names
    noCastList = [x for x in dfNames if x not in columns]
    dataFrame = dataFrame.select(noCastList + [dataFrame[column].cast(castType) for column in dataFrame.columns
                                                                       if column in columns])
    return dataFrame


def dfDropDuplicates(self, dataFrame, column=[]):

    return dataFrame.dropDuplicates(column)

def dfImputer(self, imputerType, dataFrame, castType="double"):
    """
    Imput value when raw data has incorrect value(null or empty or nan)
    """

    imputerType = imputerType.lower()
    dataFrame = self.dfCast(dataFrame, castType)
       
    if imputerType == "drop":
        return self.__dropNanRaw(dataFrame)
    elif imputerType == "mean":
        return self.__imputMeanAtNanRaw(dataFrame)
    else:
        return dataFrame

def dropNanRaw(self, dataFrame):
    """
    Drop the raw when raw data has incorrect value(null or empty or nan)
    """
        
    dataFrame = dataFrame.na.drop()
#   print dataFrame.show()
    return dataFrame
Ref: 
PySpark的DataFrame處理方法:增刪改查https://blog.csdn.net/weimingyu945/article/details/77981884

pyspark-DataFrame API

https://blog.csdn.net/wc781708249/article/details/78245989

沒有留言:

張貼留言