第一、添加 Maven 依赖

        <!-- Spring Boot starter for AMQP messaging (Spring AMQP / RabbitMQ client). -->
        <!-- NOTE(review): if the project inherits Spring Boot dependency management, the
             explicit version is presumably redundant; confirm against the parent POM. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.2.2.RELEASE</version>
        </dependency>

第二、添加配置

# RabbitMQ connection, publisher and listener settings for the demo below.
spring:
  rabbitmq:
    host: www.xxxx.cn
    port: 5672
    virtual-host: /
    username: guest
    password: guest
    # NOTE(review): in Spring Boot 2.2 this property is presumably deprecated in favour of
    # `publisher-confirm-type` - confirm against the Boot version in use.
    publisher-confirms: true
    publisher-returns: true
    listener:
      simple:
        # Listeners must ack/nack each delivery themselves through the Channel.
        acknowledge-mode: manual
        retry:
          enabled: true
          # Maximum number of delivery attempts when the listener throws an exception
          max-attempts: 2
    template:
      # NOTE(review): the Java class defines its own RabbitTemplate bean, so this setting
      # may not reach that template - verify.
      mandatory: true
    # NOTE(review): a bare number is presumably interpreted as milliseconds (~16.7 minutes);
    # confirm this value is intended.
    connection-timeout: 1000000

第三、编写代码


import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * Demo of a RabbitMQ dead-letter setup: a business queue whose rejected or expired
 * messages are re-routed to a dead-letter exchange and consumed from a dead-letter queue.
 *
 * <p>The class plays three roles at once: it declares the AMQP topology (exchanges,
 * queues, bindings) as beans, it hosts the message listeners, and it exposes a small
 * HTTP endpoint ({@code GET /send}) for publishing test messages.
 *
 * <p>Exchanges, queues and bindings are only <em>described</em> here as beans; the
 * {@link RabbitAdmin} bean declares them on the broker when a connection is opened.
 */
@Component
@RestController
public class RabbitMqConfig {

    /** Name of the business exchange. */
    public static final String ORDER_TO_PRODUCT_EXCHANGE_NAME = "order-to-product.exchange";

    /** Name of the business queue. */
    public static final String ORDER_TO_PRODUCT_QUEUE_NAME = "order-to-product.queue";

    /** Routing key binding the business queue to the business exchange. */
    public static final String ORDER_TO_PRODUCT_ROUTING_KEY = "order-to-product.key";

    /** Maximum number of times a message is re-sent (not referenced in this class). */
    public static final Integer MSG_RETRY_COUNT = 5;

    /** Time difference constant (not referenced in this class; unit not stated here). */
    public static final Integer TIME_DIFF = 30;

    /** Name of the dead-letter exchange. */
    public static final String dlxExchange =  "dlx.exchange";
    /** Name of the dead-letter queue. */
    public static final String FlxQueue = "dlx.queue";
    /**
     * Routing key used for dead-lettered messages. The dead-letter exchange is a direct
     * exchange, so this value is matched literally rather than as a wildcard.
     */
    public static final String failRoutingKey = "#";

    // NOTE(review): this template is produced by rabbitTemplate(...) in this same class,
    // so the class depends on a bean it defines itself. That relies on the container
    // permitting circular references; consider moving the bean definitions into a
    // separate configuration class.
    @Resource
    private RabbitTemplate template;

    /**
     * Consumer for the business queue. The listener annotation is intentionally left
     * commented out: without a consumer, messages carrying an expiration stay in the
     * queue until they expire and are then dead-lettered.
     *
     * <p>When enabled, a body of {@code "1"} is acknowledged; any other body is rejected
     * without requeue, which sends it to the dead-letter exchange.
     *
     * @param message the delivered message
     * @param channel the channel used for manual ack/nack
     * @throws IOException if the ack/nack cannot be sent
     */
//    @RabbitListener(queues = ORDER_TO_PRODUCT_QUEUE_NAME)
    public void consumer(Message message, Channel channel) throws IOException {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println(msg);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if ("1".equals(msg)) {
            // Business logic succeeded.
            channel.basicAck(deliveryTag, false);
        } else {
            // Business logic failed: reject without requeue so the message is dead-lettered.
            channel.basicNack(deliveryTag, false, false);
        }
    }

