首先需要在 pom.xml 中引入以下 Maven 依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
<scope>compile</scope>
</dependency>
<!-- hutool 工具包 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.3.10</version>
</dependency>
KafkaMsgConsumer消费者
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
 * Kafka message consumer: subscribes to the "flink-kafka-test" topic and
 * prints each record's key and value to stdout in an endless poll loop.
 *
 * <p>Note: the original file carried a copy-pasted, unused producer
 * {@code DemoProducerCallback} inner class here; it has been removed as dead code.
 */
@Slf4j
public class KafkaMsgConsumer {
    public static void main(String[] args) throws InterruptedException {
        Properties kafkaProps = new Properties();
        // Kafka broker address.
        kafkaProps.put("bootstrap.servers", "192.168.31.147:9092");
        // Deserializer classes for record keys and values.
        kafkaProps.put("key.deserializer", StringDeserializer.class);
        kafkaProps.put("value.deserializer", StringDeserializer.class);
        // A consumer group id is mandatory when using subscribe().
        kafkaProps.put("group.id", "test");
        // try-with-resources closes the consumer (leaving the group cleanly)
        // if the poll loop ever terminates with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
            // Subscribe to the topic the producer writes to.
            consumer.subscribe(Arrays.asList("flink-kafka-test"));
            // Poll the broker for new records forever.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("key:" + record.key() + ",value:" + record.value());
                }
            }
        }
    }
}
kafka 消息生产者
import cn.hutool.core.util.RandomUtil;
import cn.hutool.json.JSON;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;
/**
* kafka 消息生产者
*/
@Slf4j
public class KafkaMsgProvide {
private static class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {//如果Kafka返回一个错误,onCompletion方法抛出一个non null异常。
e.printStackTrace();//对异常进行一些处理,这里只是简单打印出来
}else{
System.out.println("发送成功");
log.info("发送成功");
}
}
}
public static List<PVDto> buildPvData(){
List<PVDto> pvs= new ArrayList<>();
for(int i = 0; i < 10; i++){
String randomIp = RandomStringUtils.random(1,false,true);
String areaId = RandomStringUtils.random(4,false,true);
String version = RandomStringUtils.random(2,false,true);
int osNum = RandomUtil.randomInt(0,2);
PVDto pv = new PVDto();
pv.setId(Long.valueOf(i));
pv.setIp("102.168.31.10"+ randomIp);
pv.setAreaId(areaId);
pv.setTime(new Date());
pv.setVersion(version);
if (osNum == 0) {
pv.setOs("android");
} else if (osNum == 1) {
pv.setOs("ios");
} else if (osNum == 2) {
pv.setOs("windows");
} else {
pv.setOs("none");
}
pvs.add(pv);
}
return pvs;
}
public static void main(String[] args) throws InterruptedException {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "192.168.31.147:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer<String, String>(kafkaProps);
List<PVDto> pvs = buildPvData();
for(PVDto pv : pvs){
JSON parse = JSONUtil.parse(pv);
String jsonStr = JSONUtil.toJsonStr(pv);
ProducerRecord<String, String> record = new ProducerRecord<String,String>("flink-kafka-test",
"person", jsonStr);//Topic Key Value
producer.send(record, new DemoProducerCallback());//发送消息时,传递一个回调对象,该回调对象必须实现org.apahce.kafka.clients.producer.Callback接口
}
Thread.sleep(1000000);
}
}
最后修改于 2021-10-20 17:16:49
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付

