RabbitMQ(一)
RabbitMQ是流行的消息隊列服務軟件,此篇內容包含基本觀念, 持久化, 發佈確認, 消息應答, 程式基本寫法等
RabbitMQ導入
- 流量消峰 : 當有大量呼叫server服務時, 可能導致服務crash, 導入排隊機制後, 有效控制呼叫量, 但缺點就是必須排隊
- 應用解耦 : 將不同功能程式切開, 不至於一個壞全部壞, 在包裝成docker服務時, 也可以將安裝套件給切開, 舉例來說, 通常機器學習因為演算法不同而套件也不相同, 降低docker包裝的大小與複雜度也是一項重要任務
- 異步處理 : 當服務必須執行一段時間, 可將服務任務交給queue排隊處理, 當執行完成後, 再通知呼叫方, 舉例來說, 通常訓練模型任務必須一段時間的等待, 交給queue等候處理是一個好的應用
- 與Queue比較 : Kafka適合大數據, 有log蒐集的需求; RocketMQ適用於金融領域; RabbitMQ適用於中小型公司, 介面清楚, 數據量沒這麼大的場景
- 以下*是建議要設定的功能
架構
- Exchange 可以對應多個queue
- connection開銷較大, 建立一次即可, 之後傳輸使用channel
- 多個消費者(工作線程), 一個工作只會被執行一次, 輪流接收工作來處理
- *輪流分發工作, 可以修改成不公平分發, 在較強悍的機器上分發更多的工作, 強的機器不會長期處於待命狀態, 在consumer上設定, 0是輪流分發, 1是不公平分發, 處理較快的consumer會收到較多工作
- 通道預取設置(Channel Prefetch Setting (QoS)): 只能在消息手動確認模式中啟作用。為了避免消費者端一次同時處理過多的消息,可以通過basic.qos設置最大的預取值。該值定義了通道上允許的最大未確認消息,一旦未確認消息的數量達到配置值,RabbitMQ將停止在通道上傳送更多消息,直到至少有一個未被確認的消息被確認。RabbitMQ官方推薦Qos預取值設置在100到300範圍內的值通常提供最佳的吞吐量,並且不會有使消費者奔潰的問題
- prefetch_count = 2以上, 該channel最多收到2個待處理, 處理完就會塞入
消息應答
- 分為自動應答與手動應答, 自動應答於consumer接收到就應答, 任務尚未處理完, 不建議使用
- *手動應答, 以下是手動應答方法, basicNack多一個批量應答, 減少網路堵塞, 但不建議開啟(mutiple=false), 除非允許部分消息丟失
- 重新入列requeue=False : 當發生通道關閉, 連結關閉, 未收到ack, 則重新排隊到另一個consumer處理, 會一直試其它的監聽程式, 直到成功
- consume設定autoAck=false, 並且撰寫callback function
- Delivery Tags : 當消費者或者訂閱者被註冊時,消息將由RabbitMQ的basic.deliver方法傳遞(推送),該方法會攜帶一個channel唯一標識的deliberyTag,確認交付的客戶端庫方法將交付標籤作為參數。由於DiliveryTag的範圍為Channel,所以必須在同一channel確定,在不同的信道確認會導致unknown delivery tag錯誤
MQ持久化
*對列持久化
- RabbitMQ如果當機, queue中的資料會消失
- queue_declare(queue=queue_id, durable=True), 但已經創建過的queue必須砍掉重來, 介面中可以刪除對列, 重新publish
*消息持久化
- 單個發佈確認 : 發一個就要確認一次, 速度慢
- 批次發佈確認 : 速度較快, 但有個消息壞掉不知道是哪個
- 異步批次發佈確認 : 效果最好,不斷發送訊息, 透過callback確認是否成功, 有問題或沒問題的會異步確認後再通知
基本範例程式
import pika # 使用者名稱和密碼 credentials = pika.PlainCredentials('qpm', 'cljslrl0620') # 連線到rabbitMQ伺服器,建立socket通道(virtual_host: 使用者繫結的vhost) connection = pika.BlockingConnection(pika.ConnectionParameters( "123.56.162.92", credentials=credentials, virtual_host="qpm", )) # 在socket通道之上建立了rabbit協議的通道 channel = connection.channel() channel.queue_declare(queue='hello', durable=True) channel.basic_publish(exchange="", routing_key="hello", body="hello world", properties=pika.BasicProperties( delivery_mode=2, # 訊息的持久化 ) ) print("send message!") connection.close()
# 消費者程式碼 import pika import time # 使用者名稱和密碼 credentials = pika.PlainCredentials('qpm', 'cljslrl0620') # 連線到rabbitMQ伺服器,建立socket通道(virtual_host: 使用者繫結的vhost) connection = pika.BlockingConnection(pika.ConnectionParameters( "123.56.162.92", credentials=credentials, virtual_host="qpm", )) # 在socket通道之上建立了rabbit協議的通道 channel = connection.channel() # 消費者再次宣告queue channel.queue_declare(queue='hello', durable=True) # 回撥函式 def callback(ch, method, properties, body): print(body) print(method.delivery_tag) # time.sleep(10) # delivery_tag 是在 channel 中的一個訊息計數, 每次訊息提取行為都對應一個數字 ch.basic_ack(delivery_tag=method.delivery_tag) print("任務執行完成!") """ 消費者在接收到訊息之後,還可以拒絕訊息,我們只需要呼叫basic_reject就可以,如下: ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False) """ # 如果不加的話,消費者會hang住,監聽到有值就呼叫callback,沒值就等待監聽訊息 # channel.close() # 從hello的queue中拿資料,然後呼叫回撥函式 channel.basic_consume(queue="hello", on_message_callback=callback, auto_ack=False )
print("waiting for messages.....") channel.start_consuming()# 不公平分發, 誰處理快就都給誰處理 channel.basic_qos(prefetch_count=1)
官方範例程式
裡面包含更多參數設定, 會在下一篇提到
Ref:
沒有留言:
張貼留言