1、首先引入Jar文件
<!-- Jedis 2.9.0: provides the redis.clients.jedis classes imported by the example class below -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
2、代码实现:publish 发布消息,subscribe/psubscribe 订阅消息
package redis;
import redis.clients.jedis.*;
import java.util.HashSet;
/**
 * Redis publish/subscribe demo built on Jedis.
 *
 * <p>{@link #main(String[])} starts one exact-channel subscriber and one pattern
 * subscriber, both on {@link #topicLog}, and then publishes one message per second,
 * cycling through the four topic names declared below.
 */
public class Subscribe {
    /**
     * Cluster client, seeded with the node addresses returned by {@link #clusterNodes()}.
     */
    public static final JedisCluster cluster = new JedisCluster(clusterNodes());

    /**
     * Connection pool for a stand-alone (single node) server. The demo itself only talks
     * to {@link #cluster}; this pool is the alternative mentioned in the method comments.
     * NOTE(review): "ip" looks like a placeholder host - replace it before using the pool.
     */
    public static final JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "ip", 7001);

    /** Used by main both as an exact channel name and as a subscription pattern. */
    public static final String topicLog = "log.*";
    public static final String topicLogInfo = "log.info";
    public static final String topicLogDebug = "log.info.debug";
    public static final String topicLogError = "log.info.debug.error";

    /**
     * Builds the seed node set for {@link #cluster}.
     *
     * <p>A plain {@code HashSet} is filled here instead of using double-brace
     * initialization, which would create an extra anonymous subclass of {@code HashSet}.
     *
     * @return the addresses of the cluster nodes
     */
    private static HashSet<HostAndPort> clusterNodes() {
        HashSet<HostAndPort> nodes = new HashSet<>();
        nodes.add(new HostAndPort("192.168.1.4", 7100));
        nodes.add(new HostAndPort("192.168.1.5", 7101));
        nodes.add(new HostAndPort("192.168.1.6", 7200));
        nodes.add(new HostAndPort("192.168.1.7", 7201));
        nodes.add(new HostAndPort("192.168.1.8", 7300));
        nodes.add(new HostAndPort("192.168.1.9", 7301));
        return nodes;
    }

    /**
     * Exact-channel mode: subscribes to {@code topic} on a newly started thread and
     * prints every message received on it.
     *
     * @param topic the channel name to subscribe to (matched literally)
     */
    public static void sub(String topic) {
        JedisPubSub listener = new JedisPubSub() {
            /**
             * Receives messages published to the exactly-matching channel.
             *
             * @param channel the channel the message arrived on
             * @param message the message payload
             */
            @Override
            public void onMessage(String channel, String message) {
                System.out.println("onMessage:" + message);
            }
        };
        // The subscribe call runs on its own thread so the caller is not held up by it.
        new Thread(() -> cluster.subscribe(listener, topic)).start();
    }

    /**
     * Pattern mode: subscribes to every channel matching {@code topic} on a newly
     * started thread and prints every message received.
     *
     * @param topic the channel pattern to subscribe to (for example {@code log.*})
     */
    public static void pSub(String topic) {
        JedisPubSub listener = new JedisPubSub() {
            /**
             * Receives messages published to any channel matching the pattern.
             *
             * @param pattern the subscribed pattern that matched
             * @param channel the channel the message arrived on
             * @param message the message payload
             */
            @Override
            public void onPMessage(String pattern, String channel, String message) {
                System.out.println("onPMessage:" + message);
            }
        };
        // For a stand-alone server use jedisPool.getResource().psubscribe(...) instead.
        new Thread(() -> cluster.psubscribe(listener, topic)).start();
    }

    /**
     * Publishes a message.
     *
     * @param topic   the channel to publish to
     * @param message the message payload
     */
    public static void publish(String topic, String message) {
        // For a stand-alone server use jedisPool.getResource().publish(...) instead,
        // and close the borrowed Jedis afterwards so it goes back to the pool.
        cluster.publish(topic, message);
    }

    /**
     * Starts both subscribers, then publishes one message per second forever, using
     * each topic name as its own payload.
     *
     * @param args unused
     * @throws InterruptedException if the sleep between messages is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        sub(topicLog);
        pSub(topicLog);

        // Publication order of one full cycle; the cycle repeats indefinitely.
        String[] rotation = {topicLog, topicLogDebug, topicLogInfo, topicLogError};
        int next = 0;
        while (true) {
            // One message per second.
            Thread.sleep(1000);
            String topic = rotation[next];
            publish(topic, topic);
            next = (next + 1) % rotation.length;
        }
    }
}
3、运行打印结果如下:
onPMessage:log.*
onMessage:log.*
onPMessage:log.info.debug
onPMessage:log.info
onPMessage:log.info.debug.error
onPMessage:log.*
onMessage:log.*
onPMessage:log.info.debug
onPMessage:log.info
onPMessage:log.info.debug.error
最后修改于 2020-09-17 14:06:20
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付

