buom created FLINK-22551:
---------------------------- Summary: checkpoints: strange behaviour Key: FLINK-22551 URL: https://issues.apache.org/jira/browse/FLINK-22551 Project: Flink Issue Type: Bug Affects Versions: 1.13.0 Environment: {code:java} java -version openjdk version "11.0.2" 2019-01-15 OpenJDK Runtime Environment 18.9 (build 11.0.2+9) OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode) {code} Reporter: buom * +*Case 1*:+ Work as expected {code:java} public class Example { public static class ExampleSource extends RichSourceFunction<String> implements CheckpointedFunction { private volatile boolean isRunning = true; @Override public void open(Configuration parameters) throws Exception { System.out.println("[source] invoke open()"); } @Override public void close() throws Exception { isRunning = false; System.out.println("[source] invoke close()"); } @Override public void run(SourceContext<String> ctx) throws Exception { System.out.println("[source] invoke run()"); while (isRunning) { ctx.collect("Flink"); Thread.sleep(500); } } @Override public void cancel() { isRunning = false; System.out.println("[source] invoke cancel()"); } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { System.out.println("[source] invoke snapshotState()"); } @Override public void initializeState(FunctionInitializationContext context) throws Exception { System.out.println("[source] invoke initializeState()"); } } public static class ExampleSink extends PrintSinkFunction<String> implements CheckpointedFunction { @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { System.out.println("[sink] invoke snapshotState()"); } @Override public void initializeState(FunctionInitializationContext context) throws Exception { System.out.println("[sink] invoke initializeState()"); } } public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000); DataStream<String> stream = env.addSource(new ExampleSource()); stream.addSink(new ExampleSink()).setParallelism(1); env.execute(); } } {code} {code:java} $ java -jar ./example.jar [sink] invoke initializeState() [source] invoke initializeState() [source] invoke open() [source] invoke run() Flink [sink] invoke snapshotState() [source] invoke snapshotState() Flink Flink [sink] invoke snapshotState() [source] invoke snapshotState() Flink Flink [sink] invoke snapshotState() [source] invoke snapshotState() ^C {code} * *+Case 2:+* Does not work as expected (w/ _parallelism = 1_) {code:java} public class Example { public static class ExampleSource extends RichSourceFunction<String> implements CheckpointedFunction { private volatile boolean isRunning = true; @Override public void open(Configuration parameters) throws Exception { System.out.println("[source] invoke open()"); } @Override public void close() throws Exception { isRunning = false; System.out.println("[source] invoke close()"); } @Override public void run(SourceContext<String> ctx) throws Exception { System.out.println("[source] invoke run()"); while (isRunning) { ctx.collect("Flink"); Thread.sleep(500); } } @Override public void cancel() { isRunning = false; System.out.println("[source] invoke cancel()"); } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { System.out.println("[source] invoke snapshotState()"); } @Override public void initializeState(FunctionInitializationContext context) throws Exception { System.out.println("[source] invoke initializeState()"); } } public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000); DataStream<String> stream = env.addSource(new ExampleSource()); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", 
"localhost:9092"); String topic = "my-topic"; FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>( topic, (element, timestamp) -> { byte[] value = element.getBytes(StandardCharsets.UTF_8); return new ProducerRecord<>(topic, null, timestamp, null, value, null); }, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE); stream.addSink(kafkaProducer).setParallelism(1); env.execute(); } } {code} {code:java} $ java -jar ./example.jar [source] invoke cancel() [source] invoke cancel() [source] invoke cancel() [source] invoke cancel() ^C% {code} +*Case 3*+: Does not work as expected (w/ _parallelism = default_) {code:java} public class Example { public static class ExampleSource extends RichSourceFunction<String> implements CheckpointedFunction { private volatile boolean isRunning = true; @Override public void open(Configuration parameters) throws Exception { System.out.println("[source] invoke open()"); } @Override public void close() throws Exception { isRunning = false; System.out.println("[source] invoke close()"); } @Override public void run(SourceContext<String> ctx) throws Exception { System.out.println("[source] invoke run()"); while (isRunning) { ctx.collect("Flink"); Thread.sleep(500); } } @Override public void cancel() { isRunning = false; System.out.println("[source] invoke cancel()"); } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { System.out.println("[source] invoke snapshotState()"); } @Override public void initializeState(FunctionInitializationContext context) throws Exception { System.out.println("[source] invoke initializeState()"); } } public static class ExampleSink extends PrintSinkFunction<String> implements CheckpointedFunction { @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { System.out.println("[sink] invoke snapshotState()"); } @Override public void initializeState(FunctionInitializationContext context) throws Exception { System.out.println("[sink] invoke 
initializeState()"); } } public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000); DataStream<String> stream = env.addSource(new ExampleSource()); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); String topic = "my-topic"; FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>( topic, (element, timestamp) -> { byte[] value = element.getBytes(StandardCharsets.UTF_8); return new ProducerRecord<>(topic, null, timestamp, null, value, null); }, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE); stream.addSink(kafkaProducer); env.execute(); } }{code} {code:java} $ java -jar ./example.jar [source] invoke initializeState() [source] invoke open() [source] invoke run() [source] invoke cancel() [source] invoke close() [source] invoke initializeState() [source] invoke open() [source] invoke run() [source] invoke cancel() [source] invoke close() [source] invoke initializeState() [source] invoke open() [source] invoke run() [source] invoke snapshotState() [source] invoke cancel() [source] invoke close() ^C% {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |