(possible dupe; I wasn't subscribed before and the previous message didn't
seem to go through) I'm on Flink v1.9 with the Kafka connector and a standalone JM. If FlinkKafkaProducer fails while checkpointing, it throws a KafkaException which gets wrapped in a CheckpointException which is sent to the JM as a DeclineCheckpoint. KafkaException isn't on the JM default classpath, so the JM throws a fairly cryptic ClassNotFoundException. The details of the KafkaException wind up suppressed so it's impossible to figure out what actually went wrong. I can think of two fixes that would prevent this from occurring in the Kafka or other connectors in the future: 1. DeclineCheckpoint should always send a SerializedThrowable to the JM rather than allowing CheckpointExceptions with non-deserializable root causes to slip through 2. CheckpointException should always capture its wrapped exception as a SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))' rather than 'super(cause)'). Thoughts? |
To be clear -- I'm happy to make a PR for either option below. (Either is
<10 lines diff.) It's just the contributor guidelines said to get consensus first and then only make a PR if I'm assigned to do the work. On Fri, Sep 20, 2019 at 12:23 PM Jeffrey Martin <[hidden email]> wrote: > (possible dupe; I wasn't subscribed before and the previous message didn't > seem to go through) > > I'm on Flink v1.9 with the Kafka connector and a standalone JM. > If FlinkKafkaProducer fails while checkpointing, it throws a > KafkaException which gets wrapped in a CheckpointException which is sent to > the JM as a DeclineCheckpoint. KafkaException isn't on the JM default > classpath, so the JM throws a fairly cryptic ClassNotFoundException. The > details of the KafkaException wind up suppressed so it's impossible to > figure out what actually went wrong. > > I can think of two fixes that would prevent this from occurring in the > Kafka or other connectors in the future: > 1. DeclineCheckpoint should always send a SerializedThrowable to the JM > rather than allowing CheckpointExceptions with non-deserializable root > causes to slip through > 2. CheckpointException should always capture its wrapped exception as a > SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))' > rather than 'super(cause)'). > > Thoughts? > |
Hi Jeffrey,
thanks for reporting this issue and starting a discussion how to solve this problem. I've pulled in Piotr who is working on the checkpointing part of Flink. If a user generated exception can get reported, then we need to make sure that it is properly handled. Approach 2. would be easier if we are ok with not having direct access to the root cause (w/o cumbersomely deserializing user defined exceptions). Approach 1. would make the fact that the decline checkpoint message might contain a user defined exception more explicit. However, if we don't use the concrete exception except for reporting, then approach 1. should be good enough. Cheers, Till On Sat, Sep 21, 2019 at 5:49 AM Jeffrey Martin <[hidden email]> wrote: > To be clear -- I'm happy to make a PR for either option below. (Either is > <10 lines diff.) It's just the contributor guidelines said to get consensus > first and then only make a PR if I'm assigned to do the work. > > On Fri, Sep 20, 2019 at 12:23 PM Jeffrey Martin < > [hidden email]> > wrote: > > > (possible dupe; I wasn't subscribed before and the previous message > didn't > > seem to go through) > > > > I'm on Flink v1.9 with the Kafka connector and a standalone JM. > > If FlinkKafkaProducer fails while checkpointing, it throws a > > KafkaException which gets wrapped in a CheckpointException which is sent > to > > the JM as a DeclineCheckpoint. KafkaException isn't on the JM default > > classpath, so the JM throws a fairly cryptic ClassNotFoundException. The > > details of the KafkaException wind up suppressed so it's impossible to > > figure out what actually went wrong. > > > > I can think of two fixes that would prevent this from occurring in the > > Kafka or other connectors in the future: > > 1. DeclineCheckpoint should always send a SerializedThrowable to the JM > > rather than allowing CheckpointExceptions with non-deserializable root > > causes to slip through > > 2. CheckpointException should always capture its wrapped exception as a > > SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))' > > rather than 'super(cause)'). > > > > Thoughts? > > > |
Hi,
I guess the TaskManager should have logged the original exception somewhere (I’m not saying that we shouldn’t solve this, just to make sure that the basics are covered), so you should already be able to deduce the reason of failure, right? I think that option 2. would not only be easier, but cleaner. `CheckpointException` is a Flink class, there might be reasons for CheckpointCoordinator to access it, while there should be no reasons for the JM code to ever touch anything below (like a cause of the `CheckpointException` originating from user code). Piotrek > On 23 Sep 2019, at 14:56, Till Rohrmann <[hidden email]> wrote: > > Hi Jeffrey, > > thanks for reporting this issue and starting a discussion how to solve this > problem. I've pulled in Piotr who is working on the checkpointing part of > Flink. > > If a user generated exception can get reported, then we need to make sure > that it is properly handled. Approach 2. would be easier if we are ok with > not having direct access to the root cause (w/o cumbersomely deserializing > user defined exceptions). Approach 1. would make the fact that the decline > checkpoint message might contain a user defined exception more explicit. > However, if we don't use the concrete exception except for reporting, then > approach 1. should be good enough. > > Cheers, > Till > > On Sat, Sep 21, 2019 at 5:49 AM Jeffrey Martin <[hidden email]> > wrote: > >> To be clear -- I'm happy to make a PR for either option below. (Either is >> <10 lines diff.) It's just the contributor guidelines said to get consensus >> first and then only make a PR if I'm assigned to do the work. >> >> On Fri, Sep 20, 2019 at 12:23 PM Jeffrey Martin < >> [hidden email]> >> wrote: >> >>> (possible dupe; I wasn't subscribed before and the previous message >> didn't >>> seem to go through) >>> >>> I'm on Flink v1.9 with the Kafka connector and a standalone JM. >>> If FlinkKafkaProducer fails while checkpointing, it throws a >>> KafkaException which gets wrapped in a CheckpointException which is sent >> to >>> the JM as a DeclineCheckpoint. KafkaException isn't on the JM default >>> classpath, so the JM throws a fairly cryptic ClassNotFoundException. The >>> details of the KafkaException wind up suppressed so it's impossible to >>> figure out what actually went wrong. >>> >>> I can think of two fixes that would prevent this from occurring in the >>> Kafka or other connectors in the future: >>> 1. DeclineCheckpoint should always send a SerializedThrowable to the JM >>> rather than allowing CheckpointExceptions with non-deserializable root >>> causes to slip through >>> 2. CheckpointException should always capture its wrapped exception as a >>> SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))' >>> rather than 'super(cause)'). >>> >>> Thoughts? >>> >> |
Free forum by Nabble | Edit this page |