Blog

  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

  • 搜索

springboot-rabbitmq

发表于 2019-08-25 分类于 SpringBoot 阅读次数:
本文字数: 4.6k

简介

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现。

核心概念

  • Producer&Consumer
    • producer指的是消息生产者,consumer消息的消费者。
  • Broker
    • 它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
  • Queue
    • 消息队列,提供了FIFO的处理机制,具有缓存消息的能力。rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。
    • 设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失
    • 设置为临时队列,queue中的数据在系统重启之后就会丢失
    • 设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除
  • Exchange
    • 消息交换机,它指定消息按什么规则,路由到哪个队列。
    • Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别:
  • Binding
    • 将一个特定的Exchange 和一个特定的Queue 绑定起来。
    • Exchange 和Queue的绑定可以是多对多的关系。
  • virtual host(vhosts )
    • 在rabbitmq server上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)
    • 每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings–vhost相当于物理的server,可以为不同app提供边界隔离
    • producer和consumer连接rabbit server需要指定一个vhost

环境搭建

引入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

自动配置原理

1
2
3
4
5
6
7
8
9
10
11
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
//...
//CachingConnectionFactory
//RabbitTemplate
//AmqpAdmin
//RabbitMessagingTemplate
}

创建Exchange

1
2
3
4
5
6
7
8
@Autowired
private AmqpAdmin admin;
@Test
public void createExchange(){
admin.declareExchange(new DirectExchange("DirectExchange"));
admin.declareExchange(new FanoutExchange("FanoutExchange"));
admin.declareExchange(new TopicExchange("TopicExchange"));
}

创建队列

1
2
3
4
5
6
7
8
9
@Test
public void creatQueue(){
admin.declareQueue(new Queue("queue1",true));
admin.declareQueue(new Queue("queue2",true));
admin.declareQueue(new Queue("queue3",true));
admin.declareQueue(new Queue("queue4",true));
admin.declareQueue(new Queue("queue5",true));
admin.declareQueue(new Queue("queue6",true));
}

创建依赖

1
2
3
4
5
6
7
8
9
@Test
public void creatBinding(){
admin.declareBinding(new Binding("queue1", Binding.DestinationType.QUEUE, "DirectExchange","directQueue1",null));
admin.declareBinding(new Binding("queue2", Binding.DestinationType.QUEUE, "FanoutExchange","fanoutQueue2",null));
admin.declareBinding(new Binding("queue3", Binding.DestinationType.QUEUE, "FanoutExchange","fanoutQueue3",null));
admin.declareBinding(new Binding("queue4", Binding.DestinationType.QUEUE, "FanoutExchange","fanoutQueue4",null));
admin.declareBinding(new Binding("queue5", Binding.DestinationType.QUEUE, "TopicExchange","topic.*",null));
admin.declareBinding(new Binding("queue6", Binding.DestinationType.QUEUE, "TopicExchange",".Queue6",null));
}

发送消息

DirectExchange(完全匹配)

1
2
3
4
5
6
@Test
public void sendMessageToDiret(){
HashMap<Object, Object> map = new HashMap<>();
map.put("msg","send a message");
rabbitTemplate.convertAndSend("DirectExchange","directQueue1",map);
}

可以发现这里的消息的序列化方式是默认使用java的序列化方式的。(查看RabbitTemplate类中有个MessageConverter属性,这是用来设置序列化方式的。默认是使用SimpleMessageConverter),如果想要自定义序列化方式。

1
2
3
4
5
6
7
@Configuration
public class MyConfig {
@Bean
public MessageConverter MyMessageConverter(){
return new Jackson2JsonMessageConverter();
}
}

FanoutExchange(广播)

1
2
3
4
5
6
@Test
public void sendMessageToFanout(){
HashMap<Object, Object> map = new HashMap<>();
map.put("msg","send a message");
rabbitTemplate.convertAndSend("FanoutExchange","",map);
}

可以发现,与FanoutExchange绑定的Queue都收到了消息

TopicExchange(“#”匹配一个或多个词,符号“*”匹配不多不少一个词)

1
2
3
4
5
6
@Test
public void sendMessageToTopic(){
HashMap<Object, Object> map = new HashMap<>();
map.put("msg","send a message");
rabbitTemplate.convertAndSend("TopicExchange","topic.queue",map);
}

可以发现,topic.queue被topic.*匹配到,故queue5会收到消息。

接收消息

1
2
3
4
5
@Test
public void receiveMessage(){
Object queue1 = rabbitTemplate.receiveAndConvert("queue1");
System.out.println(queue1);
}
1
{msg=send a message}

获得消息之后,队列中的消息就会被消费(删除)

使用@RabbitListener接收消息(需要加上@EnableRabbit)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class MyListener {

@RabbitListener(queues = {"queue1","queue2"})
public void receive(Object obj){
// if(obj instanceof Message){
// byte[] body = ((Message) obj).getBody();
// MessageProperties messageProperties = ((Message) obj).getMessageProperties();
// System.out.println(body);
// System.out.println(messageProperties);
// }
System.out.println(obj);
}
}

这样receive方法就会监听queue1和queue2两个队列,一旦有消息进入这两个队列中,就会被receive方法消费掉。


------ 已触及底线感谢您的阅读 ------
麻辣香锅不要辣 微信支付

微信支付

  • 本文作者: 麻辣香锅不要辣
  • 本文链接: https://http://ybhub.gitee.io/2019/08/25/springboot-rabbitmq/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
# SpringBoot # RabbitMQ
springboot-redis
springboot-security
  • 文章目录
  • 站点概览
麻辣香锅不要辣

麻辣香锅不要辣

21 日志
11 分类
20 标签
GitHub 简书
  1. 1. 简介
  2. 2. 核心概念
  3. 3. 环境搭建
    1. 3.1. 引入依赖
  4. 4. 自动配置原理
    1. 4.1. 创建Exchange
    2. 4.2. 创建队列
    3. 4.3. 创建依赖
    4. 4.4. 发送消息
      1. 4.4.1. DirectExchange(完全匹配)
      2. 4.4.2. FanoutExchange(广播)
      3. 4.4.3. TopicExchange(“#”匹配一个或多个词,符号“*”匹配不多不少一个词)
    5. 4.5. 接收消息
    6. 4.6. 使用@RabbitListener接收消息(需要加上@EnableRabbit)
© 2019 – 2020 麻辣香锅不要辣 | 站点总字数: 20.4k字
|
0%