# SpringBootRabbitMqDemo **Repository Path**: bladeandmaster/SpringBootRabbitMqDemo ## Basic Information - **Project Name**: SpringBootRabbitMqDemo - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-05-05 - **Last Updated**: 2022-08-29 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # SpringBoot集成rabbitmq ## rabbitmq官方文档 [链接](https://www.rabbitmq.com/) ## mac安装rabbitmq ### 安装 brew install rabbitmq ### 进入安装目录 cd /usr/local/Cellar/rabbitmq/3.8.3 ### 启动 brew services start rabbitmq ### 当前窗口启动 sbin/rabbitmq-server 然后另开终端,启动rabbitMQ的管理插件: sbin/rabbitmq-plugins enable rabbitmq_management [rabbitMQ管理界面](http://localhost:15672) 默认账号密码是:guest rabbitMQ安装路径: /usr/local/Cellar/rabbitmq/3.8.3 ## SpringBoot集成rabbitmq [SpringBoot集成rabbitmq](https://www.cnblogs.com/sw008/p/11054293.html) ### rabbitmq 配置参数 ``` spring.rabbitmq.host = localhost spring.rabbitmq.port = 5672 spring.rabbitmq.username = guest spring.rabbitmq.password = guest spring.rabbitmq.virtual-host = / #确认消息已发送到交换机(Exchange) spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-confirm-type=correlated #确认消息已发送到队列(Queue) #spring.rabbitmq.publisher-returns=true (springboot2.2.0以前的版本) spring.rabbitmq.template.mandatory=true spring.rabbitmq.listener.direct.acknowledge-mode = manual spring.rabbitmq.listener.simple.acknowledge-mode = manual spring.rabbitmq.listener.simple.concurrency = 1 spring.rabbitmq.listener.simple.max-concurrency = 10 spring.rabbitmq.listener.simple.prefetch = 1 ``` ### @RabbitListener @RabbitListener(queues = "pay_notify_queue") 需要先创建好队列 @RabbitListener(queuesToDeclare = @Queue("pay_notify_queue")) 可以自动创建队列 下面这种既可以自动创建队列,还能自动创建交换机(direct类型)以及队列和交换机的绑定关系 ```java @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "pay_notify_queue", durable = "true", autoDelete = "false"), exchange = @Exchange(value = "pay_notify_exchange"), key = "pay_notify_routing"),concurrency = "2") ``` 如果@RabbitListener 和application.properties都配置了相同的属性,则以@RabbitListener上的属性为准 ### messageConverter 配置Jackson2JsonMessageConverter是为了生产者和消费者都能直接使用Java对象 ```java /** * 将对象以json的方式发送出去,存储在队列中; * 如果生产者发送的是json字符串(存储在队列中的是字符串),监听者可以用String、Message(org.springframework.amqp.core.Message)接收,不能用Java对象接收; * 如果生产者发送的的是java对象(MessageConverter会将对象转成json之后,再发送到mq,存储在队列中的是json),监听者可以用Message接收也可以直接用Java对象接收,不能用String接收; * 若生产者配置、监听者不配置,如果生产者发送的是json字符串(存储在队列中的是字符串,content_type:application/json),监听者只能用Message接收 * 若生产者配置、监听者不配置,如果生产者发送的是java对象(存储在队列中的是json,content_type:application/json),监听者可以用String、Message(org.springframework.amqp.core.Message)接收,不能用Java对象接收; * 若生产者不配置、监听者配置,如果生产者发送的是json字符串(存储在队列中的是字符串,content_type:text/plain),监听者可以用String、Message,不能用Java对象接收; * 若生产者不配置、监听者配置,如果生产者发送的是java对象(存储在队列中的是json,content_type:application/x-java-serialized-object),监听者只能用Message接收,而且乱码; * 若生产者与监听者都不配置,如果生产者发送的是json字符串,存储的content_type:text/plain,监听者可以用String、Message,不能用Java对象接收; * 若生产者与监听者都不配置,如果生产者发送的是java对象,在rabbitmq web端看到队列中存储的是一串序列化后的乱码(java默认的序列化机制),content_type:application/x-java-serialized-object, * 监听者只能用Java对象接收,Message接收乱码,String接收直接报错; * @return */ @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } ``` ### 监听者指定containerFactory 配置simpleRabbitListenerContainerFactory,这个ListenerContainerFactory设置了Jackson2JsonMessageConverter。 在@RabbitListener指定containerFactory为simpleRabbitListenerContainerFactory,则消费者可以自动将json字符串转化为Java对象; 如果在@RabbitListener未指定containerFactory,则使用默认的rabbitListenerContainerFactory,rabbitListenerContainerFactory依赖配置的messageConverter。 ```java @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "pay_query_queue", durable = "true", autoDelete = "false"), exchange = @Exchange(value = "pay_query_exchange",type = ExchangeTypes.TOPIC), key = "pay_query_routing"),containerFactory = "simpleRabbitListenerContainerFactory") @Bean("simpleRabbitListenerContainerFactory") public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory( @Qualifier("connectionFactory")ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } ``` ### 自动ack,不能调用basicAck 自定义配置的simpleRabbitListenerContainerFactory,没有设置手动提交,如果消费者使用了这个simpleRabbitListenerContainerFactory,不能调用basicAck。 springboot自带的配置类配置的rabbitListenerContainerFactory,依赖spring.rabbitmq.listener.simple.acknowledge-mode = manual, 为手动提交,如果消费者使用了这个rabbitListenerContainerFactory,需要手动调用basicAck。 ### 通过@Bean创建exchange、queue 通过@Bean创建的exchange、queue只会创建在加了@Primary或者beanName="connectionFactory"的ConnectionFactory下, 因为在rabbitmq的自动配置源码里(RabbitAnnotationDrivenConfiguration)会优先获取加了@Primary或者beanName="connectionFactory"的的ConnectionFactory。 ```java @Bean public TopicExchange exchange() { return new TopicExchange(QueueEnum.QUEUE_PAY_QUERY.getExchange(), true, false); } ``` ## 四种类型Exchange ### Direct Exchange 将消息中的routing key与该exchange所绑定的所有routing key进行比较,如果相等,则发送到对应的队列。 一个Exchange可以绑定多个queue,routing key可以相同也可以不同。生产者发生消息指定exchange和routing key进行发送, 在相同的exchange下,如果一个routing key绑定了多个队列,那么消息会路由到该routing key绑定的所有队列中。 [参考链接](https://www.jianshu.com/p/04f443dcd8bd) 同一个exchange,同一个queue,对应不同的客户端时,是轮询分发 ### Topic Exchange 将消息中的routing key与该exchange所绑定的所有routing key进行比较,如果匹配上,则发送到对应的队列。 匹配规则: * 匹配一个单词,# 匹配0个或多个字符;*和# 只能写在.号左边或者右边(*.log.#),且不能挨着字符;单词和单词之间需要用.隔开。 如果routing key不含*、#,表示完全匹配,那Topic Exchange就和Direct Exchange没什么区别。 ## Fanout Exchange 将消息转发到该echange所绑定的所有队列中,这种exchange在路由转发的时候,忽略routing key。 Fanout Exchange效率最高,fanout > direct > topic ### Headers Exchange ## RabbitMQ如何保证消息顺序性 [链接](https://juejin.cn/post/6992581012282671112) ## 遇到的坑 ### unknown exchange type 'x-delayed-message' springboot2.x集成RabbitMQ实现延迟队列报错unknown exchange type 'x-delayed-message',那是因为rabbitmq没有安装rabbitmq_delayed_message_exchange插件。 https://blog.csdn.net/qq_36850813/article/details/103279548 下载rabbitmq_delayed_message_exchange插件: https://www.rabbitmq.com/community-plugins.html 列出所有插件: sbin/rabbitmq-plugins list 开启插件: sbin/rabbitmq-plugins enable rabbitmq_delayed_message_exchange ### springboot启动报错 监听器报错,Failed to declare queue: pay_notify_queue org.springframework.amqp.rabbit.listener.BlockingQueueConsumer 报错原因:no queue 'hpay_p_notify_queue' in vhost '/' ## spring-rabbit源码分析 spring-rabbit消息默认是持久化的 org.springframework.amqp.core.MessageProperties: MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT; ### RabbitMQ消费消息的两种模式:推和拉 [链接](https://blog.csdn.net/ITWANGBOIT/article/details/105428281) rabbitmq消费消息默认是推模式,要想实现高吞吐量,消费者需要使用推模式。 ### Rabbitmq可靠消息投递,消息确认机制 [链接](https://cloud.tencent.com/developer/article/1779992) [链接](https://blog.csdn.net/qq_29595463/article/details/107860704) rabbitmq replyCode=312 replyText=NO_ROUTE exchange->queue失败 spring-boot-starter-amqp工程: https://github.com/spring-projects/spring-boot/tree/v2.2.2.RELEASE/spring-boot-project/spring-boot-starters/spring-boot-starter-amqp RabbitTemplate、RabbitListenerEndpointRegistry实现了Lifecycle接口 DefaultLifecycleProcessor#onRefresh##startBeans Map lifecycleBeans = this.getLifecycleBeans(); 获取实现了Lifecycle接口的bean DefaultLifecycleProcessor.LifecycleGroup#start DefaultLifecycleProcessor#doStart RabbitListenerEndpointRegistry#start##startIfNecessary SimpleMessageListenerContainer#start##doStart SimpleMessageListenerContainer$AsyncMessageProcessingConsumer#run##initialize BlockingQueueConsumer#start##passiveDeclarations###attemptPassiveDeclarations this.channel.queueDeclarePassive(queueName) 代理调用,被CachingConnectionFactory$CachedChannelInvocationHandler的invoke方法拦截到 AutorecoveringChannel->ChannelN->com.rabbitmq.client.Channel RabbitListenerAnnotationBeanPostProcessor