Saturday, March 10, 2018

[Python][Cassandra][Spark] Integrating pyspark with Cassandra

 
Exchanging data between Python/pyspark and Cassandra requires two extra packages: the python-cassandra-driver and the pyspark-cassandra-connector. This post covers how to use them, along with a Spark-Cassandra deployment strategy.
Python talks to different databases through different implementations. This post uses Cassandra as the example and is split into two parts: direct access with Python and pandas DataFrames, and access through pyspark DataFrames. There are many possible packages and approaches; here I only present examples I have verified myself.

First, install the packages:
a. python-cassandra-driver
b. pyspark-cassandra-connector

I have forgotten exactly how I installed these two packages at the time, so the detailed steps are skipped for now; a rough sketch follows below, and I will fill in the details later.
Once they are installed you can start using them. Remember to bring the pyspark-cassandra connector along when you submit with pyspark.
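As a rough sketch (these are assumptions, not the exact steps I used back then): the driver is a PyPI package, and the connector is a jar you download and hand to pyspark with --jars, as the launch command below does.

# install the Python driver (the PyPI package is named cassandra-driver)
pip install cassandra-driver
# the connector jar must match your Spark/Scala versions; download it (e.g. from
# spark-packages.org) into the directory that --jars points at, such as
# /usr/local/spark_jars/spark-cassandra-connector-2.0.0-M2-s_2.11.jar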

The following example launches a Jupyter notebook and brings the connector in:
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" PYTHONIOENCODING="utf8" MASTER=spark://xxx.xxx.xxx.xxx:7077 pyspark --total-executor-cores 2 --executor-memory 2g --jars /usr/local/spark_jars/sqljdbc42.jar,/usr/local/spark_jars/spark-cassandra-connector-2.0.0-M2-s_2.11.jar
A demo class:
# encoding=UTF-8

from cassandra.cluster import Cluster
from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import BatchStatement
import pandas as pd
from pyspark.sql.types import StructType

class CassandraType(object):
    PRODUCTION = 0
    TEST = 1


class CassandraDAO(object):
    
    # you have to install the following first:
    # a. python-cassandra-driver
    # b. pyspark-cassandra-connector

    def __init__(self, env_type):  # renamed from `type` to avoid shadowing the built-in
#         print('running CassandraDAO.__init__')
        if env_type == CassandraType.PRODUCTION:
            self.contact_points=['xx.xx.xx.xx','xx.xx.xx.xx']
            self.contact_points_str = "xx.xx.xx.xx,xx.xx.xx.xx"
        else:
            self.contact_points=['xx.xx.xx.xx','xx.xx.xx.xx']
            self.contact_points_str = "xx.xx.xx.xx,xx.xx.xx.xx"         

        self.formatString = "org.apache.spark.sql.cassandra"
        self.username = "username"
        self.password = "password"
        self.cluster = None
        self.session = None
        self.createSession()
        
    def __del__(self):
        self.cluster.shutdown()      
        
    def pandas_factory(self, colnames, rows):
        return pd.DataFrame(rows, columns=colnames)  
    
    def createSession(self):
        print "contact_points = " + self.contact_points_str
        self.cluster = Cluster(
            contact_points=self.contact_points,  # the driver picks a node from this list
#             load_balancing_policy = DCAwareRoundRobinPolicy(local_dc='datacenter1'), 
    #         auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
        )
        self.session = self.cluster.connect()
        self.session.row_factory = self.pandas_factory
        self.session.default_fetch_size = 10000000 #needed for large queries, otherwise driver will do pagination. Default is 50000.    

    def getSession(self):
        return self.session
    
    def execCQL(self, keyspace, cql):
        """
        execute CQL
        """  
        self.session.set_keyspace(keyspace)
        self.session.execute_async(cql) 
        
    def execCQLSelect(self, keyspace, cql):
        """
        execute CQL, select only
        """  
          
        self.session.set_keyspace(keyspace)
        
        # execute_async returns a cassandra ResponseFuture;
        # call .result() on it to block and obtain the rows
        async_results = self.session.execute_async(cql)
        return async_results
    
    def execCQLCallBackAsync(self, keyspace, cql, handle_success, handle_error):
        """
        execute CQL, if success => handle_success function, else handle_error
        """  
        self.session.set_keyspace(keyspace)
        async_results = self.session.execute_async(cql)
        async_results.add_callbacks(handle_success, handle_error)
    
    def execCQLSelectToPandasDF(self, keyspace, cql):
        """
        execute CQL, select only, return Pandas DataFrame
        """  
          
        self.session.set_keyspace(keyspace)
        
        # execute_async returns a cassandra ResponseFuture
        async_results = self.session.execute_async(cql)

        # pandas_factory is installed as row_factory, so the ResultSet's
        # _current_rows is already a pandas DataFrame
        return async_results.result()._current_rows
        
    def execCQLSelectToDF(self, sqlContext, keyspace, cql): 
        """
        execute CQL, select only, return Spark DataFrame
        """           

        # convert the pandas DataFrame to a Spark DataFrame
        pandas_dataframe = self.execCQLSelectToPandasDF(keyspace, cql)
        if pandas_dataframe.empty:
            schema = StructType([])
            return sqlContext.createDataFrame([],schema)
        else:    
            return sqlContext.createDataFrame(pandas_dataframe)
    
    def execCQLSelectToRDD(self, sqlContext, keyspace, cql):
        """
        execute CQL, select only, return Spark RDD
        """  
          
        return self.execCQLSelectToDF(sqlContext, keyspace, cql).rdd.map(tuple)  # DataFrame to RDD
    
    @property
    def contactPoints(self):
        return self.contact_points     
    
    @contactPoints.setter
    def contactPoints(self, contact_points):
        self.contact_points = contact_points
    
    @contactPoints.deleter
    def contactPoints(self): 
        del self.contact_points                 
    
    # pyspark cassandra connector methods
    def readFromCassandraDF(self, sqlContext, keyspace, table):
        """
        read data from Cassandra, return Dataframe
        """   
              
        return sqlContext.read\
                        .format(self.formatString)\
                        .options(table=table, keyspace=keyspace)\
                        .option("spark.cassandra.connection.host",self.contact_points_str)\
                        .load()
                        
    def readFromCassandraRDD(self, sqlContext, keyspace, table): 
        """
        read data from Cassandra, return RDD
        """  
          
        df =  sqlContext.read\
                        .format(self.formatString)\
                        .options(table=table, keyspace=keyspace)\
                        .option("spark.cassandra.connection.host",self.contact_points_str)\
                        .load()  
        return df.rdd.map(tuple)  # DataFrame to RDD
    
    def saveToCassandraDF(self, dataFrame, keyspace, table, mode="error"):
        """
        Save data to Cassandra using DataFrame, select one mode to save
        
        SaveMode.ErrorIfExists (default) | "error"      When saving a DataFrame to a data source, 
                                                        if data already exists, an exception is expected to be thrown.
        SaveMode.Append                  | "append"     When saving a DataFrame to a data source, 
                                                        if data/table already exists, contents of the DataFrame are expected to be appended to existing data.
        SaveMode.Overwrite               | "overwrite"  Overwrite mode means that when saving a DataFrame to a data source, 
                                                        if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.
        SaveMode.Ignore                  | "ignore"     Ignore mode means that when saving a DataFrame to a data source, 
                                                        if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a CREATE TABLE IF NOT EXISTS in SQL.
        """
        
        dataFrame.write\
                .format(self.formatString)\
                .mode(mode)\
                .options(table=table, keyspace=keyspace)\
                .option("spark.cassandra.connection.host",self.contact_points_str)\
                .save()  
               
##     use : DAO(10)
#     def __call__(self, radius):
#         if radius <= 0:
#             print('must be a positive number')
#         else:
#             self.radius = radius

        
Using the class:
CASSANDRA_ENV = CassandraType.TEST
c_dao = CassandraDAO(CASSANDRA_ENV)

## Execute a CQL statement (fire-and-forget)
c_dao.execCQL('mykeyspace', 'select * from table;')

## Read from the DB into a pandas DataFrame
cql = 'select * from table;'
pandas_df = c_dao.execCQLSelectToPandasDF('mykeyspace', cql)

## Write a pyspark DataFrame to the DB
c_dao.saveToCassandraDF(spark_df, 'mykeyspace', 'table_name', 'overwrite')
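
The callback variant works the same way; here is a minimal sketch (the two handler functions are hypothetical placeholders):

## Run a query and react through callbacks
def handle_success(rows):
    # rows arrives as a pandas DataFrame because of pandas_factory
    print("query returned %d rows" % len(rows))

def handle_error(exc):
    print("query failed: %s" % exc)

c_dao.execCQLCallBackAsync('mykeyspace', 'select * from table;', handle_success, handle_error)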
If a pandas DataFrame has to be inserted as many individual rows, performance becomes a problem; a prepared statement helps:
from datetime import datetime

import pandas as pd
from cassandra.cluster import Cluster

cluster = Cluster(
    contact_points='192.168.0.90,192.168.0.91,192.168.0.92'.split(',')
)
session = cluster.connect('helper_dev_keyspace')
session.default_fetch_size = 10000000

# prepare the statement once so the server only parses it a single time
query = "INSERT INTO intent_algorithm2 (algorithm_id, algorithm_name) VALUES (?, ?)"
prepared = session.prepare(query)

alist = [str(x) for x in range(10000)]
test_pd = pd.DataFrame({'algorithm_id': alist,
                        'algorithm_name': alist})

api_start = datetime.now()
futures = [session.execute_async(prepared,
                                 (row.algorithm_id, row.algorithm_name))
           for index, row in test_pd.iterrows()]
# execute_async returns immediately; wait on the futures so the timing is real
for future in futures:
    future.result()
api_end = datetime.now()
api_duration = str(api_end - api_start)
api_duration
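
The class above imports BatchStatement but never uses it. As a further sketch (reusing the session, prepared statement, and test_pd from the snippet above), inserts can also be grouped into unlogged batches; note this only pays off when the rows in a batch share a partition key:

from cassandra.query import BatchStatement, BatchType

BATCH_SIZE = 100  # hypothetical chunk size, tune for your row size

batch = BatchStatement(batch_type=BatchType.UNLOGGED)
pending = 0
for index, row in test_pd.iterrows():
    batch.add(prepared, (row.algorithm_id, row.algorithm_name))
    pending += 1
    if pending == BATCH_SIZE:
        session.execute(batch)
        batch = BatchStatement(batch_type=BatchType.UNLOGGED)
        pending = 0
if pending:  # flush the remainder
    session.execute(batch)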

Deploying Spark with Cassandra
Cassandra places data on nodes according to token ranges. This naturally distributed data makes a good data source for Spark, since each node owns only part of the data; I recommend installing a Spark worker on every Cassandra node in the data center.
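
As a small sketch of what that buys you (assuming the Jupyter session launched above, so sqlContext exists, plus the CassandraDAO class from earlier): the connector splits a table scan by token range and prefers to schedule each split on a node owning those tokens, which you can glimpse from the partition count.

# read a table through the connector and inspect how the scan was split
dao = CassandraDAO(CassandraType.TEST)
df = dao.readFromCassandraDF(sqlContext, 'mykeyspace', 'table_name')
print(df.rdd.getNumPartitions())  # roughly one partition per token-range split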

Notes
  • When deploying, it is recommended to build a dedicated data center for analytics jobs, or otherwise separate data collection from analysis, so that the analytics workload does not hammer the nodes serving other traffic
  • Cassandra is particularly good at collecting large volumes of sensor data, but analyzing it with CQL alone is painful, so pairing it with Spark for data processing is the more appropriate approach. Spark-Cassandra should not be treated as a replacement for Hadoop, though; it is not suitable as a data warehouse
