【RabbitMQ】认识RabbitMQ
前言
在工作中遇到一个需要基于 NodeJs 做一个业务工具的实现。在这个过程中遇到了需要运用 RabbitMQ 的地方,当时挺懵的,前端的技术文档、博客貌似都没有提过这个呀,在这个契机下我开始认识“RabbitMQ”这个新朋友哈哈。
什么是 RabbitMQ?
翻阅了很多资料,发现这个偏后端一点,在我理解下,RabbitMQ 相当于一个“中介”或者是一个“快递站”,在这里面,你可以寄快递也可以收快递,对应的角色也就是商家和消费者,消费者“下单”订阅了某样物品,商家可能立马就能“发货”,也有可能因为缺货导致隔了好几天货到了再进行发货。如果有一个以上批量货物的话,也只能一个接一个的发货,并且根据到货时间遵循着”先到货的先发货“,消费者在接到货物后可能会有反馈,比如五星好评、退货退款等,这时候消费者也是通过 RabbitMQ 来跟商家进行沟通、发送对应的信息。
Rabbit 里面包含了交换机、路由键等,交换机通过指定对应的路由键将”货物“或”反馈消息“分类并添加到对应的队列中,消费者也是会在订阅的时候进行路由键等的绑定,获取在对应队列中的”货物“或”反馈消息“。
RabbitMQ 的实践
首先可以利用 ES6 的语法糖做成一个 class 类,里面有对应的各种方法,例如:初始化、绑定链接监听、重连、发送、断开链接,断开链接是很重要的,因为一直连着会占用资源。
示例代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189const amqp = require("amqplib/callback_api");
const conf = require("./_config");
const Redis = require("./_redis");
const rabbitMqUrl = conf.get("rabbitmq.url");
const Queue = require("../util/queue");
const { sleep } = require("../util");
let _connection = "";
let reconnectingCount = 0;
class MessageQueue {
constructor() {
// 初始化连接状态
this._connection = null;
this.reconnectingCount = 0;
}
/**
* 初始化与RabbitMQ的连接
* @returns {Promise<void>} 返回连接初始化的Promise
*/
init() {
return new Promise((resolve, reject) => {
amqp.connect(rabbitMqUrl, (err, connection) => {
if (err) {
// 如果连接失败,执行重连逻辑
this.reconnecting(err, "error");
reject(err);
return;
}
// 连接成功,保存连接对象
this._connection = connection;
resolve();
});
});
}
/**
* 处理RabbitMQ连接丢失的重连逻辑
* @param {Object} err 错误对象
* @param {string} event 事件类型(如error、close等)
*/
reconnecting(err, event) {
this.reconnectingCount++;
console.error(
`Lost connection to RabbitMQ. reconnectingCount: ${this.reconnectingCount}. Reconnecting in 10 seconds...`
);
console.error("RabbitMQ connection closed:", event, err);
// 每10秒尝试重连一次
setTimeout(() => {
this.init().catch((err) => console.error("Reconnection failed:", err));
}, 10000);
}
/**
* 发送消息到指定的队列
* @param {string} exchange 交换机名称
* @param {string} queue 队列名称
* @param {Object} msg 发送的消息内容
*/
async send(exchange, queue, msg) {
// 先确保RabbitMQ连接已建立
if (!this._connection) {
console.error("No RabbitMQ connection available!");
return;
}
this._connection.createChannel((err1, channel) => {
if (err1) {
console.error("Failed to create channel:", err1);
throw err1;
}
console.log("========> MQ Send:", msg);
// 确保队列已创建且持久化
channel.assertQueue(queue, { durable: true });
// 将消息对象转为JSON格式并发送
const mqJson = JSON.stringify(msg);
channel.sendToQueue(queue, Buffer.from(mqJson));
// 发送成功后处理后续的Redis操作
this.handleRedisPostSend(msg).catch((error) => {
console.error("Error while handling post-send logic:", error);
});
});
}
/**
* 处理发送消息后的一些后续操作,如Redis检查
* @param {Object} msg 发送的消息
*/
async handleRedisPostSend(msg) {
try {
const redis = new Redis();
await redis.init();
const json = await redis.get("key");
const params = JSON.parse(json || "{}");
// 如果是最后一页,执行关闭操作
if (params.isLast) {
this.close(msg.pageScreenshot);
}
} catch (error) {
console.error("Error accessing Redis:", error);
throw error; // 继续抛出异常,通知调用者
}
}
/**
* 从队列中接收消息并处理
* @param {string} exchange 交换机名称
* @param {string} queue 队列名称
* @param {string} routingKey 路由键
* @param {function} cb 消息处理回调函数
*/
async receive(exchange, queue, routingKey, cb) {
// 如果连接未初始化,先进行初始化
if (!this._connection) {
try {
await this.init();
} catch (error) {
console.error("Failed to initialize RabbitMQ connection:", error);
return;
}
}
// 创建队列并绑定交换机和路由键
this._connection.createChannel((err1, channel) => {
if (err1) {
console.error("Failed to create channel:", err1);
throw err1;
}
// 确保交换机和队列存在
channel.assertExchange(exchange, "topic", { durable: true });
channel.assertQueue(queue, { durable: true });
// 绑定交换机、队列和路由键
channel.bindQueue(queue, exchange, routingKey);
// 设置预取,控制消息的消费速率
channel.prefetch(1, false);
console.log("==========> Waiting for messages in queue:", queue);
// 消费消息
channel.consume(
queue,
async (payload) => {
console.log("=====> Processing message");
try {
// 调用回调处理消息
await cb(payload);
// 确认消息处理完成
channel.ack(payload);
} catch (e) {
// 如果处理失败,拒绝该消息并不重新入队
await channel.reject(payload, false);
console.error("Error while processing message:", e);
}
},
{ noAck: false } // 禁用自动确认
);
});
}
/**
* 关闭连接并退出进程
*/
async close() {
console.log("Closing RabbitMQ connection after 30 seconds...");
await sleep(1000 * 30);
// 检查队列是否为空,如果为空则退出
const isEmpty = Queue.isEmpty();
if (isEmpty) {
this._connection.close();
console.log("RabbitMQ connection closed, exiting process...");
process.exit(0);
} else {
console.log("Queue is not empty, waiting for more tasks...");
}
}
}
module.exports = MessageQueue;在此业务中,我在消息队列收到客户端传过来的参数后进行一系列业务处理,再将处理好格式化的数据通过消息队列传递给后端进行保存,对于一直耕耘前端的我来说,这个知识点确实挺陌生,但是现在我也算跟“rabbitMQ”交上朋友了。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 不吃草的羊!
评论