<!-- rabbitmq --> RabbitMQ 的 Maven 依赖
<!-- RabbitMQ Java client; supplies the com.rabbitmq.client classes imported by RabbitConsumer. -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<!-- Log4j 1.x; supplies org.apache.log4j.Logger imported by RabbitConsumer. -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
import com.rabbitmq.client.*;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Minimal RabbitMQ consumer: subscribes to {@link #QUEUE_NAME} and hands every
 * delivery to a fixed-size worker pool.
 *
 * <p>Deliveries are consumed with manual acknowledgement. A delivery is
 * acknowledged only after its worker has finished processing it, so a message
 * that is still waiting in the pool is not reported to the broker as handled.
 */
public class RabbitConsumer {
    private static final Logger LOG = Logger.getLogger(RabbitConsumer.class);
    private static final String QUEUE_NAME = "QUEUE_NAME";
    // NOTE(review): broker address and credentials are hard-coded; consider
    // moving them to external configuration before using this outside a demo.
    private static final String IP_ADDRESS = "10.1.19.2";
    private static final String USER_NAME = "admin";
    private static final String PASSWORD = "admin";
    private static final int PORT = 5672;
    /** Size of the worker pool and the prefetch limit requested via basicQos. */
    private static final int WORKER_THREADS = 50;
    /**
     * Charset used to decode message bodies. It must match the charset the
     * producer used to encode them, otherwise the text is garbled (this was
     * observed as garbled data written to the database). A fixed charset is
     * used instead of the platform default so the result does not depend on
     * the machine the consumer runs on.
     */
    private static final Charset MESSAGE_CHARSET = Charset.forName("UTF-8");

    /**
     * Connects to the broker and starts listening for messages on
     * {@link #QUEUE_NAME}. The method returns once the consumer is registered;
     * messages are then delivered to the registered callback.
     *
     * @throws Exception if the connection, the channel or the subscription
     *                   cannot be set up; the connection is closed in that case
     */
    public static void consume() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setVirtualHost("/");
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);

        final Connection connection = factory.newConnection();
        try {
            final Channel channel = connection.createChannel();
            // Request a prefetch limit so the number of unacknowledged deliveries
            // handed to this consumer matches what the worker pool can take on.
            channel.basicQos(WORKER_THREADS);

            Consumer consumer = new DefaultConsumer(channel) {
                private final ExecutorService workers =
                        Executors.newFixedThreadPool(WORKER_THREADS);

                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) {
                    final String data = new String(body, MESSAGE_CHARSET);
                    final long deliveryTag = envelope.getDeliveryTag();
                    // Submit a plain Runnable: the pool supplies the thread.
                    workers.execute(new Runnable() {
                        @Override
                        public void run() {
                            handleMessage(channel, deliveryTag, data);
                        }
                    });
                }
            };

            // Arguments: queue name, auto-ack flag (false = manual ack), consumer.
            channel.basicConsume(QUEUE_NAME, false, consumer);
        } catch (Exception e) {
            // Setup failed after the connection was opened: do not leak it.
            closeQuietly(connection);
            throw e;
        }
    }

    /**
     * Processes one message on a worker thread and then settles the delivery:
     * acknowledged on success, rejected without requeue when processing throws.
     */
    private static void handleMessage(Channel channel, long deliveryTag, String data) {
        boolean processed;
        try {
            process(data);
            processed = true;
        } catch (RuntimeException e) {
            LOG.error("Failed to process delivery " + deliveryTag, e);
            processed = false;
        }
        settle(channel, deliveryTag, processed);
    }

    /** Business handling of a decoded message body. */
    private static void process(String data) {
        System.out.println(data);
        // TODO: handle the received message
    }

    /**
     * Acknowledges or rejects a single delivery. Only this one delivery tag is
     * settled (multiple = false) because settlement happens on worker threads
     * that finish in arbitrary order.
     */
    private static void settle(Channel channel, long deliveryTag, boolean processed) {
        try {
            if (processed) {
                channel.basicAck(deliveryTag, false);
            } else {
                // Not requeued, so a message that always fails is not redelivered forever.
                channel.basicReject(deliveryTag, false);
            }
        } catch (IOException | RuntimeException e) {
            // Unchecked exceptions are caught as well so that a failed settlement
            // (for example on a channel that is no longer open) is logged rather
            // than terminating the worker thread.
            LOG.error("Failed to settle delivery " + deliveryTag, e);
        }
    }

    /** Closes the connection, logging instead of propagating any failure. */
    private static void closeQuietly(Connection connection) {
        try {
            connection.close();
        } catch (IOException | RuntimeException closeError) {
            LOG.warn("Failed to close RabbitMQ connection", closeError);
        }
    }

    public static void main(String[] args) throws Exception {
        consume();
    }
}
最后修改于 2020-05-15 19:12:28
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付

