rabbitmq在官网上有详细的安装步骤,支持mac,windows和linux系统的安装,默认的server运行端口为15672
,默认的登录账户和密码均为guest
首先需要了解一下rabbitmq的原理图,每个连接都有很多回话通道
Provider首先需要和rabbitmq进行连接,连接端口为5672
public class Provider01 {
private static final String QUEUE_NAME = "hello_world";
public static void main(String[] args){
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
//端口
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//设置虚拟机,每个虚拟机相当于一个独立mq
connectionFactory.setVirtualHost("/");
try (Connection connection = connectionFactory.newConnection()) {
//创建会话通道,通信都在通道中完成
try(Channel channel = connection.createChannel()){
//声明队列,1 队列名称 2 是否持久化 3 是否独占连接,即关闭连接队列自动删除,可用于临时队列创建
//4 自动删除,可用于临时队列创建 5 扩展参数,比如存活时间
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//发送消息,指定交换机 1 交换机,不指定使用默认 2 routingKey 交换机根据路由key将消息转发到指定队列,如果使用默认交换机,则设置为队列名称
//3 消息属性,一般不使用 4 消息内容
channel.basicPublish("", QUEUE_NAME, null, "hello world".getBytes());
System.out.println("send..to..mq");
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
同样,在Consumer端使用原生API,也需要创建通道接受消息,这边需要注意的是,
由于消费者一直需要保存监听,所以不需要关闭连接
public class Consumer01 {
private static final String QUEUE_NAME = "hello_world";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
//端口
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//设置虚拟机,每个虚拟机相当于一个独立mq
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//消费方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
//接受到消息后调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String exchange = envelope.getExchange();
//通道中消息id
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body, StandardCharsets.UTF_8);
System.out.println("received:"+message);
}
};
//监听队列 1 队列名称 2 自动回复,true表示自己回复mq表示已经接受 3 callback 消费方法,接受后执行的方法
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
发送端操作流程:
接收端操作流程:
根据官网介绍,RabbitMQ有以下几种工作模式 :
一个消息队列可以被多个消费者监听,但是消息不能重复消费,消费者采用轮询机制依次消费队列里的消息。
代码和上面的一样,只不过可以同时开启多个消费者。
和工作队列模式不同,生产者将消息发送给交换机,而交换机与多个消息队列绑定,每个消费者只监听自己的队列。一个消息可以被多个消费者接收到。
public class Provider02 {
private static final String QUEUE_INFORM_EMAIL = "inform_email";
private static final String QUEUE_INFORM_SMS = "inform_sms";
private static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
try (Connection connection = connectionFactory.newConnection()) {
try (Channel channel = connection.createChannel()) {
//需要声明两个队列
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//声明一个交换机 1 交换机名称 2 交换机的类型
// 2.1 fanout 对应发布订阅模式
// 2.2 direct 对应的routing模式
// 2.3 topic 对应的通配符模式
// 2.4 headers 对应的headers的工作模式
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
//交换机和队列的绑定,第三个参数 routing-key 在对应发布订阅模式设置为""
channel.exchangeBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
channel.exchangeBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,"send inform to users".getBytes());
System.out.println("send..to..mq");
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
消费者这里是SMS和Email两个服务
public class Consumer_email {
private static final String QUEUE_INFORM_EMAIL = "inform_email";
private static final String QUEUE_INFORM_SMS = "inform_sms";
private static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
//接受到消息后调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String exchange = envelope.getExchange();
//通道中消息id
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body, StandardCharsets.UTF_8);
System.out.println("received:"+message);
}
};
channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
}
}
这两个消费者都能收到消息队列的消息。
上面的例子每个队列只被一个消费者监听。当然,每个队列可以被多个消费者监听,采用的还是轮询方式。
public class Provider03 {
private static final String QUEUE_INFORM_EMAIL = "inform_email";
private static final String QUEUE_INFORM_SMS = "inform_sms";
private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
private static final String ROUTINGKEY_EMAIL = "inform_email";
private static final String ROUTINGKEY_SMS = "inform_sms";
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
try (Connection connection = connectionFactory.newConnection()) {
try (Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);
//channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,"另外的routing-key");
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);
//这里需要指定发送的routing-key
channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL,null,"send inform to users".getBytes());
System.out.println("send..to..mq");
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
注意通过多次调用channel.queueBind
可以让队列绑定多个routingKey。
然后是消费者,也是几乎一样的代码
public class Consumer_email {
private static final String QUEUE_INFORM_EMAIL = "inform_email";
private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
private static final String ROUTINGKEY_EMAIL = "inform_email";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
//接受到消息后调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String exchange = envelope.getExchange();
//通道中消息id
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body, StandardCharsets.UTF_8);
System.out.println("received:"+message);
}
};
channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
}
}
比Direct更加灵活,可以设置通配符来决定消息的分配
public class Provider03 {
private static final String QUEUE_INFORM_EMAIL = "inform_email";
private static final String QUEUE_INFORM_SMS = "inform_sms";
private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
//inform.email
private static final String ROUTINGKEY_EMAIL = "inform.#.email.#"; //inform.email is ok
private static final String ROUTINGKEY_SMS = "inform.#.sms.#";
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
try (Connection connection = connectionFactory.newConnection()) {
try (Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);
//只发送给email的用户
channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,"send inform to users email or sms".getBytes());
//只发送给sms的用户
channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,"send inform to users email or sms".getBytes());
//email和sms的用户
channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email.sms",null,"send inform to users email or sms".getBytes());
System.out.println("send..to..mq");
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
Consumer端
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
//接受到消息后调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String exchange = envelope.getExchange();
//通道中消息id
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body, StandardCharsets.UTF_8);
System.out.println("received:"+message);
}
};
channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
}
rabbitmq代码写起来还是比较繁琐的,这里使用springboot整合方便使用,在生产者工程和消费者工程配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yml配置文件参考如下
server:
port: 44000
spring:
application:
name: test-rabbitmq-producer
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
新建config包,新建RabbitmqConfig配置类,配置交换机和队列
@Configuration
public class RabbitmqConfig {
private static final String QUEUE_INFORM_EMAIL = "inform_email";
private static final String QUEUE_INFORM_SMS = "inform_sms";
private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
private static final String ROUTINGKEY_EMAIL = "inform.#.email.#";
private static final String ROUTINGKEY_SMS = "inform.#.sms.#";
//声明交换机
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange getExchange() {
//durable true 表示持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
//声明队列
@Bean(QUEUE_INFORM_EMAIL)
public Queue getEmailQueue() {
return new Queue(QUEUE_INFORM_EMAIL);
}
@Bean(QUEUE_INFORM_SMS)
public Queue getSMSQueue() {
return new Queue(QUEUE_INFORM_SMS);
}
//队列绑定交换机
@Bean
public Binding emailQueueBind(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
}
@Bean
public Binding SMSQueueBind(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
}
使用RabbitTemplate发送消息
@SpringBootTest
@RunWith(SpringRunner.class)
public class Provider5 {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Exchange exchange;
@Test
public void testSendEmail(){
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"inform.email","send to email");
}
}
接收消息
@Component
public class ReceiveHandler {
//也可以拿到channel,Message
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
public void receiver_email(String message){
System.out.println(message);
}
}