RabbitMQ(二)
RabbitMQ是流行的消息隊列服務軟件, 此篇內容包含Exchange交換機, 死信對列, 延遲對列
Exchange交換機
- 目的 : 生產者發消息, 讓多個consumer收到
- 一個對列下, consumer彼此是競爭關係, 只有其中一台consumer收的到消息, 所以必須透過交換機傳送給兩個對列, 各自consumer再去消費, 此模式稱為發佈訂閱模式
- binding : 交換機與對列之間的綁定, 根據routingKey送到對列, 再送到consumer
Exchange在定義的時候是有型別的,以決定到底哪些queue符號條件,可以接收訊息
- fanout:所有bind到此exchange的queue都可以接收訊息——廣播
- direct:通過routingKey和exchange決定那一個組的queue可以接收訊息——組播
- topic:所有符合routingKey(此時可以是一個表示式)的routingKey所bind的queue可以接收訊息——根據特徵自定
以下是廣播的範例程式, 其他兩種請參考這裡
# 生產者程式碼 # 在socket通道之上建立了rabbit協議的通道 channel = connection.channel() # 宣告一個exchange channel.exchange_declare(exchange='logs', exchange_type='fanout', durable=True) # 傳送配置為exchange channel.basic_publish(exchange="logs", routing_key="", body="exchange message", properties=pika.BasicProperties( delivery_mode=2, # 訊息的持久化 ) )
# 消費者程式碼
# 在socket通道之上建立了rabbit協議的通道
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout', durable=True)
# 消費者再次宣告queue
result = channel.queue_declare('', exclusive=True) # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print("waiting for logs......")
def callback(ch, method, properties, body):
print(body)
channel.basic_consume(queue=queue_name,
on_message_callback=callback,
auto_ack=True)
channel.start_consuming()
死信對列
有幾種狀況會到死信對列, 再由死信對列進行處理
- 消息被拒絕, 並且不放回普通對列
- 消息TTL到期
- 對列達最大長度
- 程式碼範例-RabbitMqClient(pika)
延遲對列
與死信對列觀念類似, 在生產者中設定過期時間, 向延遲對列發消息, 時間過後進行處理相關事件, 以下是應用場景
- 火車票訂單購買後產生後送入對列, 在10分鐘之內未支付則自動取消, 10分鐘後就會進行延遲對列的處理, 檢查DB中是否付錢, 若沒付錢把訂單取消, 所佔的票也退回去
- 新建商城, 10天沒上架新產品, 通知使用者
- 註冊成功後, 3天都沒登入新系統
- 預定會議後, 提前將消息丟入對列, 10分鐘後通知會議人員
關於延遲對列的使用場景, 用於發送的數量較多時, 例如促銷活動期間短期內湧入大量訂單, 並且要進行通知, 這種狀況很適合使用, 若每日每小時需要通知對象很少, 使用job進行檢查並發送是一個較簡單的方式, 程式碼範例-RabbitMqClient(pika), 下圖是一個範例, QA QB 是預設延遲時間的對列, 也可以改成QC延遲時間由生產者設定, 就不用產生太多的延遲對列
def declare_delay_queue(self, queue, durable=True, DLX='RetryExchange', TTL=""):
"""
创建延迟队列
:param TTL: ttl的单位是us,ttl=60000 表示 60s
:param queue:
:param DLX:死信转发的exchange
:return:
"""
arguments = {}
if DLX:
# 设置死信转发的exchange,延迟结束后指向的交换机(死信收容交换机)
arguments['x-dead-letter-exchange'] = DLX
if TTL:
# 消息的存活时间,消息过期后会被指向(死信收容交换机)收入死信队列
arguments['x-message-ttl'] = TTL
print(arguments)
self.channel.queue_declare(queue=queue, # 声明队列
durable=durable, # 持久化
優先對列
先建立一個優先對列, 發消息時設定重要程度, 重要程度高的會優先被消費, 一般優先等級設置0~10, 不用太大, 會影響排序效能
官方範例程式
裡面包含更多參數設定, 會在下一篇提到
Ref:
沒有留言:
張貼留言