JIRA ticket: https://issues.apache.org/jira/browse/FLINK-14076
I'm on Flink v1.9 with the Kafka connector and a standalone JM.

If FlinkKafkaProducer fails while checkpointing, it throws a KafkaException which gets wrapped in a CheckpointException which is sent to the JM as a DeclineCheckpoint. KafkaException isn't on the JM default classpath, so the JM throws a fairly cryptic ClassNotFoundException. The details of the KafkaException wind up suppressed so it's impossible to figure out what actually went wrong.

I can think of two fixes that would prevent this from occurring in the Kafka or other connectors in the future:
1. DeclineCheckpoint should always send a SerializedThrowable to the JM rather than allowing CheckpointExceptions with non-deserializable root causes to slip through
2. CheckpointException should always capture its wrapped exception as a SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))' rather than 'super(cause)').

Thoughts?
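The failure described above can be reproduced without Flink or Kafka. The following is a minimal sketch, not Flink code: `JobJarException` is a hypothetical stand-in for KafkaException, and `RestrictedObjectInputStream` is a hypothetical helper that imitates a JM whose classpath lacks that class by refusing to resolve it. Only standard `java.io` serialization calls are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

// Hypothetical stand-in for a class that exists only in the job jar.
class JobJarException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    JobJarException(String message) {
        super(message);
    }
}

// Hypothetical helper: behaves like a receiver whose classpath lacks one class.
class RestrictedObjectInputStream extends ObjectInputStream {
    private final String missingClassName;

    RestrictedObjectInputStream(InputStream in, String missingClassName) throws IOException {
        super(in);
        this.missingClassName = missingClassName;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        if (desc.getName().equals(missingClassName)) {
            throw new ClassNotFoundException(desc.getName());
        }
        return super.resolveClass(desc);
    }
}

final class ClasspathMismatchDemo {
    // Sender side: plain Java serialization, which always succeeds here.
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Receiver side: deserialization while one class is treated as missing.
    static Object deserializeWithout(byte[] data, String missingClassName)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new RestrictedObjectInputStream(new ByteArrayInputStream(data), missingClassName)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // The outer type is known to the receiver; only the cause is not.
        Exception declined =
                new Exception("checkpoint declined", new JobJarException("broker unavailable"));
        byte[] wire = serialize(declined);
        try {
            deserializeWithout(wire, JobJarException.class.getName());
        } catch (ClassNotFoundException e) {
            // The receiver learns the missing class name, but not the cause's message.
            System.out.println("receiver failed: " + e);
        }
    }
}
```

The point of the sketch is that the whole object graph fails to deserialize even though the outer exception type is available, which matches the symptom of the original failure details being lost.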
Hi, Jeffrey~
I think the two fixes you mentioned may not work in your case. This problem (https://issues.apache.org/jira/browse/FLINK-14076) is fundamentally caused by an inconsistency between the TM and JM jar environments, or by inconsistent jar-loading behavior. Maybe the behavior of the standalone cluster's dynamic class loader changed in Flink 1.9, since you mentioned that your program ran normally on Flink 1.8.

Just a thought from me. Hope it's useful~

Best,
Terry Wang

In reply to Jeffrey Martin, Sep 21, 2019, 2:58 AM.
Thanks for the suggestion, Terry. I've investigated a bit further.
DeclineCheckpoint specifically checks for the possibility of an exception that the JM won't be able to deserialize (i.e., anything other than a CheckpointException). It just doesn't check for the possibility of a CheckpointException that can't be deserialized because its root cause can't be deserialized.

I think the job succeeding on 1.8 and failing on 1.9 was a red herring: 1.9 broke the FlinkKafkaProducer API, so I wound up having to set the Semantic explicitly on 1.9. I set it to EXACTLY_ONCE, which caused checkpoints to fail sometimes. That caused the KafkaException to be propagated to the JM as the root cause of a CheckpointException.

In reply to Terry Wang, Sun, Sep 22, 2019, 5:03 AM.
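To make the gap in that check concrete, here is a minimal sketch. It is not the actual DeclineCheckpoint code: `CheckpointFailure` and `JobOnlyFailure` are hypothetical stand-ins for CheckpointException and KafkaException, and `passesTopLevelCheck` only mirrors the behavior as described in this thread (inspect the outermost type, nothing deeper).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for CheckpointException (a class the JM does have).
class CheckpointFailure extends Exception {
    private static final long serialVersionUID = 1L;

    CheckpointFailure(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical stand-in for KafkaException (a class only the job jar has).
class JobOnlyFailure extends RuntimeException {
    private static final long serialVersionUID = 1L;

    JobOnlyFailure(String message) {
        super(message);
    }
}

final class CauseChainCheck {
    // The check as described in this thread: only the outermost type is inspected.
    static boolean passesTopLevelCheck(Throwable reason) {
        return reason instanceof CheckpointFailure;
    }

    // Java serialization writes the whole cause chain, so the receiver must be
    // able to load every class listed here. The identity set guards against
    // cyclic cause chains.
    static List<String> causeChainClassNames(Throwable reason) {
        List<String> names = new ArrayList<>();
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = reason; t != null && seen.add(t); t = t.getCause()) {
            names.add(t.getClass().getName());
        }
        return names;
    }

    public static void main(String[] args) {
        Throwable reason =
                new CheckpointFailure("checkpoint declined", new JobOnlyFailure("broker unavailable"));
        System.out.println("passes top-level check: " + passesTopLevelCheck(reason));
        System.out.println("classes the receiver must load: " + causeChainClassNames(reason));
    }
}
```

The sketch only walks causes; suppressed exceptions and exception fields are serialized too and could hide job-only classes in the same way.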
Hi, Jeffrey~
Thanks for your detailed explanation; I now understand why the job failed on Flink 1.9.

But the two fixes you mentioned may still not work well. KafkaException can be serialized in the TM because the necessary jar is on its classpath, but that jar is not on the JM's classpath, so maybe it's impossible to check serializability in advance.
Do I understand correctly?

Best,
Terry Wang

In reply to Jeffrey Martin, Sep 23, 2019, 5:17 AM.
Hi Terry,
KafkaException comes in through the job's dependencies (it's defined in the kafka-clients jar packed up in the fat job jar) and is on neither the TM nor the JM default classpath. The job running in the TM includes the job dependencies and so can throw a KafkaException, but the JM can't deserialize it because it's not available on the default classpath.

I'm suggesting defensively wrapping all causes of a CheckpointException in a SerializedThrowable (in addition to defensively wrapping everything except a CheckpointException). I believe SerializedThrowable is there specifically for this case, i.e., where a job in the TM sends the JM an exception that's defined only in the job itself.

It might be clearer if I just put up a PR :) I'd be happy to and it'll be very short.

Best,

Jeff

In reply to Terry Wang, Sun, Sep 22, 2019, 7:45 PM.
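The defensive wrapping suggested above might look roughly like the sketch below. None of it is Flink code: `StringifiedThrowable` is a hypothetical, much simplified stand-in for SerializedThrowable that keeps only strings, `WrappingCheckpointException` stands in for CheckpointException, `ConnectorFailure` stands in for KafkaException, and `roundTripWithout` imitates a JM that cannot load the connector class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical stand-in for a connector exception that only the job jar contains.
class ConnectorFailure extends RuntimeException {
    private static final long serialVersionUID = 1L;

    ConnectorFailure(String message) {
        super(message);
    }
}

// Hypothetical, simplified stand-in for SerializedThrowable: it stores only
// strings, so the receiver never has to load the original class.
class StringifiedThrowable extends Exception {
    private static final long serialVersionUID = 1L;
    private final String originalClassName;
    private final String originalStackTrace;

    StringifiedThrowable(Throwable original) {
        super(original.getClass().getName() + ": " + original.getMessage());
        this.originalClassName = original.getClass().getName();
        StringWriter buffer = new StringWriter();
        original.printStackTrace(new PrintWriter(buffer, true));
        this.originalStackTrace = buffer.toString();
    }

    String getOriginalClassName() {
        return originalClassName;
    }

    String getOriginalStackTrace() {
        return originalStackTrace;
    }
}

// Hypothetical stand-in for CheckpointException with the wrapping applied.
class WrappingCheckpointException extends Exception {
    private static final long serialVersionUID = 1L;

    WrappingCheckpointException(String message, Throwable cause) {
        super(message, new StringifiedThrowable(cause)); // instead of super(message, cause)
    }
}

final class WrapCauseDemo {
    // Serializes the value, then reads it back while pretending that
    // missingClassName cannot be loaded on the receiving side.
    static Object roundTripWithout(Object value, String missingClassName)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())) {
                    @Override
                    protected Class<?> resolveClass(ObjectStreamClass desc)
                            throws IOException, ClassNotFoundException {
                        if (desc.getName().equals(missingClassName)) {
                            throw new ClassNotFoundException(desc.getName());
                        }
                        return super.resolveClass(desc);
                    }
                }) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Exception declined =
                new WrappingCheckpointException("checkpoint declined", new ConnectorFailure("broker unavailable"));
        Exception received =
                (Exception) roundTripWithout(declined, ConnectorFailure.class.getName());
        // The original class name and message survive as plain text.
        System.out.println(received.getCause().getMessage());
    }
}
```

The trade-off in this sketch is that the receiver gets a textual description rather than the original exception object, which seems acceptable when the goal is a readable failure reason on the JM.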
Hi Jeffrey,
You are right; I understood what you said after studying the class org.apache.flink.util.SerializedThrowable.
I prefer fix no. 2 that you mentioned: CheckpointException should always capture its wrapped exception as a SerializedThrowable.
Looking forward to seeing your PR soon :)

Best,
Terry Wang

In reply to Jeffrey Martin, Sep 23, 2019, 11:48 AM.
Draft PR here: https://github.com/apache/flink/pull/9742
There might be some failing tests (still waiting on Travis), but I think the diff is small enough for you to evaluate whether the approach is acceptable.

In reply to Terry Wang, Sun, Sep 22, 2019, 9:10 PM.