RabbitMQ 是一個功能強大的開源消息隊列中間件,廣泛應用于各種分布式系統(tǒng)中。它支持多種消息模式,每種模式都有其獨特的特點和適用場景。下面將詳細介紹 RabbitMQ 常用的消息模式及其適用場景。
簡單隊列模式
簡單隊列模式是 RabbitMQ 中最基礎的消息模式。在這種模式下,生產者將消息發(fā)送到一個隊列,消費者從該隊列中接收消息。這種模式只有一個生產者、一個隊列和一個消費者。
簡單隊列模式的工作流程如下:生產者創(chuàng)建一個連接到 RabbitMQ 服務器,然后創(chuàng)建一個通道,聲明一個隊列,將消息發(fā)送到該隊列。消費者同樣創(chuàng)建連接和通道,聲明相同的隊列,從隊列中接收消息。
以下是簡單隊列模式的 Python 代碼示例:
# 生產者代碼
import pika
# 連接到 RabbitMQ 服務器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 聲明隊列
channel.queue_declare(queue='hello')
# 發(fā)送消息
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
# 關閉連接
connection.close()
# 消費者代碼
import pika
# 連接到 RabbitMQ 服務器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 聲明隊列
channel.queue_declare(queue='hello')
# 定義回調函數
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 消費消息
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()適用場景:簡單隊列模式適用于一些簡單的任務處理場景,例如日志收集、簡單的任務分發(fā)等。在這些場景中,不需要復雜的消息路由和處理,只需要將消息從生產者傳遞到消費者即可。
工作隊列模式
工作隊列模式也稱為任務隊列模式,它允許多個消費者從同一個隊列中消費消息。這種模式可以實現任務的負載均衡,提高系統(tǒng)的處理能力。
工作隊列模式的工作流程如下:生產者將消息發(fā)送到一個隊列,多個消費者從該隊列中競爭消費消息。RabbitMQ 默認采用輪詢的方式將消息分發(fā)給消費者。
以下是工作隊列模式的 Python 代碼示例:
# 生產者代碼
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 使消息持久化
))
print(" [x] Sent %r" % message)
connection.close()
# 消費者代碼
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 模擬耗時任務
import time
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()適用場景:工作隊列模式適用于需要處理大量任務的場景,例如批量數據處理、文件處理等。通過多個消費者并行處理任務,可以提高系統(tǒng)的處理效率。
發(fā)布/訂閱模式
發(fā)布/訂閱模式允許生產者將消息發(fā)布到一個交換機,多個消費者可以從該交換機訂閱消息。這種模式實現了消息的一對多廣播。
發(fā)布/訂閱模式的工作流程如下:生產者創(chuàng)建一個連接和通道,聲明一個交換機,將消息發(fā)送到該交換機。消費者同樣創(chuàng)建連接和通道,聲明一個隊列,將隊列綁定到交換機,從隊列中接收消息。
以下是發(fā)布/訂閱模式的 Python 代碼示例:
# 生產者代碼
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 聲明交換機
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = 'info: Hello World!'
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
# 消費者代碼
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 聲明交換機
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 聲明一個臨時隊列
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 將隊列綁定到交換機
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()適用場景:發(fā)布/訂閱模式適用于需要將消息廣播給多個消費者的場景,例如新聞發(fā)布、系統(tǒng)通知等。在這些場景中,多個模塊可能需要同時接收相同的消息。
路由模式
路由模式允許生產者根據消息的路由鍵將消息發(fā)送到不同的隊列。消費者可以根據自己的需求訂閱不同的隊列。
路由模式的工作流程如下:生產者創(chuàng)建一個連接和通道,聲明一個直連交換機,將消息發(fā)送到該交換機,并指定路由鍵。消費者同樣創(chuàng)建連接和通道,聲明一個隊列,將隊列綁定到交換機,并指定綁定鍵。只有當消息的路由鍵與綁定鍵匹配時,消費者才能接收到消息。
以下是路由模式的 Python 代碼示例:
# 生產者代碼
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 聲明直連交換機
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
# 消費者代碼
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 聲明直連交換機
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(
exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()適用場景:路由模式適用于需要根據消息的不同屬性進行分類處理的場景,例如日志分級處理、不同業(yè)務類型的消息處理等。
主題模式
主題模式是路由模式的擴展,它允許使用通配符來匹配路由鍵。路由鍵可以使用“*”(匹配一個單詞)和“#”(匹配零個或多個單詞)作為通配符。
主題模式的工作流程與路由模式類似,只是在綁定隊列和交換機時可以使用通配符來指定綁定鍵。
以下是主題模式的 Python 代碼示例:
# 生產者代碼
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 聲明主題交換機
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
# 消費者代碼
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 聲明主題交換機
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()適用場景:主題模式適用于需要根據消息的主題進行靈活匹配和處理的場景,例如新聞分類訂閱、商品分類推送等。
綜上所述,RabbitMQ 的不同消息模式各有特點,在實際應用中,需要根據具體的業(yè)務需求選擇合適的消息模式,以提高系統(tǒng)的性能和可維護性。