<!-- rabbitmq--> RabbitMQ的maven 依赖
     <dependency>
           <groupId>com.rabbitmq</groupId>
           <artifactId>amqp-client</artifactId>
           <version>3.5.6</version>
       </dependency>
       <!-- https://mvnrepository.com/artifact/log4j/log4j -->
       <dependency>
           <groupId>log4j</groupId>
           <artifactId>log4j</artifactId>
           <version>1.2.17</version>
       </dependency>

 

 

import com.rabbitmq.client.*;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class RabbitConsumer {

    private static final Logger LOG = Logger.getLogger(RabbitConsumer.class);

    private static final String QUEUE_NAME = "QUEUE_NAME";
    private static final String IP_ADDRESS = "10.1.19.2";
    private static final String USER_NAME = "admin";
    private static final String PASSWORD = "admin";
    private static final int PORT = 5672;

    /**
     * 监听 rabbitmq 消息
     * @throws Exception
     */
    public static void consume() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setVirtualHost("/");
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);

        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel) {
            ExecutorService fixedThreadPool = Executors.newFixedThreadPool(50);

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                // 注意编码要和发送段的编码一致性,否则惠出现乱码,这里被坑了,存入数据库乱码
                String data = new String(body, Charset.defaultCharset());
                fixedThreadPool.execute(new Thread() {
                    @Override
                    public void run() {
                        System.out.println(data);
                        //TODO 处理接收到的消息 data
                    }
                });
                // 消息确认
                try {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        };
        //参数:队列名    是否自动应答      消费者
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
    public static void main(String[] args) throws Exception {
        consume();
    }
}

 

最后修改于 2020-05-15 19:12:28
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付
上一篇