Simon Su created FLINK-13492:
--------------------------------
Summary: BoundedOutOfOrderTimestamps cause Watermark's timestamp leak
Key: FLINK-13492
URL: https://issues.apache.org/jira/browse/FLINK-13492
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.9.0
Reporter: Simon Su
Attachments: Watermark_timestamp_leak.diff

{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, conf);
// Use event time; the default autoWatermarkInterval is 200 ms
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

Kafka kafka = new Kafka()
    .version("0.11")
    .topic(topic)
    .startFromLatest()
    .properties(properties);

Schema schema = new Schema();
for (int i = 0; i < names.length; i++) {
    if ("timestamp".equalsIgnoreCase(names[i])) {
        // Set the watermark delay (allowed out-of-orderness) to 1000 ms
        schema.field("rowtime", types[i])
            .rowtime(new Rowtime()
                .timestampsFromField("timestamp")
                .watermarksPeriodicBounded(1000));
    } else {
        schema.field(names[i], types[i]);
    }
}

/** ..... */

tableEnv
    .connect(kafka)
    .withFormat(new Protobuf().protobufName("order_sink"))
    .withSchema(schema)
    .inAppendMode()
    .registerTableSource("orderStream");
{code}

We register the upstream table and apply a 10 s tumbling window on it. When we feed in a sequence of normal data, no result is output. We then debugged whether watermarks were being emitted normally, and finally found the issue:
# In BoundedOutOfOrderTimestamps, maxTimestamp is initialized to Long.MIN_VALUE.
# The nextTimestamp method extracts the timestamp from each source record and updates maxTimestamp with it.
# The getWatermark() method computes the watermark's timestamp from maxTimestamp and the delay (maxTimestamp - delay).
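To make the arithmetic concrete, here is a minimal, self-contained Java sketch. SimpleBoundedAssigner, MonotonicEmitter and WatermarkUnderflowDemo are hypothetical names, not Flink classes: the assigner follows the three steps above, and the emitter assumes the operator forwards a watermark only when it is larger than the last one it emitted. The 1000 ms delay matches watermarksPeriodicBounded(1000) in the snippet; the record timestamp is made up. The sketch also models the variant where the delay is added to the initial maxTimestamp.

```java
// Hypothetical, simplified model of the watermark computation described above.
// None of these classes are Flink classes.
public class WatermarkUnderflowDemo {

    static final class SimpleBoundedAssigner {
        private final long delay;
        private long maxTimestamp;

        SimpleBoundedAssigner(long delay, boolean addDelayToInitialValue) {
            this.delay = delay;
            // Step 1: initial maximum timestamp (optionally pre-shifted by the delay).
            this.maxTimestamp = addDelayToInitialValue ? Long.MIN_VALUE + delay : Long.MIN_VALUE;
        }

        // Step 2: remember the largest timestamp seen so far.
        void nextTimestamp(long timestamp) {
            if (timestamp > maxTimestamp) {
                maxTimestamp = timestamp;
            }
        }

        // Step 3: watermark = max timestamp minus the delay (long arithmetic wraps on underflow).
        long getWatermark() {
            return maxTimestamp - delay;
        }
    }

    // Assumed operator behavior: only forward a watermark larger than the last emitted one.
    static final class MonotonicEmitter {
        private long currentWatermark = Long.MIN_VALUE;

        boolean offer(long watermark) {
            if (watermark > currentWatermark) {
                currentWatermark = watermark;
                return true; // emitted
            }
            return false; // dropped
        }
    }

    // Returns {first timer tick emitted?, tick after a record emitted?}.
    public static boolean[] run(boolean addDelayToInitialValue) {
        SimpleBoundedAssigner assigner = new SimpleBoundedAssigner(1000L, addDelayToInitialValue);
        MonotonicEmitter emitter = new MonotonicEmitter();
        // The periodic timer fires before any record has arrived.
        boolean first = emitter.offer(assigner.getWatermark());
        // A record arrives (made-up timestamp), then the timer fires again.
        assigner.nextTimestamp(1_564_000_000_000L);
        boolean second = emitter.offer(assigner.getWatermark());
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        System.out.println(Long.MIN_VALUE - 1000L); // prints 9223372036854774808
        boolean[] original = run(false);
        boolean[] shifted = run(true);
        // prints "initial MIN_VALUE: first=true, later=false"
        System.out.println("initial MIN_VALUE: first=" + original[0] + ", later=" + original[1]);
        // prints "initial MIN_VALUE + delay: first=false, later=true"
        System.out.println("initial MIN_VALUE + delay: first=" + shifted[0] + ", later=" + shifted[1]);
    }
}
```

With maxTimestamp starting at Long.MIN_VALUE, the first timer-driven watermark wraps to Long.MAX_VALUE - 999, and every later, realistic watermark is smaller and is dropped. Starting from Long.MIN_VALUE + delay keeps the initial watermark at exactly Long.MIN_VALUE, so real watermarks can still advance.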
When +TimestampsAndPeriodicWatermarksOperator+ is initialized and its open method is called, it registers a timer with the SystemTimeService to generate watermarks every watermarkInterval. That is the problem: when this timer thread calls BoundedOutOfOrderTimestamps$getCurrentWatermark before any record has arrived, the computation (Long.MIN_VALUE - delay) underflows and wraps around to a very large positive value. As a result, all subsequent watermarks are dropped, because they are all smaller than (Long.MIN_VALUE - delay).

A workaround is to set a large autoWatermarkInterval, so that the SystemTimeService timer fires for the first time only after a long delay. The timer is always scheduled watermarkInterval ahead of the current processing time:

{code:java}
public void onProcessingTime(long timestamp) throws Exception {
    ...
    getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    ...
}
{code}

{code:java}
public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
    ...
    long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;
    ...
}
{code}

I think we can fix this by adding the delay to maxTimestamp's initial value in the BoundedOutOfOrderTimestamps constructor, which avoids the underflow in the calculation.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)