RabbitQM 处理和管理消息队列的中间人(broker)。可简单理解为邮局,你在程序中写好消息,指定好收件人,剩下的事件就是 RabbitMQ 的工作了,它会保证收件人正确收到邮件。 任何发送邮件的程序都是 而 消息生产者,消费者及 RabbitMQ 这个中间人三者不必同时存在于同一机器上,实际运用时也确实大部分不会部署在同一机器上,比如有专门的机器作为 RabbitMQ 实体,而应用程序会部署在其他的集群。应用程序可以是同时负责生产消息的,也同时是消费者。 来自官方文档中关于 RabbitMQ 消息列队的示意图 安装通过官网提供的地址下载相应平台的程序进行安装,Mac 可通过 Homebrew 进行安装: $ brew update && brew install rabbitmq
启动如果使用 Homebrew 安装,可通过 $ brew services start rabbitmq ==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq) 或直接运行 启动后,会有一个可视化的管理后台,可通过 http://localhost:15672/ 访问,用户名密码皆为 基于 Node.js 的 Hello World 示例通过 amqp.node 展示 RabbitMQ 在 Node.js 中应用的一个示例。 RabbmitMQ 支持多种协议进行通信,amqp.node 使用的是 AMQP 0-9-1 这一开源协议,后者专门为处理消息而设计。作为客户端消费消息,使用的是 amqp.node client 模块,但 RabbitMQ 本身是支持多种客户端的。 初始化一个 Node,js 项目然后通过以下命令安装 amqp.node 模块: $ mkdir rabbitmq-demo && yarn init -y
$ yarn add amqplib
发送消息创建 首先建立到 RabbitMQ 服务的连接, #!/usr/bin/env node var amqp = require(\'amqplib/callback_api\'); amqp.connect(\'amqp://localhost\', function(error0, connection) {}); 连接建立成功后,创建一个通道(channel),具体的发送将会在这个通道中进行。 amqp.connect(\'amqp://localhost\', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) {}); }); 发送消息前,需要先声明一个队列,然后将消息发送到该队列: amqp.connect(\'amqp://localhost\', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } var queue = \'hello\'; var msg = \'Hello world\'; channel.assertQueue(queue, { durable: false }); channel.sendToQueue(queue, Buffer.from(msg)); console.log(\" [x] Sent %s\", msg); }); }); 队列的创建是一个幂等操作,只该队列不存在的情况才会新建。 最后关闭连接并退出。 setTimeout(function() { connection.close(); process.exit(0); }, 500); 完整的 send.js #!/usr/bin/env node var amqp = require(\'amqplib/callback_api\'); amqp.connect(\'amqp://localhost\', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } var queue = \'hello\'; var msg = \'Hello World!\'; channel.assertQueue(queue, { durable: false }); channel.sendToQueue(queue, Buffer.from(msg)); console.log(\" [x] Sent %s\", msg); }); setTimeout(function() { connection.close(); process.exit(0); }, 500); }); 接收消息下面开始编写消费者,消费者做的事情是监听来自 RabbitMQ 的消息并处理。 创建 #!/usr/bin/env node var amqp = require(\'amqplib/callback_api\'); amqp.connect(\'amqp://localhost\', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } var queue = \'hello\'; channel.assertQueue(queue, { durable: false }); }); }); 这里的队列声明不会与发送者那边的冲突,因为上面提到过,队列只在不存在的情况下才会重新生成。这里再次声明可以保证监听前队列已经存在。并且实际场景下,消费者有可能是在发送者之前启动的。 然后添加监听的逻辑: console.log(\" [*] Waiting for messages in %s. To exit press CTRL+C\", queue); channel.consume(queue, function(msg) { console.log(\" [x] Received %s\", msg.content.toString()); }, { noAck: true }); 完整的 receive.js #!/usr/bin/env node var amqp = require(\'amqplib/callback_api\'); amqp.connect(\'amqp://localhost\', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } var queue = \'hello\'; channel.assertQueue(queue, { durable: false }); console.log(\" [*] Waiting for messages in %s. To exit press CTRL+C\", queue); channel.consume(queue, function(msg) { console.log(\" [x] Received %s\", msg.content.toString()); }, { noAck: true }); }); }); 运行分别在命令行启动上面两个程序,查看打印的信息。 $ node send.js [x] Sent Hello World! $ node receive.js [*] Waiting for messages in hello. To exit press CTRL+C [x] Received Hello World! 另外,可通过 $ /usr/local/sbin/rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
hello 0
如果发现 export PATH=/usr/local/sbin:$PATH 其中 fish shell 通过添加如下命令到 fish 的配置文件即可: set -gx PATH /usr/local/sbin $PATH 相关资源 |
RabbitMQ Node.js 示例
释放双眼,带上耳机,听听看~!