Exchanging data between Python/PySpark and Cassandra requires dedicated packages: the Python cassandra-driver and the Spark Cassandra connector for PySpark. This post covers how to use them and a deployment strategy for Spark with Cassandra.
Python talks to different databases in different ways. This post uses Cassandra as the example and is split into two parts: direct access with Python and pandas DataFrames, and access through PySpark DataFrames. There are many packages and approaches for this; the post only shows the ones I have actually implemented and verified.
First, the following packages must be installed:
a. the Python Cassandra driver
b. the Spark Cassandra connector (for PySpark)
I no longer remember exactly how I installed these two packages, so the installation steps are skipped for now and will be added later.
Once they are installed you can start using them. Remember to pass the Spark Cassandra connector jar when submitting a PySpark job.
The example below launches a Jupyter Notebook and includes the connector jar:
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" PYTHONIOENCODING="utf8" MASTER=spark://xxx.xxx.xxx.xxx:7077 pyspark --total-executor-cores 2 --executor-memory 2g --jars /usr/local/spark_jars/sqljdbc42.jar,/usr/local/spark_jars/spark-cassandra-connector-2.0.0-M2-s_2.11.jar
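If you run a standalone PySpark script instead of the interactive shell, the same settings can be supplied through SparkConf. This is only a minimal sketch, assuming the connector jar is already on the classpath (via --jars or spark.jars); the app name, master URL, and host list below are placeholders:

# Sketch for a standalone script (not the pyspark shell); assumes the
# spark-cassandra-connector jar is supplied via --jars or spark.jars.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = (SparkConf()
        .setAppName("cassandra-example")                       # placeholder app name
        .setMaster("spark://xxx.xxx.xxx.xxx:7077")             # placeholder master URL
        .set("spark.cassandra.connection.host", "xx.xx.xx.xx,xx.xx.xx.xx"))  # placeholder hosts
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)   # pass this sqlContext to the CassandraDAO methods below

The class below wraps both access paths: the Python driver (pandas DataFrames) and the Spark Cassandra connector (Spark DataFrames/RDDs).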
# encoding=UTF-8
#!flask/bin/python
from cassandra.cluster import Cluster
from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import BatchStatement
import pandas as pd
from pyspark.sql.types import StructType


class CassandraType(object):
    PRODUCTION = 0
    TEST = 1


class CassandraDAO(object):
    # you have to install the following items:
    # a. Python Cassandra driver
    # b. Spark Cassandra connector (for PySpark)
    def __init__(self, type):
        if type == CassandraType.PRODUCTION:
            self.contact_points = ['xx.xx.xx.xx', 'xx.xx.xx.xx']
            self.contact_points_str = "xx.xx.xx.xx,xx.xx.xx.xx"
        else:
            self.contact_points = ['xx.xx.xx.xx', 'xx.xx.xx.xx']
            self.contact_points_str = "xx.xx.xx.xx,xx.xx.xx.xx"
        self.formatString = "org.apache.spark.sql.cassandra"
        self.username = "username"
        self.password = "password"
        self.cluster = None
        self.session = None
        self.createSession()

    def __del__(self):
        self.cluster.shutdown()

    def pandas_factory(self, colnames, rows):
        # row factory that turns driver result rows into a pandas DataFrame
        return pd.DataFrame(rows, columns=colnames)

    def createSession(self):
        print("contact_points = " + self.contact_points_str)
        self.cluster = Cluster(
            contact_points=self.contact_points,  # the driver picks a node from this list
            # load_balancing_policy=DCAwareRoundRobinPolicy(local_dc='datacenter1'),
            # auth_provider=PlainTextAuthProvider(username='cassandra', password='cassandra')
        )
        self.session = self.cluster.connect()
        self.session.row_factory = self.pandas_factory
        # needed for large queries, otherwise the driver paginates. Default is 50000.
        self.session.default_fetch_size = 10000000

    def getSession(self):
        return self.session

    def execCQL(self, keyspace, cql):
        """
        Execute a CQL statement.
        """
        self.session.set_keyspace(keyspace)
        self.session.execute_async(cql)

    def execCQLSelect(self, keyspace, cql):
        """
        Execute a CQL SELECT; return the async result future.
        """
        self.session.set_keyspace(keyspace)
        # cassandra ResponseFuture
        async_results = self.session.execute_async(cql)
        return async_results

    def execCQLCallBackAnysc(self, keyspace, cql, handle_success, handle_error):
        """
        Execute a CQL statement; on success call handle_success, otherwise handle_error.
        """
        self.session.set_keyspace(keyspace)
        async_results = self.session.execute_async(cql)
        async_results.add_callbacks(handle_success, handle_error)

    def execCQLSelectToPandasDF(self, keyspace, cql):
        """
        Execute a CQL SELECT; return a pandas DataFrame.
        """
        self.session.set_keyspace(keyspace)
        # cassandra ResultSet
        async_results = self.session.execute_async(cql)
        # to pandas DataFrame (built by pandas_factory)
        return async_results.result()._current_rows

    def execCQLSelectToDF(self, sqlContext, keyspace, cql):
        """
        Execute a CQL SELECT; return a Spark DataFrame.
        """
        # pandas DataFrame to Spark DataFrame
        pandas_dataframe = self.execCQLSelectToPandasDF(keyspace, cql)
        if pandas_dataframe.empty:
            schema = StructType([])
            return sqlContext.createDataFrame([], schema)
        else:
            return sqlContext.createDataFrame(pandas_dataframe)

    def execCQLSelectToRDD(self, sqlContext, keyspace, cql):
        """
        Execute a CQL SELECT; return a Spark RDD.
        """
        # DataFrame to RDD
        return self.execCQLSelectToDF(sqlContext, keyspace, cql).rdd.map(tuple)

    @property
    def contactPoints(self):
        return self.contact_points

    @contactPoints.setter
    def contactPoints(self, contact_points):
        self.contact_points = contact_points

    @contactPoints.deleter
    def contactPoints(self):
        del self.contact_points

    # pyspark cassandra connector
    def readFromCassandraDF(self, sqlContext, keyspace, table):
        """
        Read data from Cassandra; return a Spark DataFrame.
        """
        return sqlContext.read\
            .format(self.formatString)\
            .options(table=table, keyspace=keyspace)\
            .option("spark.cassandra.connection.host", self.contact_points_str)\
            .load()

    def readFromCassandraRDD(self, sqlContext, keyspace, table):
        """
        Read data from Cassandra; return a Spark RDD.
        """
        df = sqlContext.read\
            .format(self.formatString)\
            .options(table=table, keyspace=keyspace)\
            .option("spark.cassandra.connection.host", self.contact_points_str)\
            .load()
        # DataFrame to RDD
        return df.rdd.map(tuple)

    def saveToCassandraDF(self, dataFrame, keyspace, table, mode="error"):
        """
        Save a DataFrame to Cassandra; choose one of the save modes:
        SaveMode.ErrorIfExists | "error" (default): throw an exception if data already exists.
        SaveMode.Append | "append": append the contents of the DataFrame to the existing data/table.
        SaveMode.Overwrite | "overwrite": overwrite the existing data/table with the contents of the DataFrame.
        SaveMode.Ignore | "ignore": if data already exists, do not save the DataFrame and do not
        change the existing data (similar to CREATE TABLE IF NOT EXISTS in SQL).
        """
        dataFrame.write\
            .format(self.formatString)\
            .mode(mode)\
            .options(table=table, keyspace=keyspace)\
            .option("spark.cassandra.connection.host", self.contact_points_str)\
            .save()

    ## use: DAO(10)
    # def __call__(self, radius):
    #     if radius <= 0:
    #         print('must be a positive number')
    #     else:
    #         self.radius = radius
Example usage of the class:
CASSANDRA_ENV = CassandraType.TEST
c_dao = CassandraDAO(CASSANDRA_ENV)
## execute a CQL statement
c_dao.execCQL('mykeyspace', 'select * from table_name;')
## read from the DB into a pandas DataFrame
pandas_df = c_dao.execCQLSelectToPandasDF('mykeyspace', 'select * from table_name;')
## write a pyspark DataFrame to the DB
c_dao.saveToCassandraDF(spark_df, 'mykeyspace', 'table_name', 'overwrite')
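Reading back from Cassandra into a Spark DataFrame or RDD, and running a query with async callbacks, work the same way through the class. A short sketch, assuming the sqlContext from the pyspark shell above; the table name and the callback handlers are placeholders of my own:

## read a whole table from the DB into a pyspark DataFrame / RDD via the connector
spark_df = c_dao.readFromCassandraDF(sqlContext, 'mykeyspace', 'table_name')
spark_rdd = c_dao.readFromCassandraRDD(sqlContext, 'mykeyspace', 'table_name')

## run a CQL query asynchronously with callbacks (handler names are examples)
def handle_success(rows):
    # rows is the result built by pandas_factory, i.e. a pandas DataFrame
    print("query finished, got %d rows" % len(rows))

def handle_error(exc):
    print("query failed: %s" % exc)

c_dao.execCQLCallBackAnysc('mykeyspace', 'select * from table_name;', handle_success, handle_error)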
If you have to insert many rows from a pandas DataFrame, performance becomes a problem; prepared statements help:
from datetime import datetime

from cassandra.cluster import Cluster
import pandas as pd

cluster = Cluster(
    contact_points='192.168.0.90,192.168.0.91,192.168.0.92'.split(',')
)
session = cluster.connect('helper_dev_keyspace')
session.default_fetch_size = 10000000

# prepare the INSERT once, then bind values row by row
query = "INSERT INTO intent_algorithm2 (algorithm_id, algorithm_name) VALUES (?, ?)"
prepared = session.prepare(query)

alist = [str(x) for x in range(10000)]
test_pd = pd.DataFrame({'algorithm_id': alist,
                        'algorithm_name': alist})

api_start = datetime.now()
# fire-and-forget async inserts using the prepared statement
for index, row in test_pd.iterrows():
    session.execute_async(prepared,
                          (row.algorithm_id, row.algorithm_name))
api_end = datetime.now()
api_duration = str(api_end - api_start)
api_duration
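The driver's BatchStatement (imported in the class above but not demonstrated) is another option. This is only a sketch reusing the prepared statement from the snippet above; note that batches pay off mainly when the grouped rows share a partition key, otherwise an unlogged batch just saves round trips:

# Sketch: group a handful of bound statements into one unlogged batch
from cassandra.query import BatchStatement, BatchType

batch = BatchStatement(batch_type=BatchType.UNLOGGED)
for index, row in test_pd.head(20).iterrows():   # a small batch of rows as an example
    batch.add(prepared, (row.algorithm_id, row.algorithm_name))
session.execute(batch)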
Deploying Spark with Cassandra
Cassandra places data on its nodes according to token ranges. This distributed layout makes Cassandra a good data source for Spark, since each node owns only part of the data; it is therefore recommended to install a Spark Worker on every Cassandra node in the data center.
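To see which nodes (and data centers/racks) the driver actually connects to, and therefore where Spark Workers would need to run, the driver's cluster metadata can be listed. A small sketch, reusing the CassandraDAO instance from above:

# List the Cassandra nodes known to the driver; installing a Spark Worker on
# each of these hosts keeps Spark reads close to the token ranges they own.
for host in c_dao.cluster.metadata.all_hosts():
    print(host.address, host.datacenter, host.rack)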
Additional notes
- When deploying, it is advisable to build a separate data center for analytics workloads, or at least to separate the data being collected from the data being analyzed, so that the load of analytics jobs does not hit the other nodes.
- Cassandra is particularly well suited to collecting data from large numbers of sensors, but analyzing that data with CQL alone is quite difficult; pairing it with Spark for data processing is the more appropriate approach. That said, Spark plus Cassandra should not be treated as a replacement for Hadoop, and it is not suitable for use as a data warehouse.


