1、rabbitMQ Maven 依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
2、代码
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* rabbitMq 路由类型
* 1、Direct 和 Fanout 两种模式下可以有多个接受者,如果多个接受者,监控同一个队列,那么会轮询将消息发送至接收者
* 2、Fanout 路由模式下,如果将多个队列进行绑定,那么多个队列同时接受到消息,同一个Queue被多个接收者监听,依然轮询接受消息
*/
public class RabbitMQDirect {
public static class MqUtil {
public static String host = "192.168.8.48";
public static Integer hostPort = 5672;
public static String userName = "admin";
public static String password = "admin1234";
public static Channel getConnection(){
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setHost(host);
connectionFactory.setPort(hostPort);
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
try {
return connectionFactory.newConnection().createChannel();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
}
// 生产者
public static void main(String[] args) throws IOException {
Channel connection = MqUtil.getConnection();
Map<String, Object> header = new HashMap<>();
header.put("num", 1); //设置消息属性
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().
contentEncoding("utf-8").contentType("application/json").deliveryMode(2).
headers(header).build();
/***
* 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,
* 那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,
* mandatory标志告诉broker代理服务器至少将消息route到一个队列中,否则就将消息return给发送者;
*/
connection.basicPublish(
"exchange_01", // 指定交换机
"routing_02", // 指定路由
false,
basicProperties, // 发布的参数
"发送正常消息1".getBytes()); // 发布的消息
connection.basicPublish(
"exchange_01", // 指定交换机
"routing_02", // 指定路由
false,
basicProperties, // 发布的参数
"发送正常消息2".getBytes()); // 发布的消息
connection.basicPublish(
"exchange_01", // 指定交换机
"routing_02", // 指定路由
false,
basicProperties, // 发布的参数
"发送正常消息3".getBytes()); // 发布的消息
connection.basicPublish(
"exchange_01", // 指定交换机
"routing_02", // 指定路由
false,
basicProperties, // 发布的参数
"发送正常消息4".getBytes()); // 发布的消息
connection.basicPublish(
"exchange_01",
"routing_01",
false,
basicProperties,
"发送正常消息5".getBytes());
}
public static class RabbitMq{
public static void directConsumer1() throws IOException {
Channel channel = MqUtil.getConnection();
// 声明交换机
channel.exchangeDeclare("exchange_01", // 交换机名称
"direct", // 交换机类型
false, // 是否持久化,持久化后,服务器重启依旧存在这个交换机
true, // 是否自动删除,如果是 true,则长时间不用则自动删除
false, // 是否是rabbitmq 内部队列
null);
// 声明队列
channel.queueDeclare("queue_01", false, false, true, null);
// 绑定队列和交换机
channel.queueBind("queue_01", "exchange_01", "routing_01");
//gloabl设置为ture 那么就是channel级别的限流,若为false 就是consumer级别的限制流量 //
channel.basicConsume("",false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String data = new String(body, Charset.defaultCharset());
System.out.println(data);
System.out.println("routing_01:" + data);
}
});
}
public static void directConsumer2() throws IOException {
Channel channel = MqUtil.getConnection();
channel.exchangeDeclare("exchange_01", "direct", false, true, false, null);
channel.queueDeclare("queue_01", false, false, true, null);
channel.queueBind("queue_01", "exchange_01", "routing_02");
//gloabl设置为ture 那么就是channel级别的限流,若为false 就是consumer级别的限制流量 //
channel.basicConsume("",false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String data = new String(body, Charset.defaultCharset());
System.out.println("routing_02:" + data);
channel.basicAck(envelope.getDeliveryTag(), true);
}
});
}
// 消费者
public static void main(String[] args) throws IOException {
directConsumer1();
directConsumer1();
directConsumer2();
}
}
}
最后修改于 2020-06-05 14:16:21
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付

