RabbitMQ 是一个广泛使用的开源消息队列系统,它基于 AMQP(高级消息队列协议)。RabbitMQ 用于在分布式系统中传递消息,确保消息可靠传递并提供弹性。libev
是一个事件驱动的库,用于高效地处理异步事件,常用于网络编程或需要高并发处理的应用。将 RabbitMQ 与 libev
结合使用,可以创建高效、异步的消息处理系统
其提供了一个可靠的消息传递机制,使用“消息队列”模式。该模式中,消费生产者将消息放入队列中,然后消费者从队列中读取并处理消息。
libev
libev是一个高性能的事件循环哭,主要用于C++中处理异步IO操作,其底层基于操作系统的事件通知机制来处理网络请求或者其他事件,支持高并发处理
libev
的设计注重性能和低开销两者结合
RabbitMQ支持AMQP协议,允许进行异步消息的收发,与libev结合可以通过libev来管理RabbitMQ消息队列的事件(例如连接、读取或者写入消息)
Channel通过连接建立,用于执行所有的队列操作。每个Channel对象代表了与RabbitMQ服务器的一个虚拟连接,其可以独立进行消息的发送、接收、确认等操作。
常用接口
其是一个高性能事件循环库,通常用于处理 I/O 多路复用,支持定时器、信号和 I/O 事件,项目中主要用于构建网络应用程序,处理大量并发连接
常用接口
ev_loop
中,程序的执行依赖于它不断地检查和处理这些事件基本逻辑
loop
,并使用 AMQP::LibEvHandler
将事件循环与 AMQP 客户端关联
AMQP::TcpConnection
建立与 RabbitMQ 服务器的 TCP 连接
test-exchange
)和队列(test-queue
),并绑定它们
MessageCb
来处理消息
ev_run
启动 libev 事件循环,等待事件并调用回调函数处理异步消息代码实现
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
int main()
{
//1. 实例化底层网络通信框架的I/O事件监控句柄
auto *loop = EV_DEFAULT;
//2. 实例化libEvHandler句柄 --- 将AMQP框架与事件监控关联起来
AMQP::LibEvHandler handler(loop);
//2.5. 实例化连接对象
AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");
AMQP::TcpConnection connection(&handler, address);
//3. 实例化信道对象
AMQP::TcpChannel channel(&connection);
//4. 声明交换机
channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
.onError([](const char *message) {
std::cout << "声明交换机失败:" << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-exchange 交换机创建成功!" << std::endl;
});
//5. 声明队列
channel.declareQueue("test-queue")
.onError([](const char *message) {
std::cout << "声明队列失败:" << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-queue 队列创建成功!" << std::endl;
});
//6. 针对交换机和队列进行绑定
channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
.onError([](const char *message) {
std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-exchange - test-queue 绑定成功!" << std::endl;
});
//7. 向交换机发布消息
for (int i = 0; i < 10; i++) {
std::string msg = "Hello Bite-" + std::to_string(i);
bool ret = channel.publish("test-exchange", "test-queue-key", msg);
if (ret == false) {
std::cout << "publish 失败!\n";
}
}
//启动底层网络通信框架--开启I/O
ev_run(loop, 0);
return 0;
}
逻辑梳理
代码实现
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
int main()
{
// 1. 创建底层事件循环,用于监控 I/O 事件
auto *loop = EV_DEFAULT;
// 2. 将 AMQP 框架与 libev 事件循环连接起来
AMQP::LibEvHandler handler(loop);
// 3. 定义 RabbitMQ 服务器地址,包含用户名、密码、IP 和端口
AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");
// 4. 创建与 RabbitMQ 服务器的 TCP 连接
AMQP::TcpConnection connection(&handler, address);
// 5. 创建 AMQP 信道,负责发送和接收消息
AMQP::TcpChannel channel(&connection);
// 6. 声明交换机(Exchange),指定为 direct 类型,定义消息路由规则
channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
.onError([](const char *message) { // 交换机声明失败的回调
std::cout << "声明交换机失败:" << message << std::endl;
exit(0);
})
.onSuccess([](){ // 交换机声明成功的回调
std::cout << "test-exchange 交换机创建成功!" << std::endl;
});
// 7. 声明队列(Queue),消息将通过此队列传递
channel.declareQueue("test-queue")
.onError([](const char *message) { // 队列声明失败的回调
std::cout << "声明队列失败:" << message << std::endl;
exit(0);
})
.onSuccess([](){ // 队列声明成功的回调
std::cout << "test-queue 队列创建成功!" << std::endl;
});
// 8. 将交换机与队列绑定,指定路由键(Routing Key)
channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
.onError([](const char *message) { // 绑定失败的回调
std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl;
exit(0);
})
.onSuccess([](){ // 绑定成功的回调
std::cout << "test-exchange - test-queue 绑定成功!" << std::endl;
});
// 9. 向交换机发布 10 条消息,消息会路由到绑定的队列
for (int i = 0; i < 10; i++) {
std::string msg = "Hello Bite-" + std::to_string(i);
bool ret = channel.publish("test-exchange", "test-queue-key", msg); // 发布消息
if (!ret) {
std::cout << "publish 失败!\n"; // 发布消息失败的回调
}
}
// 10. 启动 libev 事件循环,处理所有 I/O 事件
ev_run(loop, 0);
return 0;
}
测试
项目需求
交换机与队列直接交换,实现一台主机将消息发布给另一个主机进行处理
封装接口
#pragma once
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
#include <iostream>
#include <functional>
#include "logger.hpp"
namespace mag {
class MQClient {
public:
// 定义消息回调函数类型:接收消息体和消息体大小
using MessageCallback = std::function<void(const char*, size_t)>;
// 定义智能指针类型,用于管理 MQClient 对象
using ptr = std::shared_ptr<MQClient>;
// 构造函数:初始化 AMQP 连接和事件循环
MQClient(const std::string &user,
const std::string &passwd,
const std::string &host) {
_loop = EV_DEFAULT; // 使用默认事件循环
_handler = std::make_unique<AMQP::LibEvHandler>(_loop); // 初始化 LibEv 处理器
// 构建 AMQP 连接地址,格式为 amqp://user:password@host:port/
std::string url = "amqp://" + user + ":" + passwd + "@" + host + "/";
AMQP::Address address(url);
_connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), address); // 创建连接
_channel = std::make_unique<AMQP::TcpChannel>(_connection.get()); // 创建信道
// 启动事件循环的线程
_loop_thread = std::thread([this]() {
ev_run(_loop, 0); // 启动事件循环
});
}
// 析构函数:关闭事件循环并释放资源
~MQClient() {
ev_async_init(&_async_watcher, watcher_callback); // 初始化异步事件
ev_async_start(_loop, &_async_watcher); // 启动异步事件
ev_async_send(_loop, &_async_watcher); // 触发异步事件,停止事件循环
_loop_thread.join(); // 等待事件循环线程结束
_loop = nullptr; // 清理事件循环指针
}
// 声明 AMQP 组件:交换机、队列、绑定等
void declareComponents(const std::string &exchange,
const std::string &queue,
const std::string &routing_key = "routing_key",
AMQP::ExchangeType exchange_type = AMQP::ExchangeType::direct) {
// 声明交换机
_channel->declareExchange(exchange, exchange_type)
.onError([](const char *message) {
LOG_ERROR("声明交换机失败:{}", message);
exit(0); // 出错时退出
})
.onSuccess([exchange]() {
LOG_ERROR("{} 交换机创建成功!", exchange);
});
// 声明队列
_channel->declareQueue(queue)
.onError([](const char *message) {
LOG_ERROR("声明队列失败:{}", message);
exit(0); // 出错时退出
})
.onSuccess([queue]() {
LOG_ERROR("{} 队列创建成功!", queue);
});
// 绑定交换机和队列
_channel->bindQueue(exchange, queue, routing_key)
.onError([exchange, queue](const char *message) {
LOG_ERROR("{} - {} 绑定失败:{}", exchange, queue, message);
exit(0); // 出错时退出
})
.onSuccess([exchange, queue, routing_key]() {
LOG_ERROR("{} - {} - {} 绑定成功!", exchange, queue, routing_key);
});
}
// 发布消息到指定交换机
bool publish(const std::string &exchange,
const std::string &msg,
const std::string &routing_key = "routing_key") {
LOG_DEBUG("向交换机 {}-{} 发布消息!", exchange, routing_key);
bool ret = _channel->publish(exchange, routing_key, msg); // 发布消息
if (ret == false) {
LOG_ERROR("{} 发布消息失败:", exchange);
return false; // 如果发布失败,返回 false
}
return true; // 发布成功,返回 true
}
// 消费队列中的消息,传入回调函数来处理接收到的消息
void consume(const std::string &queue, const MessageCallback &cb) {
LOG_DEBUG("开始订阅 {} 队列消息!", queue);
_channel->consume(queue, "consume-tag") // 返回值 DeferredConsumer
.onReceived([this, cb](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered) {
cb(message.body(), message.bodySize()); // 调用回调处理消息
_channel->ack(deliveryTag); // 确认消息已处理
})
.onError([queue](const char *message) {
LOG_ERROR("订阅 {} 队列消息失败: {}", queue, message);
exit(0); // 出错时退出
});
}
private:
// 异步事件回调函数,用于退出事件循环
static void watcher_callback(struct ev_loop *loop, ev_async *watcher, int32_t revents) {
ev_break(loop, EVBREAK_ALL); // 退出事件循环
}
private:
struct ev_async _async_watcher; // 异步事件句柄
struct ev_loop *_loop; // 事件循环指针
std::unique_ptr<AMQP::LibEvHandler> _handler; // LibEv 事件处理器
std::unique_ptr<AMQP::TcpConnection> _connection; // AMQP 连接
std::unique_ptr<AMQP::TcpChannel> _channel; // AMQP 信道
std::thread _loop_thread; // 事件循环线程
};
}
测试
#include "rabbitmp.hpp"
#include "logger.hpp"
#include <gflags/gflags.h>
DEFINE_string(user, "root", "rabbitmq访问用户名");
DEFINE_string(pswd, "123456", "rabbitmq访问密码");
DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服务器地址信息 host:port");
DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");
void callback(const char *body , size_t sz)
{
std::string msg;
msg.assign(body,sz);
std::cout<<msg<<std::endl;
}
int main(int argc , char *argv[])
{
google::ParseCommandLineFlags(&argc,&argv,true);
mag::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);
mag::MQClient client(FLAGS_user, FLAGS_pswd, FLAGS_host);
client.declareComponents("test-exchange","test-queue");
client.consume("test-queue",callback);
std::this_thread::sleep_for(std::chrono::seconds(60));
return 0;
}
#include "rabbitmp.hpp"
#include "logger.hpp"
#include <gflags/gflags.h>
DEFINE_string(user, "root", "rabbitmq访问用户名");
DEFINE_string(pswd, "123456", "rabbitmq访问密码");
DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服务器地址信息 host:port");
DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");
int main(int argc , char *argv[])
{
google::ParseCommandLineFlags(&argc,&argv,true);
mag::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);
mag::MQClient client(FLAGS_user, FLAGS_pswd, FLAGS_host);
client.declareComponents("test-exchange" , "test-queue");
for(int i =0;i<10;i++)
{
std::string msg = "Hello Bite-" + std::to_string(i);
bool ret = client.publish("test-exchange",msg);
if(ret == false){
std::cout<<"pubblist faile!\n";
}
}
std::this_thread::sleep_for(std::chrono::seconds(3));
}