non-deserializable root cause in DeclineCheckpoint


non-deserializable root cause in DeclineCheckpoint

Jeffrey Martin
JIRA ticket: https://issues.apache.org/jira/browse/FLINK-14076

I'm on Flink v1.9 with the Kafka connector and a standalone JM.

If FlinkKafkaProducer fails while checkpointing, it throws a KafkaException,
which gets wrapped in a CheckpointException and sent to the JM in a
DeclineCheckpoint message. KafkaException isn't on the JM's default classpath,
so the JM throws a fairly cryptic ClassNotFoundException. The details of the
KafkaException wind up suppressed, so it's impossible to figure out what
actually went wrong.
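To be concrete about the mechanism (generic Java behavior, not Flink code;
assumes java.io.* imports, and KafkaException stands in for any job-only
class):

    // ObjectInputStream.readObject() must resolve every class in the object
    // graph, including the cause chain, so a single class that's missing from
    // the classpath fails the whole read and the original message is lost.
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
        out.writeObject(new Exception("checkpoint declined",
            new org.apache.kafka.common.KafkaException("the real error")));
    }
    try (ObjectInputStream in =
             new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
        in.readObject(); // on the JM: ClassNotFoundException, details gone
    }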

I can think of two fixes that would prevent this from recurring in the
Kafka or other connectors:
1. DeclineCheckpoint should always send a SerializedThrowable to the JM
rather than allowing CheckpointExceptions with non-deserializable root
causes to slip through.
2. CheckpointException should always capture its wrapped exception as a
SerializedThrowable, i.e., use 'super(new SerializedThrowable(cause))'
rather than 'super(cause)'. A sketch follows below.
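For fix no. 2, a minimal sketch (the constructor shape is simplified; the
real CheckpointException has more constructors than shown here):

    // Sketch only: wrap the cause before handing it to super().
    public class CheckpointException extends Exception {

        public CheckpointException(String message, Throwable cause) {
            // SerializedThrowable records the original class name, message,
            // and stack trace in a plainly serializable form, so the JM can
            // deserialize it even when the cause's class (e.g. KafkaException)
            // isn't on its classpath.
            super(message, new SerializedThrowable(cause));
        }
    }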

Thoughts?

Re: non-deserializable root cause in DeclineCheckpoint

Terry Wang
Hi, Jeffrey~

I think the two fixes you mentioned may not work in your case.
At its root, this problem (https://issues.apache.org/jira/browse/FLINK-14076)
is caused by an inconsistency between the jar environments, or the
class-loading behavior, of the TM and the JM.
Maybe the behavior of the standalone cluster's dynamic class loader changed in
Flink 1.9, since you mentioned that your program ran normally on Flink 1.8.
Just a thought from me. Hope it's useful~

Best,
Terry Wang




Re: non-deserializable root cause in DeclineCheckpoint

Jeffrey Martin
Thanks for the suggestion, Terry. I've investigated a bit further.

DeclineCheckpoint specifically checks for the possibility of an exception
that the JM won't be able to deserialize (i.e., anything other than a
CheckpointException). It just doesn't check for the possibility of a
CheckpointException that can't be deserialized because its root cause can't
be deserialized.
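Roughly, the existing check has this shape (paraphrased from my reading of
the code, not the actual source; names simplified):

    // Paraphrased sketch of the DeclineCheckpoint guard. It inspects only the
    // top-level type: a CheckpointException whose *cause* is a class unknown
    // to the JM (e.g. KafkaException) still slips through.
    Throwable safeReason;
    if (reason == null || reason instanceof CheckpointException) {
        safeReason = reason; // assumed deserializable on the JM
    } else {
        safeReason = new SerializedThrowable(reason); // defensive wrap
    }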

I think the job succeeding on 1.8 and failing on 1.9 was a red herring --
1.9 broke the FlinkKafkaProducer API so I wound up having to set the
Semantic explicitly on 1.9. I set it to EXACTLY_ONCE, which caused
checkpoints to fail sometimes. That caused the KafkaException to be
propagated to the JM as the root cause of a CheckpointException.


Re: non-deserializable root cause in DeclineCheckpoint

Terry Wang
Hi, Jeffrey~

Thanks for your detailed explanation; now I understand why the job failed on
Flink 1.9.

But the two fixes you mentioned may still not work well. KafkaException can be
serialized on the TM because the necessary jar is on its classpath, but that
jar is missing on the JM, so maybe it's impossible to check serializability in
advance.
Do I understand that right?



Best,
Terry Wang




Re: non-deserializable root cause in DeclineCheckpoint

Jeffrey Martin
Hi Terry,

KafkaException comes in through the job's dependencies (it's defined in the
kafka-clients jar packed into the fat job jar) and is on neither the TM nor
the JM default classpath. The job running in the TM includes the job
dependencies and so can throw a KafkaException, but the JM can't deserialize
it because the class isn't available on its default classpath.

I'm suggesting defensively wrapping all causes of a CheckpointException in
a SerializedThrowable (in addition to the existing defensive wrapping of
everything except a CheckpointException). I believe SerializedThrowable exists
specifically for this case, i.e., where a job in the TM sends the JM an
exception that's defined only in the job itself.
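To illustrate what the wrapping buys us (a sketch, using the hypothetical
constructor shape from my first mail; it assumes only that
SerializedThrowable(Throwable) captures the message and stack trace in a form
that deserializes without the original class):

    Throwable cause = new org.apache.kafka.common.KafkaException("txn failed");
    CheckpointException wrapped = new CheckpointException(
        "checkpoint declined", new SerializedThrowable(cause));
    // Even without kafka-clients on the JM classpath, deserializing 'wrapped'
    // now succeeds, and the SerializedThrowable still reports the original
    // class name, message, and stack trace instead of a ClassNotFoundException.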

It might be clearer if I just put up a PR :) I'd be happy to and it'll be
very short.

Best,

Jeff


Re: non-deserializable root cause in DeclineCheckpoint

Terry Wang
Hi Jeffrey,

You are right. I understood what you said once I studied the class
org.apache.flink.util.SerializedThrowable.
I prefer fix no. 2 of the ones you mentioned:
        CheckpointException should always capture its wrapped exception as a SerializedThrowable.
Looking forward to seeing your PR soon :)

Best,
Terry Wang




Re: non-deserializable root cause in DeclineCheckpoint

Jeffrey Martin
Draft PR here: https://github.com/apache/flink/pull/9742
There might be some failing tests (still waiting on Travis), but I think the
diff is small enough for you to evaluate whether the approach is acceptable.
