Piotr Nowojski created FLINK-23011:
--------------------------------------

             Summary: FLIP-27 sources are generating non-deterministic results when using event time
                 Key: FLINK-23011
                 URL: https://issues.apache.org/jira/browse/FLINK-23011
             Project: Flink
          Issue Type: New Feature
          Components: API / DataStream
    Affects Versions: 1.12.4, 1.13.1, 1.14.0
         Environment:
            Reporter: Piotr Nowojski


FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and switch to {{ACTIVE}} only after emitting their first {{Watermark}}. Until this happens, downstream operators exclude the {{IDLE}} inputs when calculating the combined input (min) watermark.

An extreme example of the problem this leads to: results are completely bogus if, for example, one FLIP-27 source subtask is slower than the others for some reason:

{code:java}
env.getConfig().setAutoWatermarkInterval(2000);
env.setParallelism(2);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));

DataStream<Long> eventStream =
        env.fromSource(
                        new NumberSequenceSource(0, Long.MAX_VALUE),
                        WatermarkStrategy.<Long>forMonotonousTimestamps()
                                .withTimestampAssigner(new LongTimestampAssigner()),
                        "NumberSequenceSource")
                .map(
                        new RichMapFunction<Long, Long>() {
                            @Override
                            public Long map(Long value) throws Exception {
                                // Throttle only subtask 0 so that it falls behind subtask 1.
                                if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                                    Thread.sleep(1);
                                }
                                return 1L;
                            }
                        });

eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();

(...)

private static class LongTimestampAssigner implements SerializableTimestampAssigner<Long> {
    private long counter = 0;

    // Ignores the record value and assigns a per-subtask, monotonically increasing timestamp.
    @Override
    public long extractTimestamp(Long record, long recordTimeStamp) {
        return counter++;
    }
}
{code}

In this case, after 2 seconds ({{setAutoWatermarkInterval}}) the non-throttled subtask (subTaskId == 1) generates very high watermarks. The other source subtask (subTaskId == 0) emits very low watermarks.
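As a rough illustration of the mechanism described above, the following is a minimal, self-contained sketch of an operator that takes the minimum watermark over its {{ACTIVE}} input channels and skips {{IDLE}} ones. It does not use Flink's actual classes: {{CombinedWatermarkSketch}}, {{onWatermark}} and the watermark values are hypothetical names and numbers chosen only for illustration, and the sketch assumes that a channel becomes {{ACTIVE}} when its first watermark arrives and that the operator's own watermark never moves backwards.

```java
import java.util.Arrays;

/** Hypothetical, simplified model of combining per-channel watermarks; not Flink code. */
final class CombinedWatermarkSketch {
    private final long[] channelWatermarks;
    private final boolean[] channelIdle;
    private long lastEmitted = Long.MIN_VALUE;

    CombinedWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        channelIdle = new boolean[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        Arrays.fill(channelIdle, true); // assumption: every input starts as IDLE
    }

    /** A channel reports a watermark, which also marks it ACTIVE. Returns the operator watermark. */
    long onWatermark(int channel, long watermark) {
        channelIdle[channel] = false;
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return advance();
    }

    /** Takes the minimum over ACTIVE channels only; the result never decreases. */
    private long advance() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        if (anyActive && min > lastEmitted) {
            lastEmitted = min;
        }
        return lastEmitted;
    }

    public static void main(String[] args) {
        CombinedWatermarkSketch operator = new CombinedWatermarkSketch(2);
        // Fast channel 1 reports first, while slow channel 0 is still IDLE and ignored.
        System.out.println(operator.onWatermark(1, 2_000_000L));
        // Slow channel 0 becomes ACTIVE with a much lower watermark: too late to matter.
        System.out.println(operator.onWatermark(0, 1_500L));
    }
}
```

In this model both calls return the fast channel's watermark, so every record from the slow channel with a smaller timestamp would already be considered late by the time it arrives.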
If a watermark from the non-throttled subtask reaches the downstream {{WindowOperator}} first, while the other input channel is still idle, the operator takes that high watermark as the combined input watermark for the whole {{WindowOperator}}. When the input channel from the throttled source subtask finally receives its {{ACTIVE}} status and a much lower watermark, it is already too late.

Actual output of the example program:

{noformat}
1596
2000
1000
1000
1000
1000
1000
1000
(...)
{noformat}

while the expected output should always be "2000" (2000 records fitting in every 1 second global window):

{noformat}
2000
2000
2000
2000
(...)
{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)