前言

在工作中遇到一个需要基于 NodeJs 做一个业务工具的实现。在这个过程中遇到了需要运用 RabbitMQ 的地方,当时挺懵的,前端的技术文档、博客貌似都没有提过这个呀,在这个契机下我开始认识“RabbitMQ”这个新朋友哈哈。

什么是 RabbitMQ?

​ 翻阅了很多资料,发现这个偏后端一点,在我理解下,RabbitMQ 相当于一个“中介”或者是一个“快递站”,在这里面,你可以寄快递也可以收快递,对应的角色也就是商家和消费者,消费者“下单”订阅了某样物品,商家可能立马就能“发货”,也有可能因为缺货导致隔了好几天货到了再进行发货。如果有一个以上批量货物的话,也只能一个接一个的发货,并且根据到货时间遵循着”先到货的先发货“,消费者在接到货物后可能会有反馈,比如五星好评、退货退款等,这时候消费者也是通过 RabbitMQ 来跟商家进行沟通、发送对应的信息。

​ Rabbit 里面包含了交换机、路由键等,交换机通过指定对应的路由键将”货物“或”反馈消息“分类并添加到对应的队列中,消费者也是会在订阅的时候进行路由键等的绑定,获取在对应队列中的”货物“或”反馈消息“。

RabbitMQ 的实践

  • 首先可以利用 ES6 的语法糖做成一个 class 类,里面有对应的各种方法,例如:初始化、绑定链接监听、重连、发送、断开链接,断开链接是很重要的,因为一直连着会占用资源。

    示例代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    const amqp = require("amqplib/callback_api");
    const conf = require("./_config");
    const Redis = require("./_redis");
    const rabbitMqUrl = conf.get("rabbitmq.url");
    const Queue = require("../util/queue");
    const { sleep } = require("../util");

    let _connection = "";
    let reconnectingCount = 0;

    class MessageQueue {
    constructor() {
    // 初始化连接状态
    this._connection = null;
    this.reconnectingCount = 0;
    }

    /**
    * 初始化与RabbitMQ的连接
    * @returns {Promise<void>} 返回连接初始化的Promise
    */
    init() {
    return new Promise((resolve, reject) => {
    amqp.connect(rabbitMqUrl, (err, connection) => {
    if (err) {
    // 如果连接失败,执行重连逻辑
    this.reconnecting(err, "error");
    reject(err);
    return;
    }
    // 连接成功,保存连接对象
    this._connection = connection;
    resolve();
    });
    });
    }

    /**
    * 处理RabbitMQ连接丢失的重连逻辑
    * @param {Object} err 错误对象
    * @param {string} event 事件类型(如error、close等)
    */
    reconnecting(err, event) {
    this.reconnectingCount++;
    console.error(
    `Lost connection to RabbitMQ. reconnectingCount: ${this.reconnectingCount}. Reconnecting in 10 seconds...`
    );
    console.error("RabbitMQ connection closed:", event, err);

    // 每10秒尝试重连一次
    setTimeout(() => {
    this.init().catch((err) => console.error("Reconnection failed:", err));
    }, 10000);
    }

    /**
    * 发送消息到指定的队列
    * @param {string} exchange 交换机名称
    * @param {string} queue 队列名称
    * @param {Object} msg 发送的消息内容
    */
    async send(exchange, queue, msg) {
    // 先确保RabbitMQ连接已建立
    if (!this._connection) {
    console.error("No RabbitMQ connection available!");
    return;
    }

    this._connection.createChannel((err1, channel) => {
    if (err1) {
    console.error("Failed to create channel:", err1);
    throw err1;
    }

    console.log("========> MQ Send:", msg);

    // 确保队列已创建且持久化
    channel.assertQueue(queue, { durable: true });

    // 将消息对象转为JSON格式并发送
    const mqJson = JSON.stringify(msg);
    channel.sendToQueue(queue, Buffer.from(mqJson));

    // 发送成功后处理后续的Redis操作
    this.handleRedisPostSend(msg).catch((error) => {
    console.error("Error while handling post-send logic:", error);
    });
    });
    }

    /**
    * 处理发送消息后的一些后续操作,如Redis检查
    * @param {Object} msg 发送的消息
    */
    async handleRedisPostSend(msg) {
    try {
    const redis = new Redis();
    await redis.init();

    const json = await redis.get("key");
    const params = JSON.parse(json || "{}");

    // 如果是最后一页,执行关闭操作
    if (params.isLast) {
    this.close(msg.pageScreenshot);
    }
    } catch (error) {
    console.error("Error accessing Redis:", error);
    throw error; // 继续抛出异常,通知调用者
    }
    }

    /**
    * 从队列中接收消息并处理
    * @param {string} exchange 交换机名称
    * @param {string} queue 队列名称
    * @param {string} routingKey 路由键
    * @param {function} cb 消息处理回调函数
    */
    async receive(exchange, queue, routingKey, cb) {
    // 如果连接未初始化,先进行初始化
    if (!this._connection) {
    try {
    await this.init();
    } catch (error) {
    console.error("Failed to initialize RabbitMQ connection:", error);
    return;
    }
    }

    // 创建队列并绑定交换机和路由键
    this._connection.createChannel((err1, channel) => {
    if (err1) {
    console.error("Failed to create channel:", err1);
    throw err1;
    }

    // 确保交换机和队列存在
    channel.assertExchange(exchange, "topic", { durable: true });
    channel.assertQueue(queue, { durable: true });

    // 绑定交换机、队列和路由键
    channel.bindQueue(queue, exchange, routingKey);

    // 设置预取,控制消息的消费速率
    channel.prefetch(1, false);
    console.log("==========> Waiting for messages in queue:", queue);

    // 消费消息
    channel.consume(
    queue,
    async (payload) => {
    console.log("=====> Processing message");
    try {
    // 调用回调处理消息
    await cb(payload);
    // 确认消息处理完成
    channel.ack(payload);
    } catch (e) {
    // 如果处理失败,拒绝该消息并不重新入队
    await channel.reject(payload, false);
    console.error("Error while processing message:", e);
    }
    },
    { noAck: false } // 禁用自动确认
    );
    });
    }

    /**
    * 关闭连接并退出进程
    */
    async close() {
    console.log("Closing RabbitMQ connection after 30 seconds...");
    await sleep(1000 * 30);

    // 检查队列是否为空,如果为空则退出
    const isEmpty = Queue.isEmpty();
    if (isEmpty) {
    this._connection.close();
    console.log("RabbitMQ connection closed, exiting process...");
    process.exit(0);
    } else {
    console.log("Queue is not empty, waiting for more tasks...");
    }
    }
    }

    module.exports = MessageQueue;

    在此业务中,我在消息队列收到客户端传过来的参数后进行一系列业务处理,再将处理好格式化的数据通过消息队列传递给后端进行保存,对于一直耕耘前端的我来说,这个知识点确实挺陌生,但是现在我也算跟“rabbitMQ”交上朋友了。