使用侧输出流(Side Output)输出严重迟到的数据
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.ArrayList;
/**
 * Demonstrates routing "seriously late" records of an event-time window to a side output.
 *
 * <p>The job counts how often each key occurs per 5-second tumbling event-time window.
 * Every input line has the form {@code key,seconds}; the second field is used as the
 * event time (in seconds). Watermarks tolerate 3 seconds of out-of-orderness and each
 * window additionally accepts late records for 2 more seconds (allowed lateness).
 * Records arriving after that are emitted through the {@code seriousData} side output.
 *
 * <p>To feed the job by hand instead of using the built-in demo source, start
 * {@code nc -lp 9999}, switch to the commented-out {@code socketTextStream} source
 * and type the following lines:
 * <pre>
 * a,1
 * a,2
 * a,8
 * a,3
 * a,10
 * a,2
 * a,4
 * a,5
 * </pre>
 */
public class WindowSideOutPutStreamDataLate {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Emit periodic watermarks every millisecond so each record is reflected promptly.
        env.getConfig().setAutoWatermarkInterval(1);

        // Tag identifying the side output that receives the seriously late records.
        OutputTag<Tuple2<String, Long>> lateDataTag =
                new OutputTag<>("seriousData", Types.TUPLE(Types.STRING, Types.LONG));

        ArrayList<String> objects = new ArrayList<>();
        objects.add("a,1");
        objects.add("a,2");
        objects.add("a,8");
        objects.add("a,3");
        objects.add("a,10");
        objects.add("a,2");
        objects.add("a,4");
        objects.add("a,5");

        // Do NOT use a pre-built in-memory collection to simulate late data going to the
        // side output (presumably because all elements are emitted at once, before the
        // periodic watermark can advance between them - TODO confirm):
        // DataStreamSource<String> source = env.fromCollection(objects);
        // Typing the data in manually works:
        // DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

        // Custom source that emits the records with pauses in between, simulating data
        // that trickles in intermittently.
        DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {
            // cancel() is invoked from a different thread than run(); the flag must be
            // volatile so the emitting loop is guaranteed to observe the update.
            private volatile boolean cancelled = false;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                for (String line : objects) {
                    if (cancelled) {
                        break;
                    }
                    ctx.collect(line);
                    Thread.sleep(700);
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });

        // Parse "key,seconds" into (key, seconds).
        SingleOutputStreamOperator<Tuple2<String, Long>> streamMap =
                source.map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] split = value.split(",");
                        return Tuple2.of(split[0], Long.parseLong(split[1]));
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarks =
                streamMap.assignTimestampsAndWatermarks(
                        // The watermark trails the largest timestamp seen by 3s, so the
                        // [0s, 5s) window (left-closed, right-open) first fires once a
                        // record with an event time of 8s or later has been seen.
                        WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                        // The second field holds seconds; Flink expects milliseconds.
                                        return element.f1 * 1000L;
                                    }
                                })
                );

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = assignTimestampsAndWatermarks
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                // 5-second tumbling event-time windows.
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // Keep each window open for late records for 2 more seconds.
                .allowedLateness(Time.seconds(2))
                // Route records that arrive after the allowed lateness to the side output.
                .sideOutputLateData(lateDataTag)
                // Count the records in the window, emit (key, count) and print the
                // window's start and end time.
                .process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Integer>, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<Tuple2<String, Long>> elements,
                                        Collector<Tuple2<String, Integer>> out) throws Exception {
                        int size = 0;
                        for (Tuple2<String, Long> ignored : elements) {
                            size++;
                        }
                        out.collect(Tuple2.of(s, size));
                        System.out.println("当前窗口开始时间: " + context.window().getStart() + " 窗口结束时间:" + context.window().getEnd());
                    }
                });

        // Regular window results.
        result.print();
        // Seriously late records, printed to stderr.
        result.getSideOutput(lateDataTag).printToErr("serious data :");

        env.execute();
    }
}
打印结果如下:
(a,2)
当前窗口开始时间: 0 窗口结束时间:5000
(a,3)
当前窗口开始时间: 0 窗口结束时间:5000
serious data :> (a,2)
serious data :> (a,4)
(a,2)
当前窗口开始时间: 5000 窗口结束时间:10000
(a,1)
当前窗口开始时间: 10000 窗口结束时间:15000
最后修改于 2022-10-08 16:58:33
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付

