您的当前位置:首页正文

RocketMQ .NET

2024-11-08 来源:个人技术集锦

RocketMQ 是一款由阿里巴巴集团开发并开源给Apache软件基金会的分布式消息及流处理平台。以其高吞吐量、低延迟、高可用性等特点而广受欢迎。支持Java,C++, Python, Go, .NET等。

异步解耦:可以实现上游和下游业务系统的松耦合设计,使得服务部分节点异常不会影响到核心交易系统的正常运转。在电商、金融等分布式系统中,这种解耦设计尤为重要。
削峰填谷:在如秒杀、大促等大型活动中,系统会面临巨大的流量冲击。RocketMQ利用其高性能的消息处理能力,可以有效地应对这种流量冲击,保证系统的稳定运行。
顺序消息:支持顺序消息(分区有序),可以确保消息的先进先出。这在交易系统中的订单创建、支付、退款等流程中尤为重要,因为这些流程对消息的顺序有严格要求。
分布式事务消息:支持分布式事务消息,可以保证分布式事务的强一致性。这在涉及多个服务的分布式系统中非常有用,可以确保数据的一致性和完整性。

RocketMQ优点
高吞吐量和低延迟:能够处理大规模消息流,并提供低延迟的消息传递。这使得它非常适合处理高并发的应用场景,如电子商务和金融交易系统。
可靠性:具有高度可靠的消息传递机制。它支持消息持久化和复制,确保消息不会丢失,并能够在故障发生时进行自动恢复。
分布式扩展:支持水平扩展,可以方便地添加新的消息生产者和消费者来应对负载增加的情况。
易于部署:提供开箱即用的部署方式,非常适合在分布式系统中使用。

RocketMQ架构

部署RocketMQ(docker)

安装 Docker:

#debian
 curl -fsSL https://get.docker.com -o get-docker.sh
 sudo sh get-docker.sh

拉取 RocketMQ 镜像:

使用以下命令从 Docker Hub 拉取最新的 RocketMQ 镜像:

docker pull apache/rocketmq:latest

启动 RocketMQ NameServer:

docker run -d --name rmqnamesrv -p 9876:9876 apache/rocketmq:latest sh mqnamesrv
```bash
## 启动 RocketMQ Broker:
RocketMQ 的 Broker 负责存储消息并处理生产者和消费者的请求。使用以下命令启动 Broker:


```bash
docker run -d --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876"   -e "BROKER_NAME=broker-a"    -e "BROKER_ID=0"    -e "AUTO_CREATE_TOPIC_ENABLE=true"    -e "AUTO_CREATE_SUBSCRIPTION_GROUP=true"    -p 10911:10911 -p 10909:10909 apache/rocketmq:latest sh mqbroker

验证
查看 NameServer 日志:

docker logs -f rmqnamesrv

查看 Broker 日志:

docker logs -f rmqbroker

使用 RocketMQ Console:

如果需要可视化管理 RocketMQ,可以运行 RocketMQ Console:

docker pull styletang/rocketmq-console-ng
docker run -d -p 8080:8080 --link rmqnamesrv:namesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876" styletang/rocketmq-console-ng

总结

# 拉取 RocketMQ 镜像
docker pull apache/rocketmq:latest

# 启动 NameServer
docker run -d --name rmqnamesrv -p 9876:9876 apache/rocketmq:latest sh mqnamesrv

# 启动 Broker
docker run -d --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -p 10911:10911 -p 10909:10909 apache/rocketmq:latest sh mqbroker

# 启动 RocketMQ Console
docker pull styletang/rocketmq-console-ng
docker run -d -p 8080:8080 --link rmqnamesrv:namesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876" styletang/rocketmq-console-ng

如果是外网环境
run Broker 外网IP

mkdir -p /usr/data/rocketMQ/data/broker/logs
mkdir -p /usr/data/rocketMQ/data/broker/store
mkdir -p /usr/data/rocketMQ/data/broker/conf/

broket.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr = 外网ip:9876
brokerIP1 = 外网ip
autoCreateTopicEnable=true
docker run -p 0.0.0.0:10911:10911 -p 0.0.0.0:10909:10909 -d -v /usr/data/rocketMQ/data/broker/logs:/root/logs -v /usr/data/rocketMQ/data/broker/store:/root/store -v /usr/data/rocketMQ/data/broker/conf/broker.conf:/opt/rocketmq/conf/broker.conf --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" -e "autoCreateTopicEnable=true" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq/conf/broker.conf

.NET RocketMQ

NuGet NewLife.RocketMQ

Producer

	//main
   XTrace.UseConsole();

   var producer = new Producer
   {
       NameServerAddress = "x.x.x.x:9876",
       Topic = "t0529"
   };

   try
   {
       producer.Start();

       // 发送一条测试消息,以确保 Topic 被创建
       for (var i = 0; i < 10; i++)
       {
           var str = "mqm" + i;
           //var str = Rand.NextString(1337);

           var sr = producer.Publish(str, "TagA");
       }
   }
   catch (Exception ex)
   {
       Console.WriteLine($"Exception: {ex.Message}");
   }
   finally
   {
       producer.Stop();
   }

Consumer

 var consumer = new Consumer
 {
     Topic = "t0529",
     Group = "test",
     NameServerAddress = "x.x.x.x:9876",

     FromLastOffset = false,
     //SkipOverStoredMsgCount = 0,
     //BatchSize = 20,

     Log = XTrace.Log,
     ClientLog = XTrace.Log,
 };

 consumer.OnConsume = OnConsume;

 consumer.Configure(MqSetting.Current);
 consumer.Start();

 _consumer = consumer;

显示全文