本範例使用 Python 的 hdfs 套件來串聯 HDFS,可對 HDFS 做一些檔案的檢查,之後再使用 Spark 將訓練完的 model 寫入 HDFS。
首先執行 pip install hdfs
documentation : https://pypi.python.org/pypi/hdfs/
使用hdfs套件,來串聯hdfs與python程式
以下爲範例
from hdfs import InsecureClient
class HdfsClient(object):
    """Thin convenience wrapper over the ``hdfs`` package's InsecureClient.

    hdfs 2.1.0
    $ pip install hdfs
    documentation : https://pypi.python.org/pypi/hdfs/

    Paths passed to the methods are prefixed with ``filePath`` before being
    handed to the underlying WebHDFS client.
    """

    # Prefix prepended to every path argument; callers may override it
    # per-instance (see the usage example further down).
    filePath = ""

    def __init__(self, _host='0.0.0.0', _port='50070', _user='cipuser'):
        # Build the WebHDFS endpoint URL, e.g. http://0.0.0.0:50070/
        endpoint = 'http://%s:%s/' % (_host, _port)
        self.client = InsecureClient(endpoint, user=_user)

    #region Exploring the file system
    def selectFile(self, path):
        """Return a list of entries under ``filePath + path`` on HDFS."""
        target = self.filePath + path
        return self.client.list(target)

    def getSummaryFile(self, path):
        """Return the content summary of a file or folder on HDFS."""
        target = self.filePath + path
        return self.client.content(target)

    def getStatusFile(self, path):
        """Return the status of a file or folder on HDFS."""
        target = self.filePath + path
        return self.client.status(target)

    def moveFile(self, path, dir):
        """Rename/move ``filePath + path`` to ``filePath + dir`` on HDFS."""
        source = self.filePath + path
        destination = self.filePath + dir
        self.client.rename(source, destination)

    def deleteFile(self, path, recursive=True):
        """Delete ``filePath + path`` on HDFS (recursively by default)."""
        target = self.filePath + path
        self.client.delete(target, recursive)
    #endregion
就可以這樣來使用
## model save to HDFS
print '------ model save to HDFS ------'
hdfsCli = HdfsClient(_host=HDFS_IP, _user=HDFS_USER)
hdfsCli.filePath = HDFS_MODEL_PATH
print hdfsCli.filePath
now = datetime.now().strftime(DATE_FORMAT_Y_M_D_H_M_S)
if hdfsCli.getSummaryFile("")['fileCount'] > 0:
if hdfsCli.getSummaryFile("recommend_model")['fileCount'] > 0:
hdfsCli.moveFile("recommend_model" , "recommend_model_" + now )
print 'hdfs_path = '+ HDFS_HOST + HDFS_ROOT_PATH + HDFS_MODEL_PATH + MODEL_NAME
spark_model.write().overwrite().save(HDFS_HOST + HDFS_ROOT_PATH + HDFS_MODEL_PATH + MODEL_NAME);

沒有留言:
張貼留言