2021年7月26日 星期一

RabbitMQ(一)

  


RabbitMQ(一)

RabbitMQ是流行的消息隊列服務軟件,此篇內容包含基本觀念, 持久化, 發佈確認, 消息應答, 程式基本寫法等
RabbitMQ導入

  • 流量消峰 : 當有大量呼叫server服務時, 可能導致服務crash, 導入排隊機制後, 有效控制呼叫量, 但缺點就是必須排隊
  • 應用解耦 : 將不同功能程式切開, 不至於一個壞全部壞, 在包裝成docker服務時, 也可以將安裝套件給切開, 舉例來說,  通常機器學習因為演算法不同而套件也不相同, 降低docker包裝的大小與複雜度也是一項重要任務
  • 異步處理 : 當服務必須執行一段時間, 可將服務任務交給queue排隊處理, 當執行完成後, 再通知呼叫方, 舉例來說, 通常訓練模型任務必須一段時間的等待, 交給queue等候處理是一個好的應用
  • 與Queue比較 : Kafka適合大數據, 有log蒐集的需求; RocketMQ適用於金融領域; RabbitMQ適用於中小型公司, 介面清楚, 數據量沒這麼大的場景
  • 以下*是建議要設定的功能

架構

  • Exchange 可以對應多個queue
  • connection開銷較大, 建立一次即可, 之後傳輸使用channel

簡單工作Queue
連線範例程式(java), 不同語言client端觀念都差不多

python 可以使用pika

生成對列的相關參數說明

發出消息相關參數說明
    
已發送至消息對列

   接收消息範例程式

工作消息Queue

  • 多個消費者(工作線程),  一個工作只會被執行一次, 輪流接收工作來處理
  • *輪流分發工作, 可以修改成不公平分發, 在較強悍的機器上分發更多的工作, 強的機器不會長期處於待命狀態, 在consumer上設定, 0是輪流分發, 1是不公平分發, 處理較快的consumer會收到較多工作
  • 通道預取設置(Channel Prefetch Setting (QoS)): 只能在消息手動確認模式中啟作用。為了避免消費者端一次同時處理過多的消息,可以通過basic.qos設置最大的預取值。該值定義了通道上允許的最大未確認消息,一旦未確認消息的數量達到配置值,RabbitMQ將停止在通道上傳送更多消息,直到至少有一個未被確認的消息被確認。RabbitMQ官方推薦Qos預取值設置在100到300範圍內的值通常提供最佳的吞吐量,並且不會有使消費者奔潰的問題
  • prefetch_count = 2以上, 該channel最多收到2個待處理, 處理完就會塞入



消息應答
  • 分為自動應答與手動應答, 自動應答於consumer接收到就應答, 任務尚未處理完, 不建議使用
  • *手動應答, 以下是手動應答方法, basicNack多一個批量應答, 減少網路堵塞, 但不建議開啟(mutiple=false), 除非允許部分消息丟失 



  • 重新入列requeue=False : 當發生通道關閉, 連結關閉, 未收到ack, 則重新排隊到另一個consumer處理, 會一直試其它的監聽程式, 直到成功
  • consume設定autoAck=false, 並且撰寫callback function
  • Delivery Tags : 當消費者或者訂閱者被註冊時,消息將由RabbitMQ的basic.deliver方法傳遞(推送),該方法會攜帶一個channel唯一標識的deliberyTag,確認交付的客戶端庫方法將交付標籤作為參數。由於DiliveryTag的範圍為Channel,所以必須在同一channel確定,在不同的信道確認會導致unknown delivery tag錯誤


MQ持久化
*對列持久化
  • RabbitMQ如果當機, queue中的資料會消失
  • queue_declare(queue=queue_id, durable=True), 但已經創建過的queue必須砍掉重來, 介面中可以刪除對列, 重新publish

*消息持久化
  • 生產者設定將消息持久化, 寫入disk中, 如果不寫是存於memory當中

  • 但也有可能寫到disk中壞掉, 必須在加入發佈確認機制
  • 發佈確認 : 保證消息不丟失, 分為以下三種
  1. 單個發佈確認 : 發一個就要確認一次, 速度慢
  2. 批次發佈確認 : 速度較快, 但有個消息壞掉不知道是哪個
  3. 異步批次發佈確認 : 效果最好,不斷發送訊息, 透過callback確認是否成功, 有問題或沒問題的會異步確認後再通知
基本範例程式

import pika

# 使用者名稱和密碼
credentials = pika.PlainCredentials('qpm', 'cljslrl0620')

# 連線到rabbitMQ伺服器,建立socket通道(virtual_host: 使用者繫結的vhost)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    "123.56.162.92", credentials=credentials, virtual_host="qpm",
))

# 在socket通道之上建立了rabbit協議的通道
channel = connection.channel()


channel.queue_declare(queue='hello', durable=True)

channel.basic_publish(exchange="",
                      routing_key="hello",
                      body="hello world",
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 訊息的持久化
                      )
                      )

print("send message!")

connection.close()
# 消費者程式碼

import pika
import time

# 使用者名稱和密碼
credentials = pika.PlainCredentials('qpm', 'cljslrl0620')

# 連線到rabbitMQ伺服器,建立socket通道(virtual_host: 使用者繫結的vhost)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    "123.56.162.92", credentials=credentials, virtual_host="qpm",
))

# 在socket通道之上建立了rabbit協議的通道
channel = connection.channel()

# 消費者再次宣告queue
channel.queue_declare(queue='hello', durable=True)


# 回撥函式
def callback(ch, method, properties, body):
    print(body)
    print(method.delivery_tag)
    # time.sleep(10)
    #  delivery_tag 是在 channel 中的一個訊息計數, 每次訊息提取行為都對應一個數字
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print("任務執行完成!")
    """
    消費者在接收到訊息之後,還可以拒絕訊息,我們只需要呼叫basic_reject就可以,如下:
    ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
    """
    # 如果不加的話,消費者會hang住,監聽到有值就呼叫callback,沒值就等待監聽訊息
    # channel.close()


# 從hello的queue中拿資料,然後呼叫回撥函式
channel.basic_consume(queue="hello",
                      on_message_callback=callback,
                      auto_ack=False
                      )
# 不公平分發, 誰處理快就都給誰處理
channel.basic_qos(prefetch_count=1)
print("waiting for messages.....") channel.start_consuming()

官方範例程式
裡面包含更多參數設定, 會在下一篇提到



Ref:

沒有留言:

張貼留言