1、rabbitMQ Maven 依赖

     <dependency>
           <groupId>com.rabbitmq</groupId>
           <artifactId>amqp-client</artifactId>
           <version>3.5.6</version>
       </dependency>
       <!-- https://mvnrepository.com/artifact/log4j/log4j -->
       <dependency>
           <groupId>log4j</groupId>
           <artifactId>log4j</artifactId>
           <version>1.2.17</version>
       </dependency>

2、代码


import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
 * rabbitMq 路由类型
 * 1、Direct 和 Fanout 两种模式下可以有多个接受者,如果多个接受者,监控同一个队列,那么会轮询将消息发送至接收者
 * 2、Fanout 路由模式下,如果将多个队列进行绑定,那么多个队列同时接受到消息,同一个Queue被多个接收者监听,依然轮询接受消息
 */
public class RabbitMQDirect {

    public static class MqUtil {

        public static String host = "192.168.8.48";
        public static Integer hostPort = 5672;
        public static String userName = "admin";
        public static String password = "admin1234";

        public static Channel getConnection(){
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setVirtualHost("/");
            connectionFactory.setHost(host);
            connectionFactory.setPort(hostPort);
            connectionFactory.setUsername(userName);
            connectionFactory.setPassword(password);
            try {
                return connectionFactory.newConnection().createChannel();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            return null;
        }
    }
    // 生产者
    public static void main(String[] args) throws IOException {
        Channel connection = MqUtil.getConnection();

        Map<String, Object> header = new HashMap<>();
        header.put("num", 1); //设置消息属性

        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().
                contentEncoding("utf-8").contentType("application/json").deliveryMode(2).
                headers(header).build();
        /***
         * 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,
         * 那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,
         * mandatory标志告诉broker代理服务器至少将消息route到一个队列中,否则就将消息return给发送者;
         */
        connection.basicPublish(
                "exchange_01", // 指定交换机
                "routing_02", // 指定路由
                false,
                basicProperties,        // 发布的参数
                "发送正常消息1".getBytes()); // 发布的消息
        connection.basicPublish(
                "exchange_01", // 指定交换机
                "routing_02", // 指定路由
                false,
                basicProperties,        // 发布的参数
                "发送正常消息2".getBytes()); // 发布的消息
        connection.basicPublish(
                "exchange_01", // 指定交换机
                "routing_02", // 指定路由
                false,
                basicProperties,        // 发布的参数
                "发送正常消息3".getBytes()); // 发布的消息
        connection.basicPublish(
                "exchange_01", // 指定交换机
                "routing_02", // 指定路由
                false,
                basicProperties,        // 发布的参数
                "发送正常消息4".getBytes()); // 发布的消息
        connection.basicPublish(
                "exchange_01",
                "routing_01",
                false,
                basicProperties,
                "发送正常消息5".getBytes());
    }
    public static class RabbitMq{


        public static void directConsumer1() throws IOException {
            Channel channel = MqUtil.getConnection();
            // 声明交换机
            channel.exchangeDeclare("exchange_01", // 交换机名称
                    "direct", // 交换机类型
                    false,  // 是否持久化,持久化后,服务器重启依旧存在这个交换机
                    true, // 是否自动删除,如果是 true,则长时间不用则自动删除
                    false,  // 是否是rabbitmq 内部队列
                    null);
            // 声明队列
            channel.queueDeclare("queue_01", false, false, true, null);
            // 绑定队列和交换机
            channel.queueBind("queue_01", "exchange_01", "routing_01");
            //gloabl设置为ture 那么就是channel级别的限流,若为false 就是consumer级别的限制流量 //
            channel.basicConsume("",false, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String data = new String(body, Charset.defaultCharset());
                    System.out.println(data);
                    System.out.println("routing_01:" + data);
                }
            });
        }
        public static void directConsumer2() throws IOException {
            Channel channel = MqUtil.getConnection();
            channel.exchangeDeclare("exchange_01", "direct", false, true, false, null);
            channel.queueDeclare("queue_01", false, false, true, null);
            channel.queueBind("queue_01", "exchange_01", "routing_02");

            //gloabl设置为ture 那么就是channel级别的限流,若为false 就是consumer级别的限制流量 //
            channel.basicConsume("",false, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String data = new String(body, Charset.defaultCharset());
                    System.out.println("routing_02:" + data);
                    channel.basicAck(envelope.getDeliveryTag(), true);
                }
            });
        }
        // 消费者
        public static void main(String[] args) throws IOException {
            directConsumer1();
            directConsumer1();
            directConsumer2();
        }
    }
}

 

最后修改于 2020-06-05 14:16:21
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付
上一篇