Exchanging data between Python/PySpark and Cassandra requires dedicated packages: the python-cassandra-driver and the pyspark-cassandra-connector. This post covers how to use them, plus a Spark-Cassandra deployment strategy.
Python talks to different databases in different ways; this post uses Cassandra as the example and is split into two parts: direct access with Python and pandas DataFrames, and access through PySpark DataFrames. There are many possible approaches and packages; this post only shows examples I have implemented and verified myself.
First, install the packages:
a. python-cassandra driver
b. pyspark-cassandra connector
I have forgotten exactly how I installed these two packages at the time, so the detailed installation steps are skipped for now and will be added later.
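Roughly, though (a hedged sketch reconstructed from memory rather than from notes): the driver is an ordinary pip package, while the connector is just a jar that Spark needs to see, either downloaded once and passed with --jars as in the command below, or fetched at launch with --packages. The spark-packages coordinate shown here is an assumption matching the jar version used in this post.

pip install cassandra-driver

# Option A: download spark-cassandra-connector-2.0.0-M2-s_2.11.jar and pass it via --jars (see below)
# Option B (assumed coordinate): let Spark fetch it at launch
pyspark --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11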
Once installed, they are ready to use. Remember to bring the pyspark-cassandra connector along when you launch/submit pyspark.
The example below launches a Jupyter notebook and includes the connector jar:
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" PYTHONIOENCODING="utf8" MASTER=spark://xxx.xxx.xxx.xxx:7077 pyspark --total-executor-cores 2 --executor-memory 2g --jars /usr/local/spark_jars/sqljdbc42.jar,/usr/local/spark_jars/spark-cassandra-connector-2.0.0-M2-s_2.11.jar
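As an aside, if you run this as a standalone script instead of the pyspark shell above, the same settings can (on Spark 2.x) be supplied through the SparkSession builder. A minimal sketch, where the master URL, jar path and hosts are placeholders:

from pyspark.sql import SparkSession

# Placeholder master URL, jar path and Cassandra hosts -- adjust to your cluster.
spark = SparkSession.builder \
    .master("spark://xxx.xxx.xxx.xxx:7077") \
    .config("spark.jars", "/usr/local/spark_jars/spark-cassandra-connector-2.0.0-M2-s_2.11.jar") \
    .config("spark.cassandra.connection.host", "xx.xx.xx.xx") \
    .appName("cassandra-demo") \
    .getOrCreate()

# SparkSession exposes read/createDataFrame, so it can be passed wherever
# the code below expects a sqlContext.
sqlContext = spark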
#!flask/bin/python
# encoding=UTF-8
from cassandra.cluster import Cluster
from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import BatchStatement
import pandas as pd
from pyspark.sql.types import StructType


class CassandraType(object):
    PRODUCTION = 0
    TEST = 1


class CassandraDAO(object):
    # You have to install the following items:
    # a. python-cassandra driver
    # b. pyspark cassandra connector

    def __init__(self, type):
        if type == CassandraType.PRODUCTION:
            self.contact_points = ['xx.xx.xx.xx', 'xx.xx.xx.xx']
            self.contact_points_str = "xx.xx.xx.xx,xx.xx.xx.xx"
        else:
            self.contact_points = ['xx.xx.xx.xx', 'xx.xx.xx.xx']
            self.contact_points_str = "xx.xx.xx.xx,xx.xx.xx.xx"
        self.formatString = "org.apache.spark.sql.cassandra"
        self.username = "username"
        self.password = "password"
        self.cluster = None
        self.session = None
        self.createSession()

    def __del__(self):
        self.cluster.shutdown()

    def pandas_factory(self, colnames, rows):
        return pd.DataFrame(rows, columns=colnames)

    def createSession(self):
        print("contact_points = " + self.contact_points_str)
        self.cluster = Cluster(
            contact_points=self.contact_points,  # a node is selected at random
            # load_balancing_policy=DCAwareRoundRobinPolicy(local_dc='datacenter1'),
            # auth_provider=PlainTextAuthProvider(username='cassandra', password='cassandra')
        )
        self.session = self.cluster.connect()
        self.session.row_factory = self.pandas_factory
        # Needed for large queries, otherwise the driver paginates. Default is 50000.
        self.session.default_fetch_size = 10000000

    def getSession(self):
        return self.session

    def execCQL(self, keyspace, cql):
        """Execute CQL."""
        self.session.set_keyspace(keyspace)
        self.session.execute_async(cql)

    def execCQLSelect(self, keyspace, cql):
        """Execute CQL (select only), return the Cassandra ResponseFuture."""
        self.session.set_keyspace(keyspace)
        async_results = self.session.execute_async(cql)
        return async_results

    def execCQLCallBackAnysc(self, keyspace, cql, handle_success, handle_error):
        """Execute CQL; on success call handle_success, otherwise handle_error."""
        self.session.set_keyspace(keyspace)
        async_results = self.session.execute_async(cql)
        async_results.add_callbacks(handle_success, handle_error)

    def execCQLSelectToPandasDF(self, keyspace, cql):
        """Execute CQL (select only), return a pandas DataFrame."""
        self.session.set_keyspace(keyspace)
        async_results = self.session.execute_async(cql)
        # pandas_factory is the row_factory, so the current rows are already a DataFrame
        return async_results.result()._current_rows

    def execCQLSelectToDF(self, sqlContext, keyspace, cql):
        """Execute CQL (select only), return a Spark DataFrame."""
        # pandas DataFrame to Spark DataFrame
        pandas_dataframe = self.execCQLSelectToPandasDF(keyspace, cql)
        if pandas_dataframe.empty:
            schema = StructType([])
            return sqlContext.createDataFrame([], schema)
        else:
            return sqlContext.createDataFrame(pandas_dataframe)

    def execCQLSelectToRDD(self, sqlContext, keyspace, cql):
        """Execute CQL (select only), return a Spark RDD."""
        # DataFrame to RDD
        return self.execCQLSelectToDF(sqlContext, keyspace, cql).rdd.map(tuple)

    @property
    def contactPoints(self):
        return self.contact_points

    @contactPoints.setter
    def contactPoints(self, contact_points):
        self.contact_points = contact_points

    @contactPoints.deleter
    def contactPoints(self):
        del self.contact_points

    # pyspark cassandra connector
    def readFromCassandraDF(self, sqlContext, keyspace, table):
        """Read data from Cassandra, return a DataFrame."""
        return sqlContext.read\
            .format(self.formatString)\
            .options(table=table, keyspace=keyspace)\
            .option("spark.cassandra.connection.host", self.contact_points_str)\
            .load()

    def readFromCassandraRDD(self, sqlContext, keyspace, table):
        """Read data from Cassandra, return an RDD."""
        df = sqlContext.read\
            .format(self.formatString)\
            .options(table=table, keyspace=keyspace)\
            .option("spark.cassandra.connection.host", self.contact_points_str)\
            .load()
        return df.rdd.map(tuple)  # DataFrame to RDD

    def saveToCassandraDF(self, dataFrame, keyspace, table, mode="error"):
        """Save a DataFrame to Cassandra; pick one save mode:

        SaveMode.ErrorIfExists (default) | "error"
            If data already exists, an exception is thrown.
        SaveMode.Append | "append"
            If data/table already exists, the contents of the DataFrame are
            appended to the existing data.
        SaveMode.Overwrite | "overwrite"
            If data/table already exists, the existing data is overwritten by
            the contents of the DataFrame.
        SaveMode.Ignore | "ignore"
            If data already exists, the save operation does not save the
            contents of the DataFrame and does not change the existing data,
            similar to CREATE TABLE IF NOT EXISTS in SQL.
        """
        dataFrame.write\
            .format(self.formatString)\
            .mode(mode)\
            .options(table=table, keyspace=keyspace)\
            .option("spark.cassandra.connection.host", self.contact_points_str)\
            .save()

Example of using the class:
CASSANDRA_ENV = CassandraType.TEST
c_dao = CassandraDAO(CASSANDRA_ENV)

## execute CQL
c_dao.execCQL('mykeyspace', 'select * from table;')

## read from the DB into a pandas DataFrame
pandas_df = c_dao.execCQLSelectToPandasDF('mykeyspace', 'select * from table;')

## write a pyspark DataFrame to the DB
c_dao.saveToCassandraDF(spark_df, 'mykeyspace', 'table_name', 'overwrite')

If you have to insert many rows from a pandas DataFrame, doing it naively causes performance problems; a prepared statement can be used instead:
from datetime import datetime

import pandas as pd
from cassandra.cluster import Cluster

cluster = Cluster(
    contact_points='192.168.0.90,192.168.0.91,192.168.0.92'.split(',')
)
session = cluster.connect('helper_dev_keyspace')
session.default_fetch_size = 10000000

query = "INSERT INTO intent_algorithm2 (algorithm_id, algorithm_name) VALUES (?,?)"
prepared = session.prepare(query)

# 10000 dummy rows to insert
alist = [str(x) for x in range(10000)]
test_pd = pd.DataFrame({'algorithm_id': alist, 'algorithm_name': alist})

api_start = datetime.now()
# fire-and-forget: the futures are not awaited here
for index, row in test_pd.iterrows():
    session.execute_async(prepared,
                          (row.algorithm_id, row.algorithm_name))
api_end = datetime.now()
api_duration = str(api_end - api_start)
api_duration
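The timing loop above issues one execute_async per row. The driver also ships a helper in cassandra.concurrent that runs the same prepared statement with a bounded number of in-flight requests, which is gentler on the cluster for large batches. A rough sketch, reusing prepared, session and test_pd from the example above:

from cassandra.concurrent import execute_concurrent_with_args

# Build one parameter tuple per row of the test DataFrame.
params = [(row.algorithm_id, row.algorithm_name)
          for row in test_pd.itertuples(index=False)]

# The driver keeps at most `concurrency` requests in flight at a time.
results = execute_concurrent_with_args(
    session, prepared, params,
    concurrency=100, raise_on_first_error=False)

# Each result is a (success, result_or_exception) pair.
failed = sum(1 for success, _ in results if not success)
print("failed inserts: %d" % failed)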
Deploying Spark with Cassandra
Cassandra places data on each node according to token ranges, and this distributed layout makes it a natural data source for Spark: since every node owns only part of the data, it is recommended to install a Spark Worker on every Cassandra node in the datacenter so tasks can read mostly local data.
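A quick way to see this, assuming the CassandraDAO defined above and placeholder keyspace/table names: the connector splits a table into Spark partitions along those token ranges, so each task covers a slice of the ring.

# Hypothetical keyspace/table names; c_dao and sqlContext as set up earlier.
df = c_dao.readFromCassandraDF(sqlContext, 'mykeyspace', 'table_name')
# Each partition corresponds to a group of token ranges handled by one task.
print(df.rdd.getNumPartitions())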
Additional notes
- When deploying, it is recommended to set up a separate datacenter dedicated to analytics, or at least to split the data being collected from the data being analyzed, so the load from analytics jobs does not hit the other nodes serving live traffic (see the sketch after this list).
- Cassandra is particularly well suited to collecting large volumes of sensor data, but analyzing that data with CQL alone is quite difficult, so pairing it with Spark for data processing is the more appropriate approach. That said, Spark-Cassandra should not be treated as a replacement for Hadoop; it is not suitable for use as a data warehouse.
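A minimal sketch of the keyspace-level side of that separation, assuming hypothetical datacenter names dc_online and dc_analytics and reusing the CassandraDAO session from earlier: replicating the keyspace into the analytics datacenter lets the Spark workers there read local replicas while the online datacenter keeps serving writes.

# Hypothetical datacenter names and keyspace -- adjust to your topology.
session = c_dao.getSession()
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS sensor_data
    WITH replication = {
        'class': 'NetworkTopologyStrategy',
        'dc_online': 3,
        'dc_analytics': 2
    }
""")

The commented-out DCAwareRoundRobinPolicy in the class above is the driver-side half of the same idea: it pins a client to its local datacenter.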