2021年7月27日 星期二

RabbitMQ(二)

 

RabbitMQ(二)

RabbitMQ是流行的消息隊列服務軟件, 此篇內容包含Exchange交換機, 死信對列, 延遲對列

Exchange交換機

  • 目的 : 生產者發消息, 讓多個consumer收到
  • 一個對列下,  consumer彼此是競爭關係, 只有其中一台consumer收的到消息, 所以必須透過交換機傳送給兩個對列, 各自consumer再去消費, 此模式稱為發佈訂閱模式
  • binding : 交換機與對列之間的綁定, 根據routingKey送到對列, 再送到consumer

Exchange在定義的時候是有型別的,以決定到底哪些queue符號條件,可以接收訊息
  • fanout:所有bind到此exchange的queue都可以接收訊息——廣播
  • direct:通過routingKey和exchange決定那一個組的queue可以接收訊息——組播
  • topic:所有符合routingKey(此時可以是一個表示式)的routingKey所bind的queue可以接收訊息——根據特徵自定


以下是廣播的範例程式, 其他兩種請參考這裡
# 生產者程式碼
# 在socket通道之上建立了rabbit協議的通道
channel = connection.channel()

# 宣告一個exchange
channel.exchange_declare(exchange='logs', exchange_type='fanout', durable=True)

# 傳送配置為exchange
channel.basic_publish(exchange="logs",
                      routing_key="",
                      body="exchange message",
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 訊息的持久化
                      )
                      )
# 消費者程式碼

# 在socket通道之上建立了rabbit協議的通道
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout', durable=True)

# 消費者再次宣告queue
result = channel.queue_declare('', exclusive=True)  # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除

queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print("waiting for logs......")


def callback(ch, method, properties, body):
    print(body)


channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)

channel.start_consuming()

死信對列
有幾種狀況會到死信對列, 再由死信對列進行處理
  • 消息被拒絕, 並且不放回普通對列
  • 消息TTL到期
  • 對列達最大長度
  • 程式碼範例-RabbitMqClient(pika)

延遲對列
與死信對列觀念類似, 在生產者中設定過期時間, 向延遲對列發消息,  時間過後進行處理相關事件, 以下是應用場景
  1. 火車票訂單購買後產生後送入對列, 在10分鐘之內未支付則自動取消, 10分鐘後就會進行延遲對列的處理, 檢查DB中是否付錢, 若沒付錢把訂單取消, 所佔的票也退回去
  2. 新建商城, 10天沒上架新產品, 通知使用者
  3. 註冊成功後, 3天都沒登入新系統
  4. 預定會議後, 提前將消息丟入對列, 10分鐘後通知會議人員
關於延遲對列的使用場景, 用於發送的數量較多時, 例如促銷活動期間短期內湧入大量訂單, 並且要進行通知, 這種狀況很適合使用, 若每日每小時需要通知對象很少, 使用job進行檢查並發送是一個較簡單的方式, 程式碼範例-RabbitMqClient(pika), 下圖是一個範例,  QA QB 是預設延遲時間的對列, 也可以改成QC延遲時間由生產者設定, 就不用產生太多的延遲對列


def declare_delay_queue(self, queue, durable=True, DLX='RetryExchange', TTL=""): """ 创建延迟队列 :param TTL: ttl的单位是us,ttl=60000 表示 60s :param queue: :param DLX:死信转发的exchange :return: """ arguments = {} if DLX: # 设置死信转发的exchange,延迟结束后指向的交换机(死信收容交换机) arguments['x-dead-letter-exchange'] = DLX if TTL: # 消息的存活时间,消息过期后会被指向(死信收容交换机)收入死信队列 arguments['x-message-ttl'] = TTL print(arguments) self.channel.queue_declare(queue=queue, # 声明队列 durable=durable, # 持久化

優先對列
先建立一個優先對列, 發消息時設定重要程度, 重要程度高的會優先被消費, 一般優先等級設置0~10, 不用太大, 會影響排序效能




沒有留言:

張貼留言