您的当前位置:首页正文

RabblitMQ 消息队列组件与 libev事件驱动库

2024-12-01 来源:个人技术集锦

概述

RabbitMQ 是一个广泛使用的开源消息队列系统,它基于 AMQP(高级消息队列协议)。RabbitMQ 用于在分布式系统中传递消息,确保消息可靠传递并提供弹性。libev 是一个事件驱动的库,用于高效地处理异步事件,常用于网络编程或需要高并发处理的应用。将 RabbitMQ 与 libev 结合使用,可以创建高效、异步的消息处理系统

其提供了一个可靠的消息传递机制,使用“消息队列”模式。该模式中,消费生产者将消息放入队列中,然后消费者从队列中读取并处理消息。

  • 可靠性:RabbitMQ 支持消息确认、持久化等机制,确保消息不丢失
  • 异步性:消息可以异步发送与接收,提供高吞吐量和低延迟
  • 路由功能:支持多种路由机制,包括直接交换(direct)、扇出交换(fanout)和主题交换(topic)
  • 高可用性:通过集群和镜像队列保证消息的高可用性

libev

libev是一个高性能的事件循环哭,主要用于C++中处理异步IO操作,其底层基于操作系统的事件通知机制来处理网络请求或者其他事件,支持高并发处理

  • 高效的事件循环:适用于事件驱动的网络应用程序
  • 低内存占用libev 的设计注重性能和低开销
  • 灵活的事件模型:支持定时器、信号处理、文件描述符事件等多种事件类型

两者结合

RabbitMQ支持AMQP协议,允许进行异步消息的收发,与libev结合可以通过libev来管理RabbitMQ消息队列的事件(例如连接、读取或者写入消息)

常用类和接口

Channel类

Channel通过连接建立,用于执行所有的队列操作。每个Channel对象代表了与RabbitMQ服务器的一个虚拟连接,其可以独立进行消息的发送、接收、确认等操作。

常用接口

  • AMQP::Channel:连接的工作单位,通过它你可以定义交换机(exchange)、队列(queue)和绑定(binding)等;通过这个类可以实现发送和接收消息
  • AMQP::Connection:表示与 RabbitMQ 服务器的连接。每个连接对应一个与 RabbitMQ 服务器的 TCP 连接
  • AMQP::Queue:表示队列。你可以将消息发布到队列中,或从队列中消费消息
  • AMQP::Exchange:用于消息路由。通过交换机,你可以决定将消息发送到哪个队列
  • AMQP::Message:消息类,表示通过 RabbitMQ 发送的消息。它包括消息体、属性和标头等

libev

其是一个高性能事件循环库,通常用于处理 I/O 多路复用,支持定时器、信号和 I/O 事件,项目中主要用于构建网络应用程序,处理大量并发连接

常用接口

  • ev_loop:事件循环的核心对象。在 libev 中,所有的事件和回调都注册到 ev_loop 中,程序的执行依赖于它不断地检查和处理这些事件
  • ev_io:用于监控 I/O 事件(如可读/可写)。通常与套接字相关,用于处理来自网络的事件
  • ev_timer:用于定时事件,类似于定时器,用于在指定的时间间隔后触发回调
  • ev_signal:用于捕获信号,在接收到信号时,触发回调函数

基本使用

consumer

基本逻辑

  • 初始化事件循环:通过 libev 创建一个事件循环 loop,并使用 AMQP::LibEvHandler 将事件循环与 AMQP 客户端关联
    • 快速理解:loop类似于一个活动的组织者负责每个成员的IO,handler则是助手(任务:将所有信息和任务连接在一起)
  • 建立与 RabbitMQ 的连接:通过 AMQP::TcpConnection 建立与 RabbitMQ 服务器的 TCP 连接
    • 理解:类似于活动的接待者(Connection)为来者提供了直接交流通道,这个通道(Address)可以和别人发送消息
  • 声明交换机和队列:声明交换机(test-exchange)和队列(test-queue),并绑定它们
    • 理解:声明你的谈话方式和交流位置
  • 消费消息:订阅队列中的消息并注册回调函数 MessageCb 来处理消息
    • 理解:当被人给你传递消息后你是如何做出反应
  • 启动事件循环:调用 ev_run 启动 libev 事件循环,等待事件并调用回调函数处理异步消息

代码实现

#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>

int main()
{
    //1. 实例化底层网络通信框架的I/O事件监控句柄
    auto *loop = EV_DEFAULT;
    //2. 实例化libEvHandler句柄 --- 将AMQP框架与事件监控关联起来
    AMQP::LibEvHandler handler(loop);
    //2.5. 实例化连接对象
    AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");
    AMQP::TcpConnection connection(&handler, address);
    //3. 实例化信道对象
    AMQP::TcpChannel channel(&connection);
    //4. 声明交换机
    channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
        .onError([](const char *message) {
            std::cout << "声明交换机失败:" << message << std::endl;
            exit(0);
        })
        .onSuccess([](){
            std::cout << "test-exchange 交换机创建成功!" << std::endl;
        });
    //5. 声明队列
    channel.declareQueue("test-queue")
        .onError([](const char *message) {
            std::cout << "声明队列失败:" << message << std::endl;
            exit(0);
        })
        .onSuccess([](){
            std::cout << "test-queue 队列创建成功!" << std::endl;
        });
    //6. 针对交换机和队列进行绑定
    channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
        .onError([](const char *message) {
            std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl;
            exit(0);
        })
        .onSuccess([](){
            std::cout << "test-exchange - test-queue 绑定成功!" << std::endl;
        });
    //7. 向交换机发布消息
    for (int i = 0; i < 10; i++) {
        std::string msg = "Hello Bite-" + std::to_string(i);
        bool ret = channel.publish("test-exchange", "test-queue-key", msg);
        if (ret == false) {
            std::cout << "publish 失败!\n";
        }
    }
    //启动底层网络通信框架--开启I/O
    ev_run(loop, 0);
    return 0;
}

publish

逻辑梳理

  • 连接 RabbitMQ:通过 AMQP 协议连接到 RabbitMQ 服务器,设置通信所需的交换机和队列
  • 声明交换机和队列:声明一个交换机和队列,并将它们通过路由键绑定在一起
  • 发布消息:生产者向交换机发布消息,消息根据绑定的路由规则被转发到队列中
  • 事件循环:启动 libev 事件循环,处理网络 I/O 操作,确保消息的传输和处理

代码实现

#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>

int main()
{
    // 1. 创建底层事件循环,用于监控 I/O 事件
    auto *loop = EV_DEFAULT;

    // 2. 将 AMQP 框架与 libev 事件循环连接起来
    AMQP::LibEvHandler handler(loop);

    // 3. 定义 RabbitMQ 服务器地址,包含用户名、密码、IP 和端口
    AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");
    
    // 4. 创建与 RabbitMQ 服务器的 TCP 连接
    AMQP::TcpConnection connection(&handler, address);

    // 5. 创建 AMQP 信道,负责发送和接收消息
    AMQP::TcpChannel channel(&connection);

    // 6. 声明交换机(Exchange),指定为 direct 类型,定义消息路由规则
    channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
        .onError([](const char *message) { // 交换机声明失败的回调
            std::cout << "声明交换机失败:" << message << std::endl;
            exit(0);
        })
        .onSuccess([](){ // 交换机声明成功的回调
            std::cout << "test-exchange 交换机创建成功!" << std::endl;
        });

    // 7. 声明队列(Queue),消息将通过此队列传递
    channel.declareQueue("test-queue")
        .onError([](const char *message) { // 队列声明失败的回调
            std::cout << "声明队列失败:" << message << std::endl;
            exit(0);
        })
        .onSuccess([](){ // 队列声明成功的回调
            std::cout << "test-queue 队列创建成功!" << std::endl;
        });

    // 8. 将交换机与队列绑定,指定路由键(Routing Key)
    channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
        .onError([](const char *message) { // 绑定失败的回调
            std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl;
            exit(0);
        })
        .onSuccess([](){ // 绑定成功的回调
            std::cout << "test-exchange - test-queue 绑定成功!" << std::endl;
        });

    // 9. 向交换机发布 10 条消息,消息会路由到绑定的队列
    for (int i = 0; i < 10; i++) {
        std::string msg = "Hello Bite-" + std::to_string(i);
        bool ret = channel.publish("test-exchange", "test-queue-key", msg); // 发布消息
        if (!ret) {
            std::cout << "publish 失败!\n"; // 发布消息失败的回调
        }
    }

    // 10. 启动 libev 事件循环,处理所有 I/O 事件
    ev_run(loop, 0);

    return 0;
}

测试

项目二次封装

实现思路

项目需求

交换机与队列直接交换,实现一台主机将消息发布给另一个主机进行处理

封装接口

  • 提供声明指定交换机与队列,然后绑定其功能
  • 提供交换机发布消息功能
  • 提供订阅指定队列消息,同时设置回调函数进行消息消费处理

具体实现

#pragma once
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
#include <iostream>
#include <functional>
#include "logger.hpp"

namespace mag {

    class MQClient {
    public:
        // 定义消息回调函数类型:接收消息体和消息体大小
        using MessageCallback = std::function<void(const char*, size_t)>;
        // 定义智能指针类型,用于管理 MQClient 对象
        using ptr = std::shared_ptr<MQClient>;

        // 构造函数:初始化 AMQP 连接和事件循环
        MQClient(const std::string &user, 
                 const std::string &passwd, 
                 const std::string &host) {
            _loop = EV_DEFAULT;  // 使用默认事件循环
            _handler = std::make_unique<AMQP::LibEvHandler>(_loop);  // 初始化 LibEv 处理器

            // 构建 AMQP 连接地址,格式为 amqp://user:password@host:port/
            std::string url = "amqp://" + user + ":" + passwd + "@" + host + "/";
            AMQP::Address address(url);
            _connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), address);  // 创建连接
            _channel = std::make_unique<AMQP::TcpChannel>(_connection.get());  // 创建信道

            // 启动事件循环的线程
            _loop_thread = std::thread([this]() {
                ev_run(_loop, 0);  // 启动事件循环
            });
        }

        // 析构函数:关闭事件循环并释放资源
        ~MQClient() {
            ev_async_init(&_async_watcher, watcher_callback);  // 初始化异步事件
            ev_async_start(_loop, &_async_watcher);  // 启动异步事件
            ev_async_send(_loop, &_async_watcher);  // 触发异步事件,停止事件循环
            _loop_thread.join();  // 等待事件循环线程结束
            _loop = nullptr;  // 清理事件循环指针
        }

        // 声明 AMQP 组件:交换机、队列、绑定等
        void declareComponents(const std::string &exchange,
                               const std::string &queue,
                               const std::string &routing_key = "routing_key",
                               AMQP::ExchangeType exchange_type = AMQP::ExchangeType::direct) {
            // 声明交换机
            _channel->declareExchange(exchange, exchange_type)
                .onError([](const char *message) {
                    LOG_ERROR("声明交换机失败:{}", message);
                    exit(0);  // 出错时退出
                })
                .onSuccess([exchange]() {
                    LOG_ERROR("{} 交换机创建成功!", exchange);
                });

            // 声明队列
            _channel->declareQueue(queue)
                .onError([](const char *message) {
                    LOG_ERROR("声明队列失败:{}", message);
                    exit(0);  // 出错时退出
                })
                .onSuccess([queue]() {
                    LOG_ERROR("{} 队列创建成功!", queue);
                });

            // 绑定交换机和队列
            _channel->bindQueue(exchange, queue, routing_key)
                .onError([exchange, queue](const char *message) {
                    LOG_ERROR("{} - {} 绑定失败:{}", exchange, queue, message);
                    exit(0);  // 出错时退出
                })
                .onSuccess([exchange, queue, routing_key]() {
                    LOG_ERROR("{} - {} - {} 绑定成功!", exchange, queue, routing_key);
                });
        }

        // 发布消息到指定交换机
        bool publish(const std::string &exchange, 
                     const std::string &msg, 
                     const std::string &routing_key = "routing_key") {
            LOG_DEBUG("向交换机 {}-{} 发布消息!", exchange, routing_key);
            bool ret = _channel->publish(exchange, routing_key, msg);  // 发布消息
            if (ret == false) {
                LOG_ERROR("{} 发布消息失败:", exchange);
                return false;  // 如果发布失败,返回 false
            }
            return true;  // 发布成功,返回 true
        }

        // 消费队列中的消息,传入回调函数来处理接收到的消息
        void consume(const std::string &queue, const MessageCallback &cb) {
            LOG_DEBUG("开始订阅 {} 队列消息!", queue);
            _channel->consume(queue, "consume-tag")  // 返回值 DeferredConsumer
                .onReceived([this, cb](const AMQP::Message &message, 
                                       uint64_t deliveryTag, 
                                       bool redelivered) {
                    cb(message.body(), message.bodySize());  // 调用回调处理消息
                    _channel->ack(deliveryTag);  // 确认消息已处理
                })
                .onError([queue](const char *message) {
                    LOG_ERROR("订阅 {} 队列消息失败: {}", queue, message);
                    exit(0);  // 出错时退出
                });
        }

    private:
        // 异步事件回调函数,用于退出事件循环
        static void watcher_callback(struct ev_loop *loop, ev_async *watcher, int32_t revents) {
            ev_break(loop, EVBREAK_ALL);  // 退出事件循环
        }

    private:
        struct ev_async _async_watcher;  // 异步事件句柄
        struct ev_loop *_loop;  // 事件循环指针
        std::unique_ptr<AMQP::LibEvHandler> _handler;  // LibEv 事件处理器
        std::unique_ptr<AMQP::TcpConnection> _connection;  // AMQP 连接
        std::unique_ptr<AMQP::TcpChannel> _channel;  // AMQP 信道
        std::thread _loop_thread;  // 事件循环线程
    };

}

封装测试

测试

#include "rabbitmp.hpp"
#include "logger.hpp"
#include <gflags/gflags.h>

DEFINE_string(user, "root", "rabbitmq访问用户名");
DEFINE_string(pswd, "123456", "rabbitmq访问密码");
DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服务器地址信息 host:port");


DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");

void callback(const char *body , size_t sz)
{
    std::string msg;
    msg.assign(body,sz);
    std::cout<<msg<<std::endl;
}

int main(int argc , char *argv[])
{
    google::ParseCommandLineFlags(&argc,&argv,true);
    mag::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);

    mag::MQClient client(FLAGS_user, FLAGS_pswd, FLAGS_host);

    client.declareComponents("test-exchange","test-queue");
    client.consume("test-queue",callback);

    std::this_thread::sleep_for(std::chrono::seconds(60));
    return 0;
}
#include "rabbitmp.hpp"
#include "logger.hpp"
#include <gflags/gflags.h>

DEFINE_string(user, "root", "rabbitmq访问用户名");
DEFINE_string(pswd, "123456", "rabbitmq访问密码");
DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服务器地址信息 host:port");


DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");

int main(int argc , char *argv[])
{
    google::ParseCommandLineFlags(&argc,&argv,true);
    mag::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);

    mag::MQClient client(FLAGS_user, FLAGS_pswd, FLAGS_host);

    client.declareComponents("test-exchange" , "test-queue");

    for(int i =0;i<10;i++)
    {
        std::string msg = "Hello Bite-" + std::to_string(i);
        bool ret = client.publish("test-exchange",msg);
        if(ret == false){
            std::cout<<"pubblist faile!\n";
        }
    }

    std::this_thread::sleep_for(std::chrono::seconds(3));
}

显示全文