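JDBC is used here to connect to relational databases such as MSSQL and MySQL, read table data, and load it into a Spark DataFrame. This post briefly describes the installation and setup process and how to use it from Spark.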
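Download the JDBC driver from the link below:
Microsoft official site
Unzip the archive, locate sqljdbc42.jar, and move it to the target directory. The jar can be enabled through spark-submit, or loaded through a config file.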
spark-submit …
--driver-class-path /usr/local/spark2/jars/sqljdbc42.jar
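The config-file route is not spelled out in this post. As a rough sketch, assuming the file in question is Spark's `spark-defaults.conf` and that the `spark.driver.extraClassPath` property (the config counterpart of `--driver-class-path`) is the one wanted, the entry could be generated like this. The helper name `render_spark_defaults` is made up for illustration:

```python
# Hypothetical helper: renders the spark-defaults.conf line that corresponds
# to the --driver-class-path option passed to spark-submit.
JDBC_JAR = "/usr/local/spark2/jars/sqljdbc42.jar"  # path used in this post


def render_spark_defaults(jar_path=JDBC_JAR):
    """Return spark-defaults.conf text that puts the JDBC jar on the driver classpath."""
    # spark-defaults.conf holds one "key value" pair per line, separated by whitespace.
    entries = {"spark.driver.extraClassPath": jar_path}
    return "\n".join("{} {}".format(key, value) for key, value in entries.items())


if __name__ == "__main__":
    # Print the line so it can be pasted into conf/spark-defaults.conf.
    print(render_spark_defaults())
```

With the line in place, spark-submit should no longer need the command-line option for this jar.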
Below is a usage example, first wrapped in a class:
class MssqlDAO(object):

    def copyMssqlByJdbc(self, sqlContext, ip, databaseName, user, password, table):
        """
        Copy data to Spark from an MSSQL table.
        spark-submit needs --driver-class-path /usr/local/spark2/jars/sqljdbc42.jar
        """
        url = "jdbc:sqlserver://" + ip
        # Connection settings are handed to the JDBC driver as properties.
        properties = {
            "databaseName": databaseName,
            "user": user,
            "password": password
        }
        return sqlContext.read.jdbc(url, table, properties=properties)

    def writeMssqlByJdbc(self, dataFrame, ip, databaseName, user, password, table, mode="error"):
        """
        Write data from Spark to an MSSQL table.
        spark-submit needs --driver-class-path /usr/local/spark2/jars/sqljdbc42.jar
        """
        url = "jdbc:sqlserver://" + ip
        properties = {
            "databaseName": databaseName,
            "user": user,
            "password": password
        }
        # mode is one of "append", "overwrite", "ignore", "error".
        dataFrame.write.jdbc(url, table, properties=properties, mode=mode)
It can then be used like this:
m_dao = MssqlDAO()
master_df = m_dao.copyMssqlByJdbc(sqlContext,
                                  MASTER_IP,
                                  MASTER_DB,
                                  MASTER_ACCOUNT,
                                  MASTER_PASSWORD,
                                  MASTER_TABLE)
master_df.cache()
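The class in this post passes `databaseName` through the properties dictionary and leaves the port to the driver's default. The Microsoft driver's URL syntax also accepts a port and `;property=value` pairs, so one variant is to assemble them into the URL itself. A minimal sketch: `build_sqlserver_url` and the sample host and database values are hypothetical, not part of the original class:

```python
def build_sqlserver_url(host, database=None, port=None):
    """Build a SQL Server JDBC URL of the form jdbc:sqlserver://host[:port][;databaseName=db]."""
    url = "jdbc:sqlserver://" + host
    if port is not None:
        # The port follows the host, separated by a colon.
        url += ":" + str(port)
    if database:
        # Connection properties are appended as ;name=value pairs.
        url += ";databaseName=" + database
    return url


if __name__ == "__main__":
    # Placeholder host and database name, for illustration only.
    print(build_sqlserver_url("10.0.0.1", database="sales", port=1433))
```

Keeping the database name in the properties dictionary, as the class does, works just as well; the URL form mainly helps when the server listens on a non-default port.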
Using it with a SQL query: pass a parenthesized subquery with an alias in place of the table name.
QUERY_TABLE = (
    "(select * from dbo.table where Employee = '0000000' and Status <> '1' "
    "and ExtensionNumber <> '' and Date > '" + YYYYMMDD + "') as table"
)
master_df = m_dao.copyMssqlByJdbc(sqlContext,
                                  MASTER_IP,
                                  MASTER_DB,
                                  MASTER_ACCOUNT,
                                  MASTER_PASSWORD,
                                  QUERY_TABLE)
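Because the query is assembled by string concatenation, it may be worth checking the date value before splicing it in. The following is only a sketch under that assumption. The helpers `as_jdbc_table` and `checked_yyyymmdd`, the alias `t`, and the table name `dbo.t` are all hypothetical:

```python
import re
from datetime import datetime


def checked_yyyymmdd(value):
    """Return value unchanged if it is exactly eight ASCII digits forming a real date."""
    if not re.fullmatch(r"[0-9]{8}", value):
        raise ValueError("expected YYYYMMDD, got %r" % (value,))
    # strptime raises ValueError for impossible dates such as 20241301.
    datetime.strptime(value, "%Y%m%d")
    return value


def as_jdbc_table(query, alias="t"):
    """Wrap a SELECT statement so it can be passed where a table name is expected."""
    return "({}) AS {}".format(query.strip().rstrip(";"), alias)


if __name__ == "__main__":
    day = checked_yyyymmdd("20240101")
    print(as_jdbc_table("select * from dbo.t where Date > '" + day + "'"))
```

The resulting string can be passed as the `table` argument of `copyMssqlByJdbc`, exactly like `QUERY_TABLE`.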