2020年4月25日 星期六

[Kakfa] 基本觀念

本篇介紹Kafka基本觀念與指令

kafka特色:
  • 資料可以保存下來, 消費端可以慢慢處理
  • 擴張性
  • 各單元解耦
解決的問題 :


使用場景 :

  • 日誌收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一介面服務的方式開放給各種consumer,例如Hadoop、Hbase、Solr等
  • 訊息系統:解耦和生產者和消費者、快取訊息等
  • 使用者活動跟蹤:Kafka經常被用來記錄web使用者或者app使用者的各種活動,如瀏覽網頁、搜尋、點選等活動,這些活動資訊被各個伺服器釋出到kafka的topic中,然後訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到Hadoop、資料倉庫中做離線分析和挖掘頁面檢視、搜尋、使用者行為分析等,這些實際上是Kafka在LinkedIn設計的原始初衷。使用者點選網站活動內容,每個活動型別均有一個Topic,可以實時的反饋,以便深入瞭解使用者參與度、下載量、頁面流量等。
  • 運營指標:Kafka也經常用來記錄運營監控資料。包括收集各種分散式應用的資料,生產各種操作的集中反饋,比如報警和報告
  • 流式處理:比如spark streaming和storm
  • 旅遊行業:例如,在一個旅遊網站,酒店和航班的價格是一直在變化的,系統的一些元件(價格告警、分析等)需要了解這些變化。你在Kafka的Topic上釋出更改,並且需要通知的每個元件都充當一個消費者。每個消費者應用所組成的節點形成一個消費者組。給消費者組所消費的Topic的傳送訊息動態記錄,這樣每個消費者均可獲取訊息記錄,同時每個消費者內能夠有效的劃分工作內容。
  • GPS:例如,能夠實時獲取智慧手機裝置的位置資料,並且希望能夠實時處理這些資料來顯示車輛路徑、行駛距離等。傳入資料到Kafka的Topic中,並使用Stream API來進行處理。當需要在特定時間段內提取和處理給定使用者的所有位置資料時,使用視窗進行狀態處理會有不錯的效果。

腦海中要先有一個架構圖才好理解, ref列在最下方

架構圖

資料送出與保存
生產者會取得有多少分區等資訊, 送到自己的暫存分區再送過去, 省略zookeeper增加效率

資料取得
透過zookeeper協調, 透過group可以提高消費效率, 一個partition對應一個consumer效率最好, 如果增加consumer, 再次跟zookeeper取得多少分區重新分配, 一個consumer可以消費多個分區, 但多個consumer不可以消費一個分區, 會拉到重複的


元件介紹
  • Producer : 生產資料
  • Consumer : 接收資料, 兩種模式1.一對一: 向topic拉取拉完就刪除, 2.發布/訂閱 : 一對多, 數據生產後, 主動推送給訂閱者, 資料保留一段時間後刪掉, 如果重複資料可以設定不要重複拉, 可以照自己消費能力控制速度, 非kafka推送過來
  • Consumer group : 同時接收資料, 增加consumer 和 partition有助於加速接收, 通常與partition數量相同
  • Broker : 一台server可以看作一個broker
  • Topic : 主題, 可針對主題訂閱接收特定資料, 非常大的topic可以放到多台broker
  • Partition : message 分別存在多個partition, 增加吞吐量, 單一partition有先進先出的順序, 保證數據排序正確, 多個則不保證, 有順序資料可放同一個partition
  • Replication : 將partition資料備份, 可設定數量, 沒備份的話broker壞了就全壞了, 主要使用資料節點為leader, 備份資料為follower, follower會輪詢去leader取資料來備份
  • Offset : 紀錄存在partition中的資料被讀取到哪的地方
  • Zookeeper : consumer會透過zookeeper協調到正確的地方取資料, producer 不會用到, metadata都存在這裡


  • 生產者應答機制ack
  1. ack=0不管是不是送到kafka,  producer送下一條數據 
  2. ack=1 leader有答應(default), 才送下一條數據 
  3. ack=-1 等待所有leader follower答應才送下一條但最慢


基本指令

topic

# 列出topic
bin/kafka-topics.sh --zookeeper 192.168.0.1:2181 --list

# 建立topic
bin/kafka-topics.sh --zookeeper 192.168.0.1:2181 --create --topic my-first-topic \
--partitions 1 --replication-factor 2

# 查詢topic資訊 2181是zookeeper port
bin/kafka-topics.sh --zookeeper 192.168.0.1:2181 --describe --topic my-first-topic
(ISR為正在同步的副本)

# 刪除topic
bin/kafka-topics.sh --zookeeper 192.168.0.1:2181 --delete --topic my-first-topic
生產訊息, 9092是kafka port
 
bin/kafka-console-producer.sh --broker-list 192.168.0.1:9092 --topic my-first-topic
接收訊息
 
bin/kafka-console-consumer.sh --zookeeper 192.168.0.1:2181 --from-beginning --topic my-first-topic

# 讀取consumer.properties, 可設定group.id在config裡面
bin/kafka-console-consumer.sh --zookeeper 192.168.0.1:2181 --from-beginning --topic my-first-topic\
--consumer.config config/consumer.properties

# 確認資料存放offset
bin/kafka-consumer-offset-checker.sh --zookeeper 192.168.0.1:2181 --group console-consumer 12345


配置 server.properties
 
#設定broker.id
broker.id=0

#允許將topic刪除, 而非只上flag
delete.topic.enable=True

#log存放位置
log.dirs=/xxx/xxx/xxx/

#zookeeper連結ip
zookeeper.connect=192.168.0.1,192.168.0.2,192.168.0.3

#同步另外幾台
xsync kafka
#port
預設9092

#訊息最大量
message.max.bytes

#預設partition數量
num.partition = 2

#資料儲存策略(無論是否被消費) 超過就刪除
#時間
log.retention.hours=168
#大小
log.retention.bytes=173....(1G)

#備份
default.refication.factor >= 2

配置 producer.properties

# Ack方式
request.required.acks = 1

配置 consumer.properties

# consumer group
多台設定同一個group.id

#zookeeper連結
zookeeper.connect


理解原理後, 搭配自己專案的程式client 就可以開始發展各種應用囉~

補充:
攔截器interceptor
producer interceptor可在發送訊息時加入一些邏輯, 客製化訊息內容, 可實作下列方法, EX把時間也記錄 or 計算成功失敗次數
  • configer(獲取配置資訊)
  • onSend(訊息要送出時觸發)
  • onAckownledgement(消息被應答或失敗時觸發)
  • close(關閉interceptor)

與Flume比較

  • Flume採集數據, Kafka有保存功能
  • 數據可能會丟失, in memory的方式, Kafka有副本較安全
  • Flume適合Hadoop生態圈
  • Kafka適合下游消費者眾多的狀況, 增加consumer很容易
  • Flume >> Kafka >> Hbase 可以照自己需求去串聯
相關API

與Spark串接


Ref: 

04 Kafka 介绍 & 安装
Kafka應用實踐與生態整合     

Kafka架構、Kafka核心元件、Kafka工作原理、Kafka應用


沒有留言:

張貼留言