本範例使用python hdfs套件來串聯HDFS, 可對HDFS做一些檔案的檢查, 之後再使用spark將訓練完的model寫入HDFS
首先執行 pip install hdfs
documentation : https://pypi.python.org/pypi/hdfs/
使用hdfs套件,來串聯hdfs與python程式
以下爲範例
from hdfs import InsecureClient


class HdfsClient(object):
    """Thin convenience wrapper around the WebHDFS ``InsecureClient``.

    hdfs 2.1.0
    $ pip install hdfs
    documentation : https://pypi.python.org/pypi/hdfs/
    """

    # Base path prepended to every relative path handed to the methods below.
    # Callers set this per instance (e.g. ``cli.filePath = HDFS_MODEL_PATH``).
    filePath = ""

    def __init__(self, _host='0.0.0.0', _port='50070', _user='cipuser'):
        # WebHDFS is an HTTP API; 50070 is the usual NameNode web UI port.
        url = 'http://' + _host + ':' + _port + '/'
        self.client = InsecureClient(url, user=_user)

    # -- Exploring the file system -------------------------------------

    def selectFile(self, path):
        """List a directory and return the entries as a list.

        :param path: path in hdfs file
        """
        return self.client.list(self.filePath + path)

    def getSummaryFile(self, path):
        """Retrieve a file or folder content summary.

        :param path: path in hdfs file
        """
        return self.client.content(self.filePath + path)

    def getStatusFile(self, path):
        """Retrieve a file or folder status.

        :param path: path in hdfs file
        """
        return self.client.status(self.filePath + path)

    def moveFile(self, path, dir):
        """Rename/move a file or directory inside HDFS.

        NOTE(review): ``dir`` shadows the builtin of the same name; kept
        unchanged so keyword callers are not broken.

        :param path: path from hdfs file
        :param dir: path to hdfs file
        """
        self.client.rename(self.filePath + path, self.filePath + dir)

    def deleteFile(self, path, recursive=True):
        """Delete a file; with ``recursive=True`` also non-empty directories.

        :param path: path at hdfs
        """
        self.client.delete(self.filePath + path, recursive)

    # -- end: Exploring the file system ---------------------------------
就可以這樣來使用
## model save to HDFS print '------ model save to HDFS ------' hdfsCli = HdfsClient(_host=HDFS_IP, _user=HDFS_USER) hdfsCli.filePath = HDFS_MODEL_PATH print hdfsCli.filePath now = datetime.now().strftime(DATE_FORMAT_Y_M_D_H_M_S) if hdfsCli.getSummaryFile("")['fileCount'] > 0: if hdfsCli.getSummaryFile("recommend_model")['fileCount'] > 0: hdfsCli.moveFile("recommend_model" , "recommend_model_" + now ) print 'hdfs_path = '+ HDFS_HOST + HDFS_ROOT_PATH + HDFS_MODEL_PATH + MODEL_NAME spark_model.write().overwrite().save(HDFS_HOST + HDFS_ROOT_PATH + HDFS_MODEL_PATH + MODEL_NAME);
沒有留言:
張貼留言