[jira] [Created] (FLINK-23011) FLIP-27 sources are generating non-deterministic results when using event time


Shang Yuanchun (Jira)
Piotr Nowojski created FLINK-23011:
--------------------------------------

             Summary: FLIP-27 sources are generating non-deterministic results when using event time
                 Key: FLINK-23011
                 URL: https://issues.apache.org/jira/browse/FLINK-23011
             Project: Flink
          Issue Type: New Feature
          Components: API / DataStream
    Affects Versions: 1.12.4, 1.13.1, 1.14.0
         Environment:

            Reporter: Piotr Nowojski


FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and switch to {{ACTIVE}} only after emitting their first {{Watermark}}. Until that happens, downstream operators exclude the {{IDLE}} inputs when calculating the combined (minimum) input watermark.

As an extreme example of the problem this causes, results can be completely bogus if one FLIP-27 source subtask is slower than the others for some reason:
{code:java}
env.getConfig().setAutoWatermarkInterval(2000);
env.setParallelism(2);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));

DataStream<Long> eventStream =
        env.fromSource(
                        new NumberSequenceSource(0, Long.MAX_VALUE),
                        WatermarkStrategy.<Long>forMonotonousTimestamps()
                                .withTimestampAssigner(new LongTimestampAssigner()),
                        "NumberSequenceSource")
                .map(
                        new RichMapFunction<Long, Long>() {
                            @Override
                            public Long map(Long value) throws Exception {
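                                // Throttle only subtask 0 so that it falls behind subtask 1.
                                if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                                    Thread.sleep(1);
                                }
                                // Emit 1 per record so that the window sum equals the record count.
                                return 1L;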
                            }
                        });

eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();

(...)
private static class LongTimestampAssigner implements SerializableTimestampAssigner<Long> {
    // Per-instance counter used as the event timestamp; the record value is ignored.
    private long counter = 0;

    @Override
    public long extractTimestamp(Long record, long recordTimeStamp) {
        return counter++;
    }
}
{code}
In this case, after 2 seconds ({{setAutoWatermarkInterval}}) the unthrottled subtask (subTaskId == 1) generates very high watermarks, while the other source subtask (subTaskId == 0) emits very low watermarks. If a watermark from the unthrottled subtask reaches the downstream {{WindowOperator}} first, while the other input channel is still idle, the operator takes that high watermark as the combined input watermark for the whole {{WindowOperator}}. By the time the input channel from the throttled source subtask finally receives its {{ACTIVE}} status and a much lower watermark, it is already too late.
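
To make the race easier to follow, here is a minimal standalone sketch of the behaviour described above. It is a toy model, not Flink's actual implementation: the class {{IdleInputWatermarkSketch}}, its methods and the watermark values are hypothetical and chosen only for illustration. It assumes that the combined watermark is the minimum over active channels, that it never moves backwards, and that a window counts as late once the combined watermark has reached the window's last timestamp.

```java
import java.util.Arrays;

/** Toy model (not Flink code) of combining per-channel watermarks while skipping idle channels. */
public class IdleInputWatermarkSketch {

    private final long[] channelWatermarks;
    private final boolean[] channelActive; // false means the channel is still idle
    private long combinedWatermark = Long.MIN_VALUE;

    public IdleInputWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        channelActive = new boolean[numChannels]; // every channel starts idle
    }

    /** Marks the channel active, stores its watermark and returns the new combined watermark. */
    public long onWatermark(int channel, long watermark) {
        channelActive[channel] = true;
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);

        // Minimum over active channels only; idle channels are ignored.
        long minOverActive = Long.MAX_VALUE;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (channelActive[i]) {
                minOverActive = Math.min(minOverActive, channelWatermarks[i]);
            }
        }

        // Assumption: the combined watermark is never allowed to regress.
        combinedWatermark = Math.max(combinedWatermark, minOverActive);
        return combinedWatermark;
    }

    /** Simplified lateness check for a window [start, end) with no allowed lateness. */
    public boolean isWindowLate(long windowEnd) {
        return windowEnd - 1 <= combinedWatermark;
    }

    public static void main(String[] args) {
        IdleInputWatermarkSketch operator = new IdleInputWatermarkSketch(2);

        // The unthrottled subtask (channel 1) reports first; channel 0 is still idle.
        long afterFast = operator.onWatermark(1, 1_000_000L);
        // The throttled subtask (channel 0) becomes active with a much lower watermark.
        long afterSlow = operator.onWatermark(0, 1_500L);

        System.out.println(afterFast);                     // 1000000
        System.out.println(afterSlow);                     // 1000000
        System.out.println(operator.isWindowLate(2_000L)); // true
    }
}
```

In this model the second call cannot pull the combined watermark back down, so the window ending at 2000 is already considered late when the throttled channel becomes active.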

Actual output of the example program:
{noformat}
1596
2000
1000
1000
1000
1000
1000
1000
(...)
{noformat}
whereas the expected output is always "2000" (2000 records fall into every 1-second global window):
{noformat}
2000
2000
2000
2000
(...)
{noformat}
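
The expected value of 2000 can be checked with a small standalone calculation. This is only a sketch under assumptions: the class {{ExpectedWindowCounts}} and its method are hypothetical, timestamps are taken to be milliseconds, each of the 2 source subtasks is assumed to use its own {{LongTimestampAssigner}} counter starting at 0, and 3000 records per subtask is an arbitrary cut-off for the otherwise unbounded sequence.

```java
import java.util.Map;
import java.util.TreeMap;

/** Counts how many records fall into each tumbling window if no record is dropped as late. */
public class ExpectedWindowCounts {

    static Map<Long, Long> countPerWindow(int parallelism, long recordsPerSubtask, long windowSizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            long counter = 0; // mirrors the per-instance counter of LongTimestampAssigner
            for (long r = 0; r < recordsPerSubtask; r++) {
                long timestamp = counter++;
                // Start of the tumbling window that contains this timestamp.
                long windowStart = timestamp - (timestamp % windowSizeMs);
                counts.merge(windowStart, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // 2 subtasks, 3000 records each, 1-second windows.
        System.out.println(countPerWindow(2, 3000L, 1000L)); // {0=2000, 1000=2000, 2000=2000}
    }
}
```

Each subtask contributes 1000 records to every 1-second window, so with parallelism 2 every window should sum to 2000.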




--
This message was sent by Atlassian Jira
(v8.3.4#803005)