RocketMQ 是一款由阿里巴巴集团开发并开源给Apache软件基金会的分布式消息及流处理平台。以其高吞吐量、低延迟、高可用性等特点而广受欢迎。支持Java,C++, Python, Go, .NET等。
异步解耦:可以实现上游和下游业务系统的松耦合设计,使得服务部分节点异常不会影响到核心交易系统的正常运转。在电商、金融等分布式系统中,这种解耦设计尤为重要。
削峰填谷:在如秒杀、大促等大型活动中,系统会面临巨大的流量冲击。RocketMQ利用其高性能的消息处理能力,可以有效地应对这种流量冲击,保证系统的稳定运行。
顺序消息:支持顺序消息(分区有序),可以确保消息的先进先出。这在交易系统中的订单创建、支付、退款等流程中尤为重要,因为这些流程对消息的顺序有严格要求。
分布式事务消息:支持分布式事务消息,可以保证分布式事务的强一致性。这在涉及多个服务的分布式系统中非常有用,可以确保数据的一致性和完整性。
RocketMQ优点
高吞吐量和低延迟:能够处理大规模消息流,并提供低延迟的消息传递。这使得它非常适合处理高并发的应用场景,如电子商务和金融交易系统。
可靠性:具有高度可靠的消息传递机制。它支持消息持久化和复制,确保消息不会丢失,并能够在故障发生时进行自动恢复。
分布式扩展:支持水平扩展,可以方便地添加新的消息生产者和消费者来应对负载增加的情况。
易于部署:提供开箱即用的部署方式,非常适合在分布式系统中使用。
RocketMQ架构
#debian
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
使用以下命令从 Docker Hub 拉取最新的 RocketMQ 镜像:
docker pull apache/rocketmq:latest
docker run -d --name rmqnamesrv -p 9876:9876 apache/rocketmq:latest sh mqnamesrv
```bash
## 启动 RocketMQ Broker:
RocketMQ 的 Broker 负责存储消息并处理生产者和消费者的请求。使用以下命令启动 Broker:
```bash
docker run -d --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "BROKER_NAME=broker-a" -e "BROKER_ID=0" -e "AUTO_CREATE_TOPIC_ENABLE=true" -e "AUTO_CREATE_SUBSCRIPTION_GROUP=true" -p 10911:10911 -p 10909:10909 apache/rocketmq:latest sh mqbroker
验证
查看 NameServer 日志:
docker logs -f rmqnamesrv
查看 Broker 日志:
docker logs -f rmqbroker
如果需要可视化管理 RocketMQ,可以运行 RocketMQ Console:
docker pull styletang/rocketmq-console-ng
docker run -d -p 8080:8080 --link rmqnamesrv:namesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876" styletang/rocketmq-console-ng
总结
# 拉取 RocketMQ 镜像
docker pull apache/rocketmq:latest
# 启动 NameServer
docker run -d --name rmqnamesrv -p 9876:9876 apache/rocketmq:latest sh mqnamesrv
# 启动 Broker
docker run -d --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -p 10911:10911 -p 10909:10909 apache/rocketmq:latest sh mqbroker
# 启动 RocketMQ Console
docker pull styletang/rocketmq-console-ng
docker run -d -p 8080:8080 --link rmqnamesrv:namesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876" styletang/rocketmq-console-ng
如果是外网环境
run Broker 外网IP
mkdir -p /usr/data/rocketMQ/data/broker/logs
mkdir -p /usr/data/rocketMQ/data/broker/store
mkdir -p /usr/data/rocketMQ/data/broker/conf/
broket.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr = 外网ip:9876
brokerIP1 = 外网ip
autoCreateTopicEnable=true
docker run -p 0.0.0.0:10911:10911 -p 0.0.0.0:10909:10909 -d -v /usr/data/rocketMQ/data/broker/logs:/root/logs -v /usr/data/rocketMQ/data/broker/store:/root/store -v /usr/data/rocketMQ/data/broker/conf/broker.conf:/opt/rocketmq/conf/broker.conf --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" -e "autoCreateTopicEnable=true" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq/conf/broker.conf
//main
XTrace.UseConsole();
var producer = new Producer
{
NameServerAddress = "x.x.x.x:9876",
Topic = "t0529"
};
try
{
producer.Start();
// 发送一条测试消息,以确保 Topic 被创建
for (var i = 0; i < 10; i++)
{
var str = "mqm" + i;
//var str = Rand.NextString(1337);
var sr = producer.Publish(str, "TagA");
}
}
catch (Exception ex)
{
Console.WriteLine($"Exception: {ex.Message}");
}
finally
{
producer.Stop();
}
var consumer = new Consumer
{
Topic = "t0529",
Group = "test",
NameServerAddress = "x.x.x.x:9876",
FromLastOffset = false,
//SkipOverStoredMsgCount = 0,
//BatchSize = 20,
Log = XTrace.Log,
ClientLog = XTrace.Log,
};
consumer.OnConsume = OnConsume;
consumer.Configure(MqSetting.Current);
consumer.Start();
_consumer = consumer;