[jira] [Created] (FLINK-22551) checkpoints: strange behaviour

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-22551) checkpoints: strange behaviour

Shang Yuanchun (Jira)
buom created FLINK-22551:
----------------------------

             Summary: checkpoints: strange behaviour
                 Key: FLINK-22551
                 URL: https://issues.apache.org/jira/browse/FLINK-22551
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.13.0
         Environment: {code:java}
 java -version
openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)
{code}
            Reporter: buom


* +*Case 1*:+ Works as expected

{code:java}
/**
 * Reproducer, case 1: a custom source wired to a print-sink subclass with checkpointing
 * enabled. Every lifecycle and checkpoint callback prints a tagged line to stdout so the
 * order in which the callbacks are invoked can be read from the console output.
 */
public class Example {

    /**
     * Source that emits the string "Flink" and then sleeps 500 ms, repeating while
     * {@code isRunning} is true. Each callback prints a "[source] invoke ..." line.
     */
    public static class ExampleSource extends RichSourceFunction<String>
            implements CheckpointedFunction {

        // Loop flag read by run(); set to false by both cancel() and close().
        private volatile boolean isRunning = true;

        /** Logs the call only; performs no setup. */
        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("[source] invoke open()");
        }

        /** Clears the loop flag (so run() exits its loop) and logs the call. */
        @Override
        public void close() throws Exception {
            isRunning = false;
            System.out.println("[source] invoke close()");
        }

        /**
         * Logs the call, then emits "Flink" every 500 ms until {@code isRunning} is false.
         *
         * @param ctx context used to emit records via {@code collect}
         */
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            System.out.println("[source] invoke run()");
            while (isRunning) {
                ctx.collect("Flink");
                Thread.sleep(500);
            }
        }

        /** Clears the loop flag (so run() exits its loop) and logs the call. */
        @Override
        public void cancel() {
            isRunning = false;
            System.out.println("[source] invoke cancel()");
        }

        /** Logs the call only; nothing is written to the snapshot. */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("[source] invoke snapshotState()");
        }

        /** Logs the call only; no state is registered or restored. */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("[source] invoke initializeState()");
        }

    }

    /**
     * {@code PrintSinkFunction} subclass that additionally prints a "[sink] invoke ..."
     * line from each checkpoint callback. It does not override record handling.
     */
    public static class ExampleSink extends PrintSinkFunction<String>
            implements CheckpointedFunction {

        /** Logs the call only; nothing is written to the snapshot. */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("[sink] invoke snapshotState()");
        }

        /** Logs the call only; no state is registered or restored. */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("[sink] invoke initializeState()");
        }
    }

    /**
     * Builds and runs the job: ExampleSource -> ExampleSink.
     *
     * @param args unused
     */
    public static void main(String[] args) throws Exception {
        // Checkpointing is enabled with the value 1000 (interval in milliseconds per the
        // Flink API documentation).
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);

        DataStream<String> stream = env.addSource(new ExampleSource());
        // The sink's parallelism is explicitly set to 1.
        stream.addSink(new ExampleSink()).setParallelism(1);

        env.execute();
    }
}
{code}
{code:java}
$ java -jar ./example.jar

[sink] invoke initializeState()
[source] invoke initializeState()
[source] invoke open()
[source] invoke run()
Flink
[sink] invoke snapshotState()
[source] invoke snapshotState()
Flink
Flink
[sink] invoke snapshotState()
[source] invoke snapshotState()
Flink
Flink
[sink] invoke snapshotState()
[source] invoke snapshotState()
^C
{code}
 * *+Case 2:+* Does not run as expected (w/ _parallelism = 1_)

{code:java}
/**
 * Reproducer, case 2: the same logging source as case 1, but the sink is a
 * {@code FlinkKafkaProducer} configured with {@code Semantic.EXACTLY_ONCE} and its
 * parallelism explicitly set to 1.
 */
public class Example {

    /**
     * Source that emits the string "Flink" and then sleeps 500 ms, repeating while
     * {@code isRunning} is true. Each callback prints a "[source] invoke ..." line.
     */
    public static class ExampleSource extends RichSourceFunction<String>
            implements CheckpointedFunction {

        // Loop flag read by run(); set to false by both cancel() and close().
        private volatile boolean isRunning = true;

        /** Logs the call only; performs no setup. */
        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("[source] invoke open()");
        }

        /** Clears the loop flag (so run() exits its loop) and logs the call. */
        @Override
        public void close() throws Exception {
            isRunning = false;
            System.out.println("[source] invoke close()");
        }

        /**
         * Logs the call, then emits "Flink" every 500 ms until {@code isRunning} is false.
         *
         * @param ctx context used to emit records via {@code collect}
         */
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            System.out.println("[source] invoke run()");
            while (isRunning) {
                ctx.collect("Flink");
                Thread.sleep(500);
            }
        }

        /** Clears the loop flag (so run() exits its loop) and logs the call. */
        @Override
        public void cancel() {
            isRunning = false;
            System.out.println("[source] invoke cancel()");
        }

        /** Logs the call only; nothing is written to the snapshot. */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("[source] invoke snapshotState()");
        }

        /** Logs the call only; no state is registered or restored. */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("[source] invoke initializeState()");
        }
    }


    /**
     * Builds and runs the job: ExampleSource -> FlinkKafkaProducer (EXACTLY_ONCE).
     *
     * @param args unused
     */
    public static void main(String[] args) throws Exception {
        // Checkpointing is enabled with the value 1000 (interval in milliseconds per the
        // Flink API documentation).
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);

        DataStream<String> stream = env.addSource(new ExampleSource());

        // Only bootstrap.servers is configured for the Kafka client.
        // NOTE(review): transaction.timeout.ms is not set here; with EXACTLY_ONCE it may be
        // worth checking the effective value against the broker's limit. Not verified as
        // the cause of the behaviour reported for this case.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        String topic = "my-topic";

        // The serialization lambda turns each element into its UTF-8 bytes and wraps them
        // in a ProducerRecord for the topic, passing the timestamp through and null for
        // the remaining arguments.
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                topic,
                (element, timestamp) -> {
                    byte[] value = element.getBytes(StandardCharsets.UTF_8);
                    return new ProducerRecord<>(topic, null, timestamp, null, value, null);
                },
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        // The sink's parallelism is explicitly set to 1.
        stream.addSink(kafkaProducer).setParallelism(1);

        env.execute();
    }

}
{code}
{code:java}
 $ java -jar ./example.jar

[source] invoke cancel()
[source] invoke cancel()
[source] invoke cancel()
[source] invoke cancel()
^C%
{code}
+*Case 3*+: Does not run as expected (w/ _parallelism = default_)
{code:java}
/**
 * Reproducer, case 3: the logging source feeds a {@code FlinkKafkaProducer} configured
 * with {@code Semantic.EXACTLY_ONCE}; unlike the explicit-parallelism variant, no
 * {@code setParallelism} call is made on the sink.
 */
public class Example {

    /**
     * Source that emits the string "Flink" and then sleeps 500 ms, repeating while
     * {@code isRunning} is true. Each callback prints a "[source] invoke ..." line.
     */
    public static class ExampleSource extends RichSourceFunction<String>
            implements CheckpointedFunction {

        // Loop flag read by run(); set to false by both cancel() and close().
        private volatile boolean isRunning = true;

        /** Logs the call only; performs no setup. */
        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("[source] invoke open()");
        }

        /** Clears the loop flag (so run() exits its loop) and logs the call. */
        @Override
        public void close() throws Exception {
            isRunning = false;
            System.out.println("[source] invoke close()");
        }

        /**
         * Logs the call, then emits "Flink" every 500 ms until {@code isRunning} is false.
         *
         * @param ctx context used to emit records via {@code collect}
         */
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            System.out.println("[source] invoke run()");
            while (isRunning) {
                ctx.collect("Flink");
                Thread.sleep(500);
            }
        }

        /** Clears the loop flag (so run() exits its loop) and logs the call. */
        @Override
        public void cancel() {
            isRunning = false;
            System.out.println("[source] invoke cancel()");
        }

        /** Logs the call only; nothing is written to the snapshot. */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("[source] invoke snapshotState()");
        }

        /** Logs the call only; no state is registered or restored. */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("[source] invoke initializeState()");
        }

    }

    /**
     * {@code PrintSinkFunction} subclass that prints a "[sink] invoke ..." line from each
     * checkpoint callback. It is declared here but not referenced by {@code main()} in
     * this case; the job uses the Kafka producer as its only sink.
     */
    public static class ExampleSink extends PrintSinkFunction<String>
            implements CheckpointedFunction {

        /** Logs the call only; nothing is written to the snapshot. */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("[sink] invoke snapshotState()");
        }

        /** Logs the call only; no state is registered or restored. */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("[sink] invoke initializeState()");
        }
    }

    /**
     * Builds and runs the job: ExampleSource -> FlinkKafkaProducer (EXACTLY_ONCE).
     *
     * @param args unused
     */
    public static void main(String[] args) throws Exception {
        // Checkpointing is enabled with the value 1000 (interval in milliseconds per the
        // Flink API documentation).
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);

        DataStream<String> stream = env.addSource(new ExampleSource());

        // Only bootstrap.servers is configured for the Kafka client.
        // NOTE(review): transaction.timeout.ms is not set here; with EXACTLY_ONCE it may be
        // worth checking the effective value against the broker's limit. Not verified as
        // the cause of the behaviour reported for this case.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        String topic = "my-topic";

        // The serialization lambda turns each element into its UTF-8 bytes and wraps them
        // in a ProducerRecord for the topic, passing the timestamp through and null for
        // the remaining arguments.
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                topic,
                (element, timestamp) -> {
                    byte[] value = element.getBytes(StandardCharsets.UTF_8);
                    return new ProducerRecord<>(topic, null, timestamp, null, value, null);
                },
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        // No setParallelism call: the sink runs with whatever parallelism the
        // environment assigns by default.
        stream.addSink(kafkaProducer);
        env.execute();
    }

}{code}
{code:java}
$ java -jar ./example.jar

[source] invoke initializeState()
[source] invoke open()
[source] invoke run()
[source] invoke cancel()
[source] invoke close()
[source] invoke initializeState()
[source] invoke open()
[source] invoke run()
[source] invoke cancel()
[source] invoke close()
[source] invoke initializeState()
[source] invoke open()
[source] invoke run()
[source] invoke snapshotState()
[source] invoke cancel()
[source] invoke close()
^C%
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)