Flink 消费 Kafka 数据并清洗,使用 SQL 进行统计
import com.cloud.multiple.db.flink.entity.ReportDataAdEntity;
import lombok.extern.slf4j.Slf4j;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import java.io.UnsupportedEncodingException;
/**
 * Demo Flink job: reads JSON strings from a Kafka topic, maps each one onto a
 * {@link ReportDataAdEntity}, registers the resulting stream as the temporary
 * view {@code report_data_ad}, queries it with SQL and prints the {@code uuid}
 * field of every result row to standard output.
 */
@Slf4j
public class Test {

    /**
     * Builds the pipeline and consumes the SQL result on the client.
     *
     * @param args command-line arguments; not used
     * @throws UnsupportedEncodingException declared for compatibility with existing callers;
     *         nothing in this method throws it directly
     * @throws IllegalStateException if closing the result iterator fails with a checked exception
     */
    public static void main(String[] args) throws UnsupportedEncodingException {
        // 1. Create the streaming and table environments.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Define the Kafka source; record values are read as plain strings,
        //    starting from the earliest offset.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("172.16.1.61:9092")
                .setTopics("source.report_data_ad")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // 3. Obtain the DataStream backed by the Kafka source (no watermarks are generated).
        DataStreamSource<String> reportDataAdStream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "report_data_ad");

        // 4. Convert every JSON message into an entity by copying the parsed properties.
        SingleOutputStreamOperator<ReportDataAdEntity> entities =
                reportDataAdStream.map(new MapFunction<String, ReportDataAdEntity>() {
                    @Override
                    public ReportDataAdEntity map(String value) throws Exception {
                        ReportDataAdEntity entity = new ReportDataAdEntity();
                        JSONObject json = JSONUtil.parseObj(value);
                        BeanUtil.copyProperties(json, entity);
                        return entity;
                    }
                });

        // 5. Expose the entity stream to SQL as a temporary view.
        tableEnv.createTemporaryView("report_data_ad", entities);

        // 6. Query the view with SQL.
        String processSql =
                " select uuid,uid,city ,pid,os,brand" +
                " from report_data_ad ";

        // The iterator returned by collect() holds job resources and must be closed,
        // so it is managed by try-with-resources even when iteration fails.
        try (CloseableIterator<Row> rows = tableEnv.executeSql(processSql).collect()) {
            rows.forEachRemaining(row -> {
                Object uuid = row.getField("uuid");
                System.out.println(uuid);
            });
        } catch (RuntimeException e) {
            // Propagate runtime failures unchanged, exactly as before the iterator was managed.
            throw e;
        } catch (Exception e) {
            // Only close() can raise a checked exception here.
            throw new IllegalStateException("Failed to close the SQL result iterator", e);
        }
    }
}
最后修改于 2022-04-25 14:16:02
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付

