RabbitMQ
2 min read393 words

RabbitMQ

Technology
Technology

想要一个简单一点的MQ, 满足消峰的诉求.

版本:

rabbitmq:4-management-alpine

看了很多, 最终留下两个,它们在中小规模场景性能接近,部署简单、高可用配置方便

ActiveMQ 活跃度低

RabbitMQ 稍微大一点,但是活跃度高 ,同时使用了

alpine
版本,限制了内存大小. 满足业务使用.

docker run -d --name rabbitmq --memory=256m -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -v /myrabbitmq/data:/var/lib/rabbitmq rabbitmq:4-management-alpine

简单demo。实现收发

import pika import time import threading # 连接 RabbitMQ def get_connection(): credentials = pika.PlainCredentials('guest', 'guest') # 如果是远程服务器,需要修改 RabbitMQ 配置 parameters = pika.ConnectionParameters( host='localhost', credentials=credentials, port=5672, heartbeat=600, blocked_connection_timeout=300 ) return pika.BlockingConnection(parameters) # 发布消息 def publish_message(queue_name, message): connection = get_connection() channel = connection.channel() # 声明队列(确保队列存在) channel.queue_declare(queue=queue_name, durable=True) channel.basic_publish( exchange='', routing_key=queue_name, body=message.encode('utf-8'), properties=pika.BasicProperties( delivery_mode=2, # 消息持久化 ) ) print(f" [x] Sent '{message}'") connection.close() # 消费消息 def consume_messages(queue_name): connection = get_connection() channel = connection.channel() channel.queue_declare(queue=queue_name, durable=True) def callback(ch, method, properties, body): print(f" [x] Received '{body.decode('utf-8')}'") time.sleep(1) # 模拟处理时间 print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag) # 手动确认 # 公平分发(prefetch_count=1) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue=queue_name, on_message_callback=callback) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() if __name__ == '__main__': QUEUE_NAME = 'text_queue' # 启动消费者线程 consumer_thread = threading.Thread(target=consume_messages, args=(QUEUE_NAME,), daemon=True) consumer_thread.start() # 发布几条消息 for i in range(5): publish_message(QUEUE_NAME, f"Hello World! {i}") time.sleep(0.5) # 等待消费者处理完成 time.sleep(10)

使用异步连接实现,集成如fastAPI

aio_pika==9.5.6

class RabbitMQHandler: """ RabbitMQ 连接管理类 """ rabbitmq_url: str = "amqp://XXXX/" # 全局变量保存连接和 channel rabbit_connection: AbstractRobustConnection = None rabbit_channel: AbstractRobustChannel = None # 要监听的多个队列 queues: list[str] = [] # 保存消费者任务,方便关闭 consumer_tasks: list[Task] = [] def __init__(self, app, context): if context.rabbitmq.url: self.rabbitmq_url = context.rabbitmq.url self.queues = context.rabbitmq.queues @app.on_event("startup") async def startup_event(): # 建立连接 rabbit_connection = await aio_pika.connect_robust(self.rabbitmq_url) # 创建 channel rabbit_channel = await rabbit_connection.channel() # 保存到上下文中,方便其他地方使用 context.rabbitmq.rabbit_channel = rabbit_channel logger.info("RabbitMQ connected.") # 为每个队列创建一个消费者任务 for queue_name in self.queues: task = asyncio.create_task(self.consume_queue(rabbit_channel, queue_name, context)) self.consumer_tasks.append(task) @app.on_event("shutdown") async def shutdown_event(): # 取消所有消费者任务 for task in self.consumer_tasks: task.cancel() await asyncio.gather(*self.consumer_tasks, return_exceptions=True) # 关闭连接 if self.rabbit_channel: await self.rabbit_channel.close() if self.rabbit_connection: await self.rabbit_connection.close() logger.info("RabbitMQ connection closed.") async def consume_queue(self, channel: AbstractRobustChannel, queue_name: str, context: AppContext): """消费单个队列的消息""" queue = await channel.declare_queue(queue_name, durable=True) logger.info(f"[*] Waiting for messages in {queue_name}. To exit press CTRL+C") staticManager = context.staticManager async with queue.iterator() as queue_iter: async for message in queue_iter: async with message.process(): # 处理消息 mqtt_msg = message.body.decode("utf-8") logger.info(f"[{queue_name}] Received: {message.body.decode()}")