Flink消费Kafka数据并清洗，用SQL进行统计


import com.cloud.multiple.db.flink.entity.ReportDataAdEntity;
import lombok.extern.slf4j.Slf4j;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import java.io.UnsupportedEncodingException;

/**
 * Demo job: reads JSON strings from the Kafka topic {@code source.report_data_ad}, copies each
 * record's properties onto a {@link ReportDataAdEntity}, registers the resulting stream as the
 * temporary view {@code report_data_ad}, runs a SQL projection over it and prints the
 * {@code uuid} column of every result row to stdout.
 */
@Slf4j
public class Test {

    /** Kafka broker list the source connects to. */
    private static final String BOOTSTRAP_SERVERS = "172.16.1.61:9092";

    /** Topic carrying the raw JSON records. */
    private static final String TOPIC = "source.report_data_ad";

    /** Consumer group id used by the Kafka source. */
    private static final String GROUP_ID = "my-group";

    /** Name used both for the Kafka source operator and for the temporary SQL view. */
    private static final String VIEW_NAME = "report_data_ad";

    /** Projection run against {@link #VIEW_NAME}. */
    private static final String PROCESS_SQL =
            "select uuid, uid, city, pid, os, brand from " + VIEW_NAME;

    /**
     * Builds the pipeline, submits the SQL query and prints each result row's {@code uuid}.
     *
     * @param args unused
     * @throws Exception if job submission fails or closing the result iterator fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming and table environments.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Read the raw record strings from Kafka.
        DataStreamSource<String> rawRecords =
                env.fromSource(buildKafkaSource(), WatermarkStrategy.noWatermarks(), VIEW_NAME);

        // 3. Convert each JSON string into an entity.
        SingleOutputStreamOperator<ReportDataAdEntity> entities = rawRecords.map(new JsonToEntity());

        // 4. Expose the entity stream to SQL as a temporary view.
        tableEnv.createTemporaryView(VIEW_NAME, entities);

        // 5. Run the query and stream the results back to the client.
        //    The iterator is AutoCloseable; try-with-resources guarantees it is closed even if
        //    printing fails (previously it was never closed).
        try (CloseableIterator<Row> rows = tableEnv.executeSql(PROCESS_SQL).collect()) {
            rows.forEachRemaining(row -> System.out.println(row.getField("uuid")));
        }
    }

    /**
     * Builds a Kafka source that reads {@link #TOPIC} from the earliest offset and deserializes
     * only the record value, as a string, via {@link SimpleStringSchema}.
     */
    private static KafkaSource<String> buildKafkaSource() {
        return KafkaSource.<String>builder()
                .setBootstrapServers(BOOTSTRAP_SERVERS)
                .setTopics(TOPIC)
                .setGroupId(GROUP_ID)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    /**
     * Parses one JSON object string and copies its properties onto a fresh
     * {@link ReportDataAdEntity}.
     *
     * <p>NOTE(review): malformed input is not filtered here; any exception thrown while parsing
     * propagates out of {@code map}. Consider dropping bad records if this step is meant to
     * clean the data.
     */
    private static final class JsonToEntity implements MapFunction<String, ReportDataAdEntity> {
        private static final long serialVersionUID = 1L;

        @Override
        public ReportDataAdEntity map(String value) throws Exception {
            JSONObject json = JSONUtil.parseObj(value);
            ReportDataAdEntity entity = new ReportDataAdEntity();
            BeanUtil.copyProperties(json, entity);
            return entity;
        }
    }
}

 

最后修改于 2022-04-25 14:16:02
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付
上一篇