本篇介紹Kafka基本觀念與指令
kafka特色:
- 資料可以保存下來, 消費端可以慢慢處理
- 擴張性
- 各單元解耦
解決的問題 :
使用場景 :
- 日誌收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一介面服務的方式開放給各種consumer,例如Hadoop、Hbase、Solr等
- 訊息系統:解耦和生產者和消費者、快取訊息等
- 使用者活動跟蹤:Kafka經常被用來記錄web使用者或者app使用者的各種活動,如瀏覽網頁、搜尋、點選等活動,這些活動資訊被各個伺服器釋出到kafka的topic中,然後訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到Hadoop、資料倉庫中做離線分析和挖掘,頁面檢視、搜尋、使用者行為分析等,這些實際上是Kafka在LinkedIn設計的原始初衷。使用者點選網站活動內容,每個活動型別均有一個Topic,可以實時的反饋,以便深入瞭解使用者參與度、下載量、頁面流量等。
- 運營指標:Kafka也經常用來記錄運營監控資料。包括收集各種分散式應用的資料,生產各種操作的集中反饋,比如報警和報告
- 流式處理:比如spark streaming和storm
- 旅遊行業:例如,在一個旅遊網站,酒店和航班的價格是一直在變化的,系統的一些元件(價格告警、分析等)需要了解這些變化。你在Kafka的Topic上釋出更改,並且需要通知的每個元件都充當一個消費者。每個消費者應用所組成的節點形成一個消費者組。給消費者組所消費的Topic的傳送訊息動態記錄,這樣每個消費者均可獲取訊息記錄,同時每個消費者內能夠有效的劃分工作內容。
- GPS:例如,能夠實時獲取智慧手機裝置的位置資料,並且希望能夠實時處理這些資料來顯示車輛路徑、行駛距離等。傳入資料到Kafka的Topic中,並使用Stream API來進行處理。當需要在特定時間段內提取和處理給定使用者的所有位置資料時,使用視窗進行狀態處理會有不錯的效果。
腦海中要先有一個架構圖才好理解, ref列在最下方
架構圖
資料送出與保存
生產者會取得有多少分區等資訊, 送到自己的暫存分區再送過去, 省略zookeeper增加效率
資料取得
透過zookeeper協調, 透過group可以提高消費效率, 一個partition對應一個consumer效率最好, 如果增加consumer, 再次跟zookeeper取得多少分區重新分配, 一個consumer可以消費多個分區, 但多個consumer不可以消費一個分區, 會拉到重複的
元件介紹
- Producer : 生產資料
- Consumer : 接收資料, 兩種模式1.一對一: 向topic拉取拉完就刪除, 2.發布/訂閱 : 一對多, 數據生產後, 主動推送給訂閱者, 資料保留一段時間後刪掉, 如果重複資料可以設定不要重複拉, 可以照自己消費能力控制速度, 非kafka推送過來
- Consumer group : 同時接收資料, 增加consumer 和 partition有助於加速接收, 通常與partition數量相同
- Broker : 一台server可以看作一個broker
- Topic : 主題, 可針對主題訂閱接收特定資料, 非常大的topic可以放到多台broker
- Partition : message 分別存在多個partition, 增加吞吐量, 單一partition有先進先出的順序, 保證數據排序正確, 多個則不保證, 有順序資料可放同一個partition
- Replication : 將partition資料備份, 可設定數量, 沒備份的話broker壞了就全壞了, 主要使用資料節點為leader, 備份資料為follower, follower會輪詢去leader取資料來備份
- Offset : 紀錄存在partition中的資料被讀取到哪的地方
- Zookeeper : consumer會透過zookeeper協調到正確的地方取資料, producer 不會用到, metadata都存在這裡
- 生產者應答機制ack
- ack=0不管是不是送到kafka, producer送下一條數據
- ack=1 leader有答應(default), 才送下一條數據
- ack=-1 等待所有leader follower答應才送下一條但最慢
基本指令
topic
# 列出topic
bin/kafka-topics.sh --zookeeper 192.168.0.1:2181 --list
# 建立topic
bin/kafka-topics.sh --zookeeper 192.168.0.1:2181 --create --topic my-first-topic \
--partitions 1 --replication-factor 2
# 查詢topic資訊 2181是zookeeper port
bin/kafka-topics.sh --zookeeper 192.168.0.1:2181 --describe --topic my-first-topic
(ISR為正在同步的副本)
# 刪除topic
bin/kafka-topics.sh --zookeeper 192.168.0.1:2181 --delete --topic my-first-topic
生產訊息, 9092是kafka port
bin/kafka-console-producer.sh --broker-list 192.168.0.1:9092 --topic my-first-topic
接收訊息
bin/kafka-console-consumer.sh --zookeeper 192.168.0.1:2181 --from-beginning --topic my-first-topic
# 讀取consumer.properties, 可設定group.id在config裡面
bin/kafka-console-consumer.sh --zookeeper 192.168.0.1:2181 --from-beginning --topic my-first-topic\
--consumer.config config/consumer.properties
# 確認資料存放offset
bin/kafka-consumer-offset-checker.sh --zookeeper 192.168.0.1:2181 --group console-consumer 12345
配置 server.properties
#設定broker.id
broker.id=0
#允許將topic刪除, 而非只上flag
delete.topic.enable=True
#log存放位置
log.dirs=/xxx/xxx/xxx/
#zookeeper連結ip
zookeeper.connect=192.168.0.1,192.168.0.2,192.168.0.3
#同步另外幾台
xsync kafka
#port
預設9092
#訊息最大量
message.max.bytes
#預設partition數量
num.partition = 2
#資料儲存策略(無論是否被消費) 超過就刪除
#時間
log.retention.hours=168
#大小
log.retention.bytes=173....(1G)
#備份
default.refication.factor >= 2
配置
producer.properties
# Ack方式
request.required.acks = 1
配置
consumer.properties
# consumer group
多台設定同一個group.id
#zookeeper連結
zookeeper.connect
理解原理後, 搭配自己專案的程式client 就可以開始發展各種應用囉~補充:
攔截器interceptor
producer interceptor可在發送訊息時加入一些邏輯, 客製化訊息內容, 可實作下列方法, EX把時間也記錄 or 計算成功失敗次數
- configer(獲取配置資訊)
- onSend(訊息要送出時觸發)
- onAckownledgement(消息被應答或失敗時觸發)
- close(關閉interceptor)
與Flume比較
- Flume採集數據, Kafka有保存功能
- 數據可能會丟失, in memory的方式, Kafka有副本較安全
- Flume適合Hadoop生態圈
- Kafka適合下游消費者眾多的狀況, 增加consumer很容易
- Flume >> Kafka >> Hbase 可以照自己需求去串聯
相關API
與Spark串接
Ref:
沒有留言:
張貼留言