第一、添加 Maven 依赖
<!-- Spring Boot starter for AMQP messaging -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>
第二、添加配置
# RabbitMQ connection, publisher and listener settings (application.yml).
spring:
  rabbitmq:
    host: www.xxxx.cn
    port: 5672
    virtual-host: /
    username: guest
    password: guest
    # NOTE(review): newer Spring Boot versions replace this flag with
    # `publisher-confirm-type` - confirm against the Boot version in use.
    publisher-confirms: true
    publisher-returns: true
    listener:
      simple:
        # Listeners must ack/nack every delivery themselves.
        acknowledge-mode: manual
        retry:
          enabled: true
          # Maximum number of delivery attempts when the listener throws an exception
          max-attempts: 2
    template:
      mandatory: true
    # NOTE(review): presumably milliseconds (about 16.7 minutes) - confirm this is intended.
    connection-timeout: 1000000
第三、编写代码
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
 * Dead-letter-exchange (DLX) demo: declares a work queue whose rejected or expired
 * messages are re-routed to a dead-letter exchange/queue, exposes an HTTP endpoint
 * that publishes test messages, and listens on the dead-letter queue.
 *
 * <p>The exchanges, queues and bindings are exposed as plain beans; the
 * {@link RabbitAdmin} bean declared below declares them on the broker when a
 * connection is opened, so no bean factory method talks to the broker itself.
 *
 * <p>{@code @RestController} is already a Spring stereotype, so no additional
 * {@code @Component} annotation is required for this class to be picked up.
 */
@RestController
public class RabbitMqConfig {

    /** Exchange name. */
    public static final String ORDER_TO_PRODUCT_EXCHANGE_NAME = "order-to-product.exchange";
    /** Queue name. */
    public static final String ORDER_TO_PRODUCT_QUEUE_NAME = "order-to-product.queue";
    /** Routing key. */
    public static final String ORDER_TO_PRODUCT_ROUTING_KEY = "order-to-product.key";
    /** Maximum number of message re-sends (not referenced inside this class). */
    public static final Integer MSG_RETRY_COUNT = 5;
    /** Not referenced inside this class. */
    public static final Integer TIME_DIFF = 30;
    /** Dead-letter exchange name. */
    public static final String dlxExchange = "dlx.exchange";
    /** Dead-letter queue name. */
    public static final String FlxQueue = "dlx.queue";
    /**
     * Dead-letter routing key. The dead-letter exchange is a {@link DirectExchange}, so
     * "#" is matched literally (it is not a wildcard here); routing works because the
     * work queue's {@code x-dead-letter-routing-key} and the binding use the same value.
     */
    public static final String failRoutingKey = "#";

    @Resource
    private RabbitTemplate template;

    /**
     * Consumer for the work queue: acknowledges the payload {@code "1"} and rejects
     * everything else without requeue, which dead-letters the message.
     *
     * <p>The listener annotation is left disabled: while the work queue has no consumer,
     * messages published with an expiration are dead-lettered once they expire.
     *
     * @param message the delivered message
     * @param channel the channel the message arrived on, used for manual ack/nack
     * @throws IOException if the ack/nack cannot be sent to the broker
     */
    // @RabbitListener(queues = ORDER_TO_PRODUCT_QUEUE_NAME)
    public void consumer(Message message, Channel channel) throws IOException {
        // Decode explicitly as UTF-8 so the result does not depend on the JVM default charset.
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println(msg);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if ("1".equals(msg)) {
            // Business processing succeeded.
            channel.basicAck(deliveryTag, false);
        } else {
            // Business processing failed: reject without requeue so the message is dead-lettered.
            channel.basicNack(deliveryTag, false, false);
        }
    }

    /**
     * Listener on the dead-letter queue: prints the payload and acknowledges it.
     *
     * @param message the dead-lettered message
     * @param channel the channel the message arrived on, used for the manual ack
     * @throws IOException if the ack cannot be sent to the broker
     */
    @RabbitListener(queues = FlxQueue)
    public void flxQueue(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("私信队列收到消息:" + msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    /**
     * Publishes {@code msg} to the work exchange with the given per-message expiration.
     *
     * @param msg        payload to publish; must not be {@code null}
     * @param expiration value stored in the message's expiration property; may be {@code null}
     * @return the payload that was published
     * @throws IllegalArgumentException if {@code msg} is {@code null}
     */
    @GetMapping("send")
    public String send(String msg, String expiration) {
        if (msg == null) {
            // Fail with a clear message instead of a NullPointerException from getBytes().
            throw new IllegalArgumentException("msg must not be null");
        }
        MessageProperties messageProperties = new MessageProperties();
        // messageProperties.setDelay(1000);
        messageProperties.setExpiration(expiration);
        // Encode explicitly as UTF-8 so producer and consumers agree on the byte form.
        Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
        template.send(ORDER_TO_PRODUCT_EXCHANGE_NAME, ORDER_TO_PRODUCT_ROUTING_KEY, message);
        return msg;
    }

    /**
     * Work exchange (durable, not auto-delete).
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange orderToProductExchange() {
        return new DirectExchange(ORDER_TO_PRODUCT_EXCHANGE_NAME, true, false);
    }

    /**
     * Work queue (durable) whose dead letters are routed to {@link #dlxExchange}
     * with routing key {@link #failRoutingKey}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue orderToProductQueue() {
        Map<String, Object> args = new HashMap<>(2);
        // x-dead-letter-exchange: the dead-letter exchange attached to this queue
        args.put("x-dead-letter-exchange", dlxExchange);
        // x-dead-letter-routing-key: the routing key used when a message is dead-lettered
        args.put("x-dead-letter-routing-key", failRoutingKey);
        return new Queue(ORDER_TO_PRODUCT_QUEUE_NAME, true, false, false, args);
    }

    //##======================================================

    /**
     * Dead-letter exchange (durable, not auto-delete).
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange orderFailToProductExchange() {
        return new DirectExchange(dlxExchange, true, false);
    }

    /**
     * Dead-letter queue (durable).
     *
     * @return the queue definition
     */
    @Bean
    public Queue orderFailToProductQueue() {
        return new Queue(FlxQueue, true, false, false);
    }

    /**
     * Bindings for both queues: work queue to work exchange, and dead-letter queue to
     * dead-letter exchange.
     *
     * <p>Returned as one {@link Declarables} bean so that the admin declares both bindings,
     * instead of declaring them as a side effect of a {@code void} bean method.
     *
     * @return the two binding definitions
     */
    @Bean
    public Declarables orderToProductBinding() {
        // This class is not a @Configuration class, so these are plain method calls that
        // build fresh definition objects; only their names are used by the bindings.
        Binding workBinding = BindingBuilder
                .bind(orderToProductQueue())
                .to(orderToProductExchange())
                .with(ORDER_TO_PRODUCT_ROUTING_KEY);
        Binding deadLetterBinding = BindingBuilder
                .bind(orderFailToProductQueue())
                .to(orderFailToProductExchange())
                .with(failRoutingKey);
        return new Declarables(workBinding, deadLetterBinding);
    }

    // ===================== Infrastructure configuration ====================

    /**
     * JSON message converter exposed as a bean.
     *
     * @return a new Jackson-based converter
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Template used by {@link #send(String, String)}.
     *
     * <p>Declared {@code static} so the bean can be created without an instance of this
     * class; this class injects the template into itself, and an instance factory method
     * would make that a circular bean reference.
     *
     * <p>NOTE(review): defining this bean presumably replaces Spring Boot's auto-configured
     * template, in which case {@code spring.rabbitmq.template.*} properties are not applied
     * to it - confirm whether {@code mandatory} should be set here explicitly.
     *
     * @param connectionFactory the broker connection factory
     * @return a template that converts payloads with Jackson
     */
    @Bean
    public static RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    /**
     * Admin that declares the exchange, queue and binding beans of the application
     * context on the broker.
     *
     * @param factory the broker connection factory
     * @return the admin
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(factory);
        // Auto-startup must be on for the admin to declare the beans automatically;
        // kept explicit as in the original.
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
最后修改于 2021-08-15 18:44:12
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付