    /**
     * Consumer for the dead-letter queue: prints the message and acknowledges it.
     *
     * @param message the dead-lettered message
     * @param channel the channel used for manual ack
     * @throws IOException if the ack cannot be sent
     */
    @RabbitListener(queues = FlxQueue)
    public void flxQueue(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("死信队列收到消息:" + msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    /**
     * Publishes a test message to the business exchange.
     *
     * @param msg        message body; must not be {@code null}
     * @param expiration per-message expiration passed to
     *                   {@link MessageProperties#setExpiration(String)}; may be {@code null}
     * @return the message body that was sent
     * @throws IllegalArgumentException if {@code msg} is {@code null}
     */
    @GetMapping("send")
    public String send(String msg, String expiration){
        if (msg == null) {
            throw new IllegalArgumentException("Request parameter 'msg' is required");
        }
        MessageProperties messageProperties = new MessageProperties();
//        messageProperties.setDelay(1000);
        messageProperties.setExpiration(expiration);
        // Encode explicitly as UTF-8 so producer and consumers agree on the charset.
        Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
        template.send(ORDER_TO_PRODUCT_EXCHANGE_NAME, ORDER_TO_PRODUCT_ROUTING_KEY, message);
        return msg;
    }

    /**
     * Business exchange: direct, durable, not auto-deleted.
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange orderToProductExchange() {
        return new DirectExchange(ORDER_TO_PRODUCT_EXCHANGE_NAME, true, false);
    }

    /**
     * Business queue: durable, non-exclusive, not auto-deleted, configured to forward
     * dead-lettered messages to {@link #dlxExchange} with routing key {@link #failRoutingKey}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue orderToProductQueue() {
        Map<String, Object> args = new HashMap<>(2);
        // Dead-letter exchange that receives rejected/expired messages from this queue.
        args.put("x-dead-letter-exchange", dlxExchange);
        // Routing key applied to messages when they are dead-lettered.
        args.put("x-dead-letter-routing-key", failRoutingKey);
        return new Queue(ORDER_TO_PRODUCT_QUEUE_NAME, true, false, false, args);
    }

    /**
     * Dead-letter exchange: direct, durable, not auto-deleted.
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange orderFailToProductExchange() {
        return new DirectExchange(dlxExchange, true, false);
    }

    /**
     * Dead-letter queue: durable, non-exclusive, not auto-deleted.
     *
     * @return the queue definition
     */
    @Bean
    public Queue orderFailToProductQueue() {
        return new Queue(FlxQueue, true, false, false);
    }

    /**
     * Binds the business queue to the business exchange.
     *
     * <p>The queue/exchange factory methods are called directly only to obtain their
     * names for the binding; they have no side effects.
     *
     * @return the binding definition
     */
    @Bean
    public Binding orderToProductBinding() {
        return BindingBuilder.bind(orderToProductQueue())
                .to(orderToProductExchange())
                .with(ORDER_TO_PRODUCT_ROUTING_KEY);
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange.
     *
     * @return the binding definition
     */
    @Bean
    public Binding orderFailToProductBinding() {
        return BindingBuilder.bind(orderFailToProductQueue())
                .to(orderFailToProductExchange())
                .with(failRoutingKey);
    }

    // ===================== Infrastructure beans =====================

    /**
     * JSON message converter exposed as a bean.
     *
     * @return a Jackson-based converter
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Template used for publishing, configured with a Jackson JSON converter.
     *
     * @param connectionFactory the broker connection factory
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    /**
     * Admin that declares the exchange, queue and binding beans on the broker.
     *
     * @param factory the broker connection factory
     * @return the configured admin
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(factory);
        // Auto-startup lets the admin declare the topology beans automatically when a
        // connection is opened, so no explicit declare* calls are needed.
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

 

最后修改于 2021-08-15 18:44:12
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付
上一篇