Sergii Mikhtoniuk created FLINK-18053:
----------------------------------------- Summary: Savepoints do not preserve watermarks Key: FLINK-18053 URL: https://issues.apache.org/jira/browse/FLINK-18053 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Table SQL / Runtime Affects Versions: 1.10.1 Reporter: Sergii Mikhtoniuk Attachments: 1.csv, 2.csv, MyApp.scala Flink produces invalid result when streaming SQL aggregation is stopped and resumed from a savepoint. *Steps to reproduce:* 1) Create an assembly from the attached file. This job will be reading CSV files as a stream. Files contain fake stock tickers which will be aggregated with following tumbling window query: {code:java} SELECT TUMBLE_START(event_time, INTERVAL '1' DAY) as event_time, symbol as symbol, min(price) as `min`, max(price) as `max` FROM Tickers GROUP BY TUMBLE(event_time, INTERVAL '1' DAY), symbol {code} Stream uses punctuated watermarks with max lateness of 1 day 2) Create two CSV files with fake stock tickers: {{1.csv}}: {code:java} 2000-01-01 01:00:00.0,A,10 2000-01-01 01:00:00.0,B,20 2000-01-01 02:00:00.0,A,10 2000-01-01 02:00:00.0,B,21 2000-01-02 01:00:00.0,A,12 2000-01-02 01:00:00.0,B,22 2000-01-02 02:00:00.0,A,13 2000-01-02 02:00:00.0,B,23 2000-01-01 03:00:00.0,A,11 // Late arrival - still above watermark 2000-01-03 01:00:00.0,A,14 2000-01-03 01:00:00.0,B,24 2000-01-03 02:00:00.0,A,15 2000-01-03 02:00:00.0,B,25 {code} {{2.csv}}: {code:java} 2000-01-01 04:00:00.0,A,12 // Late arrival - under watermark 2000-01-04 01:00:00.0,A,16 // Next values won't be visible in the result, they only push watermark up 2000-01-04 01:00:00.0,B,26 2000-01-04 02:00:00.0,A,17 2000-01-04 02:00:00.0,B,27 2000-01-05 01:00:00.0,A,18 2000-01-05 01:00:00.0,B,28 {code} 3) Run the job on the folder containing both files. Observed result is as expected: {code:java} 2000-01-01,A,10,11 2000-01-01,B,20,21 2000-01-02,A,12,13 2000-01-02,B,22,23 2000-01-03,A,14,15 2000-01-03,B,24,25 {code} 4) Now run the job with only {{1.csv}} in the directory. Produces still correct: {code:java} 2000-01-01,A,10,11 2000-01-01,B,20,21 {code} 5) Cancel job with savepoint, move {{2.csv}} into the directory. Restart job from savepoint. Produces incorrect result: {code:java} 2000-01-01,A,12,12 2000-01-02,A,12,13 2000-01-02,B,22,23 2000-01-03,A,14,15 2000-01-03,B,24,25 {code} *Expectation:* We were not supposed to see {{20[^MyApp.scala][^1.csv]00-01-01,A,12,12}} record, as it should not have passed the watermark check. This tells me that Flink did not save the watermark in the savepoint. -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |