JDBC is used here to connect to relational databases such as MSSQL and MySQL, read table data, and load it into a Spark DataFrame. This post briefly describes the installation and setup process and how to use the driver from Spark.
Download the JDBC driver from Microsoft's official site. Unzip the package, locate sqljdbc42.jar, and move it to a directory of your choice. The driver can then be enabled at launch through spark-submit, or loaded via a config file (a sketch of the config-file approach follows the command below).
`spark-submit … --driver-class-path /usr/local/spark2/jars/sqljdbc42.jar`
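One way to avoid repeating the flag on every submit is the config-file route mentioned above. A minimal sketch, assuming the conf directory sits next to the jars directory used here:

```
# /usr/local/spark2/conf/spark-defaults.conf  (assumed path; use your own conf dir)
# spark.driver.extraClassPath is the config equivalent of --driver-class-path.
spark.driver.extraClassPath  /usr/local/spark2/jars/sqljdbc42.jar
```

With this entry in place, spark-submit picks up the driver without the extra flag.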
Below is a usage example. First, wrap the JDBC calls in a class:
```python
class MssqlDAO(object):

    def copyMssqlByJdbc(self, sqlContext, ip, databaseName, user, password, table):
        """
        Copy data into Spark from an MSSQL table.
        spark-submit needs --driver-class-path /usr/local/spark2/jars/sqljdbc42.jar
        """
        url = "jdbc:sqlserver://" + ip
        properties = {
            "databaseName": databaseName,
            "user": user,
            "password": password
        }
        return sqlContext.read.jdbc(url, table, properties=properties)

    def writeMssqlByJdbc(self, dataFrame, ip, databaseName, user, password, table, mode="error"):
        """
        Write data from Spark into an MSSQL table.
        spark-submit needs --driver-class-path /usr/local/spark2/jars/sqljdbc42.jar
        """
        url = "jdbc:sqlserver://" + ip
        properties = {
            "databaseName": databaseName,
            "user": user,
            "password": password
        }
        dataFrame.write.jdbc(url, table, properties=properties, mode=mode)
```

You can then use it like this:
```python
m_dao = MssqlDAO()
master_df = m_dao.copyMssqlByJdbc(sqlContext, MASTER_IP, MASTER_DB,
                                  MASTER_ACCOUNT, MASTER_PASSWORD, MASTER_TABLE)
master_df.cache()
```

Combined with a SQL query:
```python
QUERY_TABLE = "(select * from dbo.table where Employee = '0000000' and Status <> '1' \
    and ExtensionNumber <> '' and Date > '" + YYYYMMDD + "') as table"
master_df = m_dao.copyMssqlByJdbc(sqlContext, MASTER_IP, MASTER_DB,
                                  MASTER_ACCOUNT, MASTER_PASSWORD, QUERY_TABLE)
```
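The class also defines writeMssqlByJdbc, which the snippets above never call. As a sketch of the reverse direction, continuing from the variables above and using RESULT_DB and RESULT_TABLE as hypothetical placeholders for the target database and table:

```python
# Hypothetical example: push the cached DataFrame back out over JDBC.
# RESULT_DB and RESULT_TABLE are placeholders, not names from the original post.
# mode accepts the standard Spark save modes: "error" (default), "append",
# "overwrite", or "ignore"; "append" adds rows to an existing table.
m_dao.writeMssqlByJdbc(master_df, MASTER_IP, RESULT_DB,
                       MASTER_ACCOUNT, MASTER_PASSWORD,
                       RESULT_TABLE, mode="append")
```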
