1、RabbitMQ Maven 依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
2、代码
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
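/**
 * RabbitMQ topic-exchange example: one producer that publishes to
 * "exchange_topic_01" and two consumers whose queues are bound to that
 * exchange with different routing patterns.
 */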
public class RabbitMQTopic {
/**
 * Connection helper shared by the producer and the consumers.
 *
 * <p>NOTE(review): the broker address and credentials are hard-coded demo
 * values; real deployments should load them from configuration instead of
 * source code.
 */
public static class MqUtil {
    /** Broker host name or IP address. */
    public static String host = "192.168.8.48";
    /** Broker AMQP port. */
    public static Integer hostPort = 5672;
    /** User name used to authenticate against the broker. */
    public static String userName = "admin";
    /** Password used to authenticate against the broker. */
    public static String password = "admin1234";

    /**
     * Opens a new connection to the broker (virtual host "/") and returns a
     * channel created on it.
     *
     * <p>Every call opens a brand-new connection. The caller owns it and should
     * close it via {@code channel.getConnection().close()} when done.
     *
     * @return an open channel, never {@code null}
     * @throws IllegalStateException if the connection or the channel cannot be
     *         opened; the underlying I/O or timeout failure is kept as the cause
     */
    public static Channel getConnection() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost(host);
        connectionFactory.setPort(hostPort);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);

        Connection connection;
        try {
            connection = connectionFactory.newConnection();
        } catch (IOException | TimeoutException e) {
            // Fail loudly here instead of returning null and letting callers hit an NPE later.
            throw new IllegalStateException(
                    "Cannot connect to RabbitMQ at " + host + ":" + hostPort, e);
        }

        try {
            Channel channel = connection.createChannel();
            if (channel == null) {
                // createChannel() reports "no channel available" by returning null.
                throw new IOException("No channel available on the new connection");
            }
            return channel;
        } catch (IOException e) {
            // Do not leak the connection that was just opened.
            try {
                connection.close();
            } catch (IOException | RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw new IllegalStateException(
                    "Cannot create a channel on RabbitMQ at " + host + ":" + hostPort, e);
        }
    }
}
// Producer
/**
 * Publishes a single persistent, JSON-typed message to the topic exchange
 * "exchange_topic_01" with routing key "goods.add", then closes the connection.
 *
 * @param args unused
 * @throws IOException if no channel can be opened, or publishing or closing fails
 */
public static void main(String[] args) throws IOException {
    Channel channel = MqUtil.getConnection();
    if (channel == null) {
        throw new IOException("Could not open a RabbitMQ channel");
    }
    try {
        Map<String, Object> headers = new HashMap<>();
        headers.put("num", 1); // custom message header

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .contentEncoding("utf-8")
                .contentType("application/json")
                .deliveryMode(2) // 2 = persistent
                .headers(headers)
                .build();

        channel.basicPublish(
                "exchange_topic_01", // target exchange
                "goods.add",         // routing key
                false,               // mandatory
                properties,          // message properties
                // Encode explicitly so the bytes match the declared contentEncoding.
                "发送正常消息1".getBytes(StandardCharsets.UTF_8));
    } finally {
        // Closing the connection also closes its channels; without this the
        // client's threads keep the JVM alive after the message is sent.
        Connection connection = channel.getConnection();
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
/**
 * Consumer side of the example: two queues bound to the same topic exchange
 * with different routing patterns, each consumed with manual acknowledgements.
 */
public static class RabbitMq {
    /** Topic exchange both consumer queues are bound to. */
    private static final String EXCHANGE = "exchange_topic_01";

    /**
     * Consumes queue "topic_01", bound with pattern "goods.*", with a prefetch
     * limit of one unacknowledged message.
     *
     * @throws IOException if the channel cannot be opened or set up
     */
    public static void topicConsumer1() throws IOException {
        final String queueName = "topic_01";
        final Channel channel = openBoundChannel(queueName, "goods.*");
        try {
            // Deliver at most one unacknowledged message at a time. The
            // two-argument basicQos overload additionally takes a "global" flag:
            // true applies the limit to the whole channel, false to each consumer.
            channel.basicQos(1);
            channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String data = new String(body, StandardCharsets.UTF_8);
                    System.out.println(data);
                    System.out.println("routing_01-queue_01:" + data);
                    // Acknowledge only this delivery (multiple=false); unacked
                    // messages would be redelivered on the next start.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        } catch (IOException | RuntimeException e) {
            closeAfterFailure(channel, e);
            throw e;
        }
    }

    /**
     * Consumes queue "topic_02", bound with the exact routing key "goods.add".
     * No prefetch limit is configured for this consumer.
     *
     * @throws IOException if the channel cannot be opened or set up
     */
    public static void topicConsumer2() throws IOException {
        final String queueName = "topic_02";
        final Channel channel = openBoundChannel(queueName, "goods.add");
        try {
            channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String data = new String(body, StandardCharsets.UTF_8);
                    System.out.println("routing_02-topic_02:" + data);
                    // Acknowledge only this delivery (multiple=false); unacked
                    // messages would be redelivered on the next start.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        } catch (IOException | RuntimeException e) {
            closeAfterFailure(channel, e);
            throw e;
        }
    }

    // Consumer
    /**
     * Starts both consumers; they keep receiving on the client's own threads.
     *
     * @param args unused
     * @throws IOException if either consumer cannot be started
     */
    public static void main(String[] args) throws IOException {
        topicConsumer1();
        topicConsumer2();
    }

    /** Opens a channel, declares the exchange and the queue, and binds them with the routing key. */
    private static Channel openBoundChannel(String queueName, String routingKey)
            throws IOException {
        Channel channel = MqUtil.getConnection();
        if (channel == null) {
            throw new IOException("Could not open a RabbitMQ channel");
        }
        try {
            channel.exchangeDeclare(EXCHANGE, // exchange name
                    "topic",                  // exchange type
                    true,                     // durable: survives a broker restart
                    false,                    // autoDelete: server deletes it once no longer in use
                    false,                    // internal: cannot be published to directly by clients
                    null);                    // no extra arguments
            // durable, non-exclusive, non-auto-delete queue without extra arguments
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, EXCHANGE, routingKey);
            return channel;
        } catch (IOException | RuntimeException e) {
            closeAfterFailure(channel, e);
            throw e;
        }
    }

    /** Closes the channel's connection after a setup failure, recording any close error as suppressed. */
    private static void closeAfterFailure(Channel channel, Throwable failure) {
        try {
            Connection connection = channel.getConnection();
            if (connection.isOpen()) {
                connection.close();
            }
        } catch (IOException | RuntimeException closeFailure) {
            failure.addSuppressed(closeFailure);
        }
    }
}
}
最后修改于 2020-06-05 15:05:49
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付

