第一步:添加 Maven 依赖

    <!-- Spring Boot starter for AMQP (RabbitMQ) messaging. No <version> is given here;
         presumably it is managed by the spring-boot-starter-parent POM. -->
    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

使用的 Spring Boot 版本

   <!-- Parent POM: pins the project to Spring Boot 2.2.2.RELEASE. -->
   <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
    </parent>

第一个类:配置类 RabbitConfig


import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ wiring: registers a {@link RabbitTemplate} and a listener container factory
 * that both convert message bodies with a {@link Jackson2JsonMessageConverter}.
 */
@Configuration
public class RabbitConfig {

    /**
     * Builds the template used to publish messages.
     *
     * @param connectionFactory connection factory injected by Spring
     * @return a template whose payloads are converted to and from JSON
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate jsonTemplate = new RabbitTemplate(connectionFactory);
        jsonTemplate.setMessageConverter(newJsonConverter());
        return jsonTemplate;
    }

    /**
     * Builds the container factory used for {@code @RabbitListener} endpoints.
     *
     * <p>NOTE(review): only the connection factory and the converter are set here, so any
     * {@code spring.rabbitmq.listener.*} settings are presumably not applied to this
     * factory; confirm whether that is intended.
     *
     * @param connectionFactory connection factory injected by Spring
     * @return a listener container factory that converts incoming JSON payloads
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        final SimpleRabbitListenerContainerFactory listenerFactory = new SimpleRabbitListenerContainerFactory();
        listenerFactory.setMessageConverter(newJsonConverter());
        listenerFactory.setConnectionFactory(connectionFactory);
        return listenerFactory;
    }

    /** Creates a fresh JSON converter; each bean above gets its own instance. */
    private static Jackson2JsonMessageConverter newJsonConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

第二个类:消费者监听 SimpleConsumer


import cn.hutool.json.JSONObject;
import com.csdcb.link.common.constants.RabbitMqConstant;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * Consumer for visit-record messages.
 *
 * <p>The listener binds {@code RabbitMqConstant.QUEUE_VISIT_RECORD} to the topic exchange
 * {@code RabbitMqConstant.EXCHANGE_TOPIC_VISIT_RECORD} with routing key
 * {@code RabbitMqConstant.ROUTING_KEY_VISIT_RECORD}. The exchange and routing key match
 * what {@code SimpleProducer} publishes to, so published messages are routed to this queue.
 */
@Component
public class SimpleConsumer {

    /**
     * Handles one message delivered from the visit-record queue.
     *
     * <p>The {@code @QueueBinding} asks Spring AMQP to declare the queue, the exchange and the
     * binding between them. The queue is declared durable, and declaration errors are
     * ignored ({@code ignoreDeclarationExceptions = "true"}).
     *
     * <p>{@code @RabbitHandler} is intentionally not used: it only applies to methods of a
     * class that carries a class-level {@code @RabbitListener}.
     *
     * @param message the message body converted to a {@link JSONObject}
     */
    @RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(value = RabbitMqConstant.QUEUE_VISIT_RECORD, durable = "true", ignoreDeclarationExceptions = "true"),
                            // Must be the exchange the producer publishes to, not the queue name.
                            exchange = @Exchange(name = RabbitMqConstant.EXCHANGE_TOPIC_VISIT_RECORD, type = ExchangeTypes.TOPIC),
                            key = RabbitMqConstant.ROUTING_KEY_VISIT_RECORD
                    )
            })
    public void handler(JSONObject message) {
        System.out.println("消费者接收到的消息是:" + message);
    }
}

第三个类:生产者 SimpleProducer


import cn.hutool.json.JSONObject;
import com.csdcb.link.common.constants.RabbitMqConstant;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Publishes a small batch of demo messages to the visit-record topic exchange.
 */
@Component
public class SimpleProducer {

    /** Number of messages sent by one {@link #sendMsg()} call. */
    private static final int MESSAGE_COUNT = 5;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends {@code MESSAGE_COUNT} messages, numbered from 0, to
     * {@code RabbitMqConstant.EXCHANGE_TOPIC_VISIT_RECORD} using
     * {@code RabbitMqConstant.ROUTING_KEY_VISIT_RECORD}.
     */
    public void sendMsg() {
        for (int index = 0; index < MESSAGE_COUNT; index++) {
            rabbitTemplate.convertAndSend(
                    RabbitMqConstant.EXCHANGE_TOPIC_VISIT_RECORD,
                    RabbitMqConstant.ROUTING_KEY_VISIT_RECORD,
                    buildPayload(index));
        }
    }

    /** Builds the JSON body for the message with the given sequence number. */
    private static JSONObject buildPayload(int index) {
        JSONObject payload = new JSONObject();
        payload.set("msg", "简单模式下的消息 " + index);
        return payload;
    }
}

最后写一个 Controller 来调用一下


import com.csdcb.link.common.annotation.PassToken;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST entry point under {@code /simple} that triggers the demo producer.
 */
@RestController
@RequestMapping("/simple")
public class SimpleController {

    /** Fixed response body returned after the messages have been handed to the producer. */
    private static final String RESPONSE_BODY = "SIMPLE-QUEUE";

    @Autowired
    private SimpleProducer simpleProducer;

    /**
     * Handles {@code /simple/sendMsg}: asks the producer to send its messages.
     *
     * @return the constant string {@code "SIMPLE-QUEUE"}
     */
    @PassToken
    @RequestMapping("/sendMsg")
    public String sendMsg() {
        this.simpleProducer.sendMsg();
        return RESPONSE_BODY;
    }
}

 

最后修改于 2023-10-18 21:37:46
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付
上一篇