Spark DataFrame Data Preprocessing Syntax Summary (Using Python)
Data preprocessing is a very broad topic. This post assumes the data has already been collected (or that streaming data can already be received) and summarizes the related processing syntax; how to persist the data or integrate it with other systems will be covered in other posts.
Spark's MLlib already provides quite a few feature preprocessing methods, and some of them can also be used for general data preprocessing (a small sketch follows the link below).
See this post >>
[Spark]Feature Transformers 特徵轉換器
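As a tiny illustration, here is a minimal sketch of one MLlib transformer (MinMaxScaler) used to rescale a feature column; it assumes Spark 2.x with a SparkSession already available as spark, and the column names are placeholders:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([(0, Vectors.dense([1.0, 10.0])),
                            (1, Vectors.dense([2.0, 20.0]))], ["id", "features"])
scaler = MinMaxScaler(inputCol="features", outputCol="scaled")
scaler.fit(df).transform(df).show()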
To understand the differences between the three data structures, Spark RDD, DataFrame, and Spark SQL, see this post >>
Spark RDD & dataframe & SQL範例
If you have the patience to read through every API, see this >>
Spark API
Here are some more commonly used methods.
# Convert between pandas and Spark DataFrames
pandas_df = spark_df.toPandas()
spark_df = sqlContext.createDataFrame(pandas_df)
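A minimal round-trip sketch, assuming a Spark 2.x SparkSession is available as spark:
import pandas as pd
pandas_df = pd.DataFrame({"name": ["Alice", "Bob"], "age": [11, 12]})
spark_df = spark.createDataFrame(pandas_df)  # pandas -> Spark
pandas_again = spark_df.toPandas()           # Spark -> pandas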
## Convert from JSON to DataFrame; when the keys have only one level, no special handling is needed
df = spark.createDataFrame(data.json())
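For example, a flat (single-level) JSON payload that has already been parsed into a list of dicts converts directly; records below is made up for this sketch, and a SparkSession named spark is assumed:
records = [{"name": "Alice", "age": 11}, {"name": "Bob", "age": 12}]
df = spark.createDataFrame(records)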
# Convert between Spark DataFrame and RDD
rdd_df = df.rdd
df = rdd_df.toDF()
# Add & drop a column
from pyspark.sql.functions import lit
df = df.withColumn("new", lit(0))
df = df.drop('new')
# Null values
df = df.na.fill(0)  # fill() requires a value, e.g. 0
df = df.na.drop()
df = df.dropna(subset=['col01', 'col02'])  # drop rows where these columns contain NA
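na.fill() also accepts a dict to fill different columns with different values; the column names here are placeholders:
df = df.na.fill({"col01": 0, "col02": "unknown"})  # per-column fill values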
# Modify values
df = df.withColumn("new", lit(1))  # the value must be a Column, e.g. lit(1)
df = df.withColumn("year2", df["year1"].cast("int"))
# join
df_join = df_left.join(df_right, df_left.key == df_right.key, "inner")  # `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`
# group by
GroupedData = df.groupBy("age")
df.groupBy("A").avg("B")
from pyspark.sql import functions
df.groupBy("A").agg(functions.avg("B"), functions.min("B"), functions.max("B"))
Methods available on the GroupedData type after grouping (all return a DataFrame); a short example follows this list:
avg(*cols): compute the average of one or more columns for each group
count(): count the rows in each group; the returned DataFrame has 2 columns, one for the group key and one for the row count
max(*cols): compute the maximum of one or more columns for each group
mean(*cols): compute the mean of one or more columns for each group
min(*cols): compute the minimum of one or more columns for each group
sum(*cols): compute the sum of one or more columns for each group
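For example, combining several aggregations with agg(); alias() just renames the result columns (the column names A and B follow the snippet above):
from pyspark.sql import functions
agg_df = df.groupBy("A").agg(functions.avg("B").alias("avg_B"),
                             functions.min("B").alias("min_B"),
                             functions.max("B").alias("max_B"))
agg_df.show()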
# function
df.foreach(f) or df.rdd.foreach(f)
df.foreachPartition(f) or df.rdd.foreachPartition(f)
df.rdd.map(func)  # in Spark 2.x+ map lives on the underlying RDD; returns an RDD
df.rdd.reduce(func)  # likewise, reduce on the underlying RDD
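A concrete sketch of the f / func callbacks, assuming the DataFrame has an age column:
ages = df.rdd.map(lambda row: row["age"])      # extract one value per Row
total_age = ages.reduce(lambda a, b: a + b)    # aggregate the extracted values
df.rdd.foreach(lambda row: print(row))         # runs on the executors; output goes to executor logs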
# query
df.show()
df.show(30)
df.printSchema()
list = df.head(3) # Example: [Row(a=1, b=1), Row(a=2, b=2), ... ...]
list = df.take(5) # Example: [Row(a=1, b=1), Row(a=2, b=2), ... ...]
list = df.collect() # watch performance: this brings all rows to the driver
from pyspark.sql.functions import isnull
df = df.filter(isnull("col_a"))  # query rows where col_a is null
from pyspark.sql import Row
r = Row(age=11, name='Alice')
print(r.__fields__)  # ['age', 'name']
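Row objects can also be used to build a DataFrame directly; a minimal sketch assuming a SparkSession named spark:
people = [Row(age=11, name='Alice'), Row(age=12, name='Bob')]
df_people = spark.createDataFrame(people)
df_people.show()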
df.select("name")
df.select(df['name'], df['age'] + 1)
df.select(df.a, df.b, df.c)  # select columns a, b, c
df.select(df["a"], df["b"], df["c"])  # select columns a, b, c
# sort
df = df.sort("age", ascending=False)
# filter
df = df.filter(df['age']>21)
df = df.where(df['age']>21)
# Filtering on null or NaN values:
from pyspark.sql.functions import isnan, isnull
df = df.filter(isnull("a"))  # keep rows where column a is null (Python's None)
df = df.filter(isnan("a"))   # keep rows where column a is NaN (Not a Number)
# Spark SQL
df.createOrReplaceTempView("table")
df = sqlContext.sql("SELECT name, age FROM table WHERE age >= 13 AND age <= 19")
# time series
from pyspark.sql.functions import window
win_monday = window("col1", "1 week", startTime="4 days")
GroupedData = df.groupBy([df.col2, df.col3, df.col4, win_monday])
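A sketch of a windowed aggregation putting the pieces together; the column names event_time, col2, and value are placeholders for this example:
from pyspark.sql import functions
win_monday = functions.window("event_time", "1 week", startTime="4 days")
weekly = df.groupBy(win_monday, df.col2).agg(functions.sum("value").alias("weekly_sum"))
weekly.show(truncate=False)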
# Some additional useful tricks
def twoListToDictionary(self, array_1, array_2):
    """
    Combine two lists into a dictionary (keys from array_1, values from array_2)
    """
    return dict(zip(array_1, array_2))
def removeHeader(self, rawData):
    """
    remove header from raw data
    """
    header = rawData.first()
    return rawData.filter(lambda x: x != header)
def pdToDf(self, sqlContext, pd):
    """
    Convert a pandas DataFrame to a Spark DataFrame
    """
    # requires: from pyspark.sql.types import StructType
    if pd.empty:
        schema = StructType([])
        return sqlContext.createDataFrame([], schema)
    else:
        return sqlContext.createDataFrame(pd)
def dfToList(self, dataFrame):
    """
    Collect the "name" column of a DataFrame into a Python list
    """
    return dataFrame.rdd.map(lambda r: r.name).collect()
def dfCast(self, dataFrame, castType="double"):
    """
    Cast every column of a DataFrame to castType
    """
    dataFrame = dataFrame.select([dataFrame[column].cast(castType) for column in dataFrame.columns])
    # data_df = data_df.select([col(column).cast("double").alias(column) for column in data_df.columns])
    # dataFrame.show()
    return dataFrame
def dfCastColumns(self, dataFrame, columns, castType="double"):
    """
    Cast only the specified columns of a DataFrame to castType
    """
    dfNames = dataFrame.schema.names
    noCastList = [x for x in dfNames if x not in columns]
    dataFrame = dataFrame.select(noCastList + [dataFrame[column].cast(castType)
                                               for column in dataFrame.columns if column in columns])
    return dataFrame
def dfDropDuplicates(self, dataFrame, columns=None):
    return dataFrame.dropDuplicates(columns)
def dfImputer(self, imputerType, dataFrame, castType="double"):
    """
    Impute values when the raw data has incorrect values (null, empty, or NaN)
    """
    imputerType = imputerType.lower()
    dataFrame = self.dfCast(dataFrame, castType)
    if imputerType == "drop":
        return self.dropNanRaw(dataFrame)
    elif imputerType == "mean":
        return self.__imputMeanAtNanRaw(dataFrame)  # mean-imputation helper, not shown here
    else:
        return dataFrame
def dropNanRaw(self, dataFrame):
    """
    Drop rows when the raw data has incorrect values (null, empty, or NaN)
    """
    dataFrame = dataFrame.na.drop()
    # dataFrame.show()
    return dataFrame
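A usage sketch of the helpers above, assuming they are collected on a wrapper class (called Preprocessor here purely for illustration) and that the DataFrame has name, age, and salary columns:
prep = Preprocessor()  # hypothetical class holding the helper methods above
df = prep.dfCastColumns(df, columns=["age", "salary"], castType="double")
df = prep.dfImputer("drop", df)   # drop rows containing null/NaN values
names = prep.dfToList(df)         # collect the "name" column into a Python list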
Reference: PySpark DataFrame operations (add, delete, update, query): https://blog.csdn.net/weimingyu945/article/details/77981884