使用侧输出流（Side Output）输出严重迟到的数据


import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.ArrayList;

/**
 * Counts how many times each key occurs in every 5-second event-time tumbling window, and routes
 * records that arrive after a window's allowed lateness has expired to a side output instead of
 * dropping them.
 *
 * <p>The built-in source emits the sample lines below one at a time, 700 ms apart. To type them by
 * hand instead, run {@code nc -lp 9999}, switch the job to the commented-out
 * {@code socketTextStream} source, and enter:
 *
 * <pre>
 * a,1
 * a,2
 * a,8
 * a,3
 * a,10
 * a,2
 * a,4
 * a,5
 * </pre>
 *
 * <p>Each line is {@code key,eventTimeInSeconds}.
 */
public class WindowSideOutPutStreamDataLate {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run every operator with a single parallel instance.
        env.setParallelism(1);
        // Generate periodic watermarks every 1 ms so the watermark advances between the
        // spaced-out records emitted by the source below.
        env.getConfig().setAutoWatermarkInterval(1);
        // Side-output tag that collects records arriving after the window's allowed lateness.
        OutputTag<Tuple2<String, Long>> lateDataTag =
                new OutputTag<>("seriousData", Types.TUPLE(Types.STRING, Types.LONG));

        // Sample input: "key,eventTimeInSeconds".
        ArrayList<String> objects = new ArrayList<>();
        objects.add("a,1");
        objects.add("a,2");
        objects.add("a,8");
        objects.add("a,3");
        objects.add("a,10");
        objects.add("a,2");
        objects.add("a,4");
        objects.add("a,5");

        // An in-memory source such as env.fromCollection(objects) cannot be used to simulate late
        // data here (presumably because all records are emitted before a periodic watermark is
        // generated between them). Alternatives:
        //   DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        // or, as below, a custom source that emits records with a delay between them.
        DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {
            // Written by cancel() and read by run(), which Flink invokes from different threads;
            // volatile guarantees run() observes the cancellation.
            private volatile boolean cancelled = false;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                for (String line : objects) {
                    if (cancelled) {
                        break;
                    }
                    // Emit under the checkpoint lock, as the SourceFunction contract recommends.
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(line);
                    }
                    Thread.sleep(700);
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });

        // Parse "key,seconds" into (key, seconds). Trim both parts so that hand-typed socket input
        // with stray spaces or a trailing '\r' does not break Long.parseLong.
        SingleOutputStreamOperator<Tuple2<String, Long>> streamMap = source.map(new MapFunction<String, Tuple2<String, Long>>() {

            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] split = value.split(",");
                return Tuple2.of(split[0].trim(), Long.parseLong(split[1].trim()));
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarks = streamMap.assignTimestampsAndWatermarks(
                // Watermarks trail the largest event time seen so far by 3 s (out-of-orderness
                // bound). The window [0 s, 5 s) is therefore first evaluated once a record with
                // event time >= 8 s (window end 5 s + 3 s) has been seen.
                WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                // The second field is in seconds; Flink event time is in milliseconds.
                                return element.f1 * 1000L;
                            }
                        })
        );

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = assignTimestampsAndWatermarks.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                // 5-second tumbling event-time windows, start-inclusive / end-exclusive:
                // [0, 5000), [5000, 10000), ...
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // Keep window state for a further 2 s of watermark progress after the window
                // fires; records arriving in that period re-fire the window.
                .allowedLateness(Time.seconds(2))
                // Records arriving after the allowed lateness go to the side output.
                .sideOutputLateData(lateDataTag)
                // Emit (key, number of records in the window) and print the window bounds.
                .process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Integer>, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Integer>, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {
                        int size = 0;
                        for (Tuple2<String, Long> ignored : elements) {
                            size++;
                        }
                        out.collect(Tuple2.of(s, size));
                        System.out.println("当前窗口开始时间: " + context.window().getStart() + " 窗口结束时间:" + context.window().getEnd());
                    }
                });

        // Regular window results.
        result.print();
        // Records that arrived too late for their window.
        result.getSideOutput(lateDataTag).printToErr("serious data :");
        env.execute();

    }
}

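打印结果如下（a,8 到达后水位线推进到 4999，触发窗口 [0,5000) 首次计算，输出 (a,2)；随后在 2 秒允许迟到时间内到达的 a,3 使该窗口再次计算，输出 (a,3)；a,10 到达后水位线推进到 6999，窗口 [0,5000) 的允许迟到时间结束，之后到达的 a,2 和 a,4 被输出到侧输出流；其余窗口在数据结束时触发计算）：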

(a,2)
当前窗口开始时间: 0 窗口结束时间:5000
(a,3)
当前窗口开始时间: 0 窗口结束时间:5000
serious data :> (a,2)
serious data :> (a,4)
(a,2)
当前窗口开始时间: 5000 窗口结束时间:10000
(a,1)
当前窗口开始时间: 10000 窗口结束时间:15000

最后修改于 2022-10-08 16:58:33
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付
上一篇