Flink job cannot find recover path after using entropy injection for s3 file systems


Flink job cannot find recover path after using entropy injection for s3 file systems

Rainie Li
Hi Flink Developers.

We enabled entropy injection for S3. Here are our settings on the YARN cluster:
s3.entropy.key: _entropy_
s3.entropy.length: 1
state.checkpoints.dir: 's3a://{bucket name}/dev/checkpoints/_entropy_'

I have two questions:
1. After enabling entropy, the job's checkpoint path changed to:
s3://{bucket name}/dev/checkpoints/_entropy_/{job_id}/chk-607
Since we don't know which key _entropy_ is mapped to, we cannot use this
path to relaunch Flink jobs by running:
flink run -s s3://{bucket name}/dev/checkpoints/_entropy_/{job_id}/chk-607
If you have also enabled entropy injection for S3, do you have any
suggestions on how to recover failed jobs from entropy checkpoints?

2. We added the entropy settings on the YARN cluster, but only Flink 1.11
jobs show the entropy checkpoint path.
Flink 1.9 jobs still use checkpoint paths without entropy, such as:
s3://{bucket name}/dev/checkpoints/{job_id}/chk-607
Is this path equivalent to
s3://{bucket name}/dev/checkpoints/_entropy_/{job_id}/chk-607?
Does entropy injection work in v1.9? If so, why do v1.9 jobs show
checkpoint paths without entropy?

Any suggestions would be appreciated.
Thanks
Best regards
Rainie

Re: Flink job cannot find recover path after using entropy injection for s3 file systems

Till Rohrmann
Hi Rainie,

1. I think you need to search for the {job_id} in all possible subfolders
of the dev/checkpoints/ folder, or extract the entropy from the logs.

2. According to [1], entropy should only be used for the data files, not
for the metadata files. The idea was to keep the metadata path
entropy-free so that it is easier to discover. I suspect this changed
with FLINK-5763 [2], which was added in Flink 1.11: to make
checkpoints/savepoints self-contained, we needed to add the entropy to the
metadata file paths as well. So the entropy-free metadata path you see in
1.9 is expected, and entropy injection works in both 1.9 and 1.11. I think it was
introduced with Flink 1.6.2, 1.7.0 [3].
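
As a rough illustration of point 1, here is a minimal Java sketch of the search. It assumes you have already listed the object keys under dev/checkpoints/ (for example with the AWS CLI) and that any entropy occupies at most one path segment directly below that prefix, as with the state.checkpoints.dir above. CheckpointFinder and latestCheckpointDir are hypothetical names, not Flink or AWS APIs; the only Flink detail relied on is that each checkpoint directory contains a _metadata file.

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: picks the newest checkpoint of a job
// from a flat list of S3 object keys, whatever entropy segment it landed under.
class CheckpointFinder {

    static Optional<String> latestCheckpointDir(List<String> keys, String prefix, String jobId) {
        // <prefix>[/<one entropy segment>]/<jobId>/chk-<n>/_metadata
        Pattern pattern = Pattern.compile(
                Pattern.quote(prefix) + "(?:/[^/]+)?/" + Pattern.quote(jobId)
                        + "/chk-(\\d+)/_metadata");
        String best = null;
        long bestId = -1;
        for (String key : keys) {
            Matcher m = pattern.matcher(key);
            if (m.matches()) {
                long id = Long.parseLong(m.group(1));
                if (id > bestId) {
                    bestId = id;
                    // Strip "/_metadata" to get the checkpoint directory itself.
                    best = key.substring(0, key.length() - "/_metadata".length());
                }
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        List<String> keys = List.of(
                "dev/checkpoints/a/3bec/chk-606/_metadata",
                "dev/checkpoints/Q/3bec/chk-607/_metadata",
                "dev/checkpoints/Q/3bec/chk-607/some-data-file");
        latestCheckpointDir(keys, "dev/checkpoints", "3bec").ifPresent(
                dir -> System.out.println("flink run -s s3://{bucket name}/" + dir + " ..."));
    }
}
```

The returned directory is what you would pass to flink run -s. Matching on _metadata rather than on data files means that chk-N directories holding only data files (as in the 1.9 layout, where data and metadata live under different prefixes) are ignored.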

[1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/filesystems/s3.html#entropy-injection-for-s3-file-systems
[2] https://issues.apache.org/jira/browse/FLINK-5763
[3] https://issues.apache.org/jira/browse/FLINK-9061

Cheers,
Till

In reply to Rainie Li's message of Tue, Mar 16, 2021 at 7:03 PM.

> Hi Flink Developers.
>
> We enabled entropy injection for s3, here is our setting on Yarn Cluster.
> s3.entropy.key: _entropy_
> s3.entropy.length: 1
> state.checkpoints.dir: 's3a://{bucket name}/dev/checkpoints/_entropy_'
>
> I have two questions:
> 1. After enabling entropy, job's checkpoint path changed to:
> *s3://{bucket name}/dev/checkpoints/_entropy_/{job_id}chk-607*
> SInce we don't know which key is mapped to _entropy_
> It cannot be used to relaunch flink jobs by running
> *flink run -s **s3://{bucket
> name}/dev/checkpoints/_entropy_/{job_id}chk-607*
> If you also enabled entropy injection for s3, any suggestion how to recover
> failed jobs using entropy checkpoints?
>
> 2.We added entropy settings on the Yarn cluster.
> But we can only see flink jobs in version 1.11 shows the entropy checkpoint
> path.
> For flink jobs version 1.9, they are still using checkpoint paths without
> entropy like:
> *s3://{bucket name}/dev/checkpoints/{job_id}/chk-607*
> Is this path equal to s3://*{bucket name}*
> */dev/checkpoints/_entropy_/{job_id}**chk-607?*
> Does entropy work for v1.9? If so, why does v1.9 job show checkpoint paths
> *without* entropy?
>
> Appreciated any suggestions.
> Thanks
> Best regards
> Rainie
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink job cannot find recover path after using entropy injection for s3 file systems

Rainie Li
Thanks for checking, Till.

I have a follow-up question about #2: do you know why the same job does
not use the entropy checkpoint path in version 1.9?
For example:
When it runs on v1.11, the checkpoint path is:
s3a://{bucket name}/dev/checkpoints/_entropy_/{job_id}/chk-1537
When it runs on v1.9, the checkpoint path is:
s3a://{bucket name}/dev/checkpoints/{job_id}/chk-2230

I'm not sure what causes this inconsistency.
Thanks
Best regards
Rainie

In reply to Till Rohrmann's message of Wed, Mar 17, 2021 at 6:38 AM.

Re: Flink job cannot find recover path after using entropy injection for s3 file systems

Till Rohrmann
Hi Rainie,

If I remember correctly (unfortunately I don't have an S3 deployment at
hand to try it out), in v1.9 you should find the data files for the
checkpoint under
s3a://{bucket name}/dev/checkpoints/_entropy_/{job_id}/chk-2230, with
_entropy_ replaced by the injected entropy. A checkpoint consists of these
data files and a metadata file, which links the individual data files from
the different operators together into a checkpoint. The metadata file
should be stored under s3a://{bucket name}/dev/checkpoints/{job_id}/chk-2230
so that it is easily discoverable. If the data files are also stored in
s3a://{bucket name}/dev/checkpoints/{job_id}/chk-2230, then there is some problem or the
system did not properly use the entropy functionality.
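
To make the split between data files and the metadata file concrete, here is a simplified Java model of the path resolution described above. It is not Flink's EntropyInjector: EntropyPathModel, resolve and randomEntropy are hypothetical names, the alphanumeric entropy alphabet is an assumption, and the model works on bucket-relative keys without the s3a:// scheme. Data files get the _entropy_ key replaced by random characters; the metadata file gets the key removed.

```java
import java.util.Random;

// Hypothetical model, not Flink's EntropyInjector: shows how one configured path
// yields an entropy-injected data-file path and an entropy-free metadata path.
class EntropyPathModel {

    // Assumption: entropy characters are drawn from [0-9a-zA-Z].
    static final String ALPHABET =
            "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";

    // Generates `length` random characters to stand in for the entropy key.
    static String randomEntropy(int length, Random random) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(ALPHABET.charAt(random.nextInt(ALPHABET.length())));
        }
        return sb.toString();
    }

    // Replaces the first occurrence of `marker` in a bucket-relative key with
    // `replacement`, then collapses the "//" an empty replacement leaves behind.
    static String resolve(String key, String marker, String replacement) {
        int idx = key.indexOf(marker);
        if (idx < 0) {
            return key; // no marker, nothing to resolve
        }
        String replaced = key.substring(0, idx) + replacement
                + key.substring(idx + marker.length());
        return replaced.replaceAll("/{2,}", "/");
    }

    public static void main(String[] args) {
        String configured = "dev/checkpoints/_entropy_/{job_id}/chk-2230";
        // Data files: marker replaced by entropy (s3.entropy.length: 1).
        System.out.println("data:     "
                + resolve(configured, "_entropy_", randomEntropy(1, new Random())));
        // Metadata file (the 1.9 behaviour described here): marker simply removed.
        System.out.println("metadata: " + resolve(configured, "_entropy_", ""));
    }
}
```

Running main prints one data-file key under a single random character and a metadata key with the entropy segment removed, which matches the two 1.9 locations described above.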

My suspicion is that with FLINK-5763 (introduced in Flink 1.11) we also
moved the metadata file under the entropy folder to make
checkpoints/savepoints self-contained and relocatable.

Cheers,
Till

In reply to Rainie Li's message of Wed, Mar 17, 2021 at 10:14 PM.

Re: Flink job cannot find recover path after using entropy injection for s3 file systems

Rainie Li
I see, thanks for the info, Till.
I appreciate your help.

Best regards
Rainie

In reply to Till Rohrmann's message of Thu, Mar 18, 2021 at 2:09 AM.

Re: Flink job cannot find recover path after using entropy injection for s3 file systems

chenqin
Hi Till,

Thanks for sharing the pointers about the entropy injection feature in
1.11. We did some investigation, and so far it looks like a bug in how an
edge case is handled.

Testing environment:
Flink 1.11.2 release with the plugin
plugins/s3-fs-hadoop/flink-s3-fs-hadoop

state.backend.rocksdb.timer-service.factory rocksdb
state.checkpoints.dir s3a://balabala/dev/checkpoints/_entropy_/test
state.checkpoints.num-retained 3
s3.entropy.key _entropy_
s3.entropy.length 1

Observation (unlike 1.9, where the path has no entropy marker):
the checkpoint _metadata path points to a directory that still contains the entropy marker:
s3a://balabala/dev/checkpoints/_entropy_/xenon/event-stream-splitter/3bec4b7d4ac5c116649a4f579b87628e/chk-1669

Investigation:
We added a LOG.warn to EntropyInjector#removeEntropyMarkerIfPresent, built
a new binary, and copied the dist jar to the lib/ folder on our dev cluster
gateway (clifront). After rerunning the same job, the warning logs showed
that "final EntropyInjectingFileSystem efs = getEntropyFs(fs);" returns
null.

Adding more logs inside getEntropyFs showed that the fs passed from
removeEntropyMarkerIfPresent is an instance of
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem

So I realized there might be a bug in the file system type check, and
applied a small fix there:
patch.diff
<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/file/t342/patch.diff>  

After repeating the steps above and running the same job, removing the
entropy marker seems to work:
s3a://balabala/dev/checkpoints/xenon/event-stream-splitter/26081a69c330522e0b3f4fceb852401e/chk-27

Can we loop in someone to take a look at this patch?


Chen

--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/

Re: Flink job cannot find recover path after using entropy injection for s3 file systems

chenqin
To make it easier to read, here is the patched EntropyInjector#getEntropyFs:

        @Nullable
        private static EntropyInjectingFileSystem getEntropyFs(FileSystem fs) {
                // Debug output added during the investigation.
                LOG.warn(fs.getClass().toGenericString());
                if (fs instanceof EntropyInjectingFileSystem) {
                        return (EntropyInjectingFileSystem) fs;
                } else if (fs instanceof SafetyNetWrapperFileSystem) {
                        // Unwrap the safety-net wrapper and keep looking.
                        FileSystem delegate = ((SafetyNetWrapperFileSystem) fs).getWrappedDelegate();
                        return getEntropyFs(delegate);
                } else if (fs instanceof PluginFileSystemFactory.ClassLoaderFixingFileSystem) {
                        // New branch: file systems loaded as plugins are wrapped in a
                        // ClassLoaderFixingFileSystem, which hid the entropy-injecting S3 file system.
                        FileSystem innerFs = ((PluginFileSystemFactory.ClassLoaderFixingFileSystem) fs).getInner();
                        return getEntropyFs(innerFs);
                } else {
                        return null;
                }
        }
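
To see why the extra branch matters, here is a self-contained model of the lookup. The types are hypothetical stand-ins, not Flink classes; they mimic the wrapping described in this thread, and the exact wrapping order inside Flink is an assumption. lookupWithoutPluginCase approximates the lookup before the patch, which, judging from the patch, only looked one level inside a safety-net wrapper; lookup mirrors the patched, recursive version.

```java
// Hypothetical stand-ins for Flink's file system wrappers; these are NOT Flink classes.
class WrapperLookupModel {

    interface Fs {}

    // Plays the role of EntropyInjectingFileSystem.
    interface EntropyFs extends Fs {
        String entropyKey();
    }

    // Plays the role of the S3 file system loaded from the plugin.
    static final class S3Fs implements EntropyFs {
        public String entropyKey() {
            return "_entropy_";
        }
    }

    // Plays the role of SafetyNetWrapperFileSystem.
    static final class SafetyNetWrapper implements Fs {
        final Fs delegate;
        SafetyNetWrapper(Fs delegate) { this.delegate = delegate; }
    }

    // Plays the role of PluginFileSystemFactory.ClassLoaderFixingFileSystem.
    static final class ClassLoaderFixingWrapper implements Fs {
        final Fs inner;
        ClassLoaderFixingWrapper(Fs inner) { this.inner = inner; }
    }

    // Approximates the pre-patch lookup: only one safety-net level is unwrapped.
    static EntropyFs lookupWithoutPluginCase(Fs fs) {
        if (fs instanceof EntropyFs) {
            return (EntropyFs) fs;
        }
        if (fs instanceof SafetyNetWrapper) {
            Fs delegate = ((SafetyNetWrapper) fs).delegate;
            return delegate instanceof EntropyFs ? (EntropyFs) delegate : null;
        }
        return null;
    }

    // Mirrors the patched lookup: recursively unwraps both kinds of wrapper.
    static EntropyFs lookup(Fs fs) {
        if (fs instanceof EntropyFs) {
            return (EntropyFs) fs;
        } else if (fs instanceof SafetyNetWrapper) {
            return lookup(((SafetyNetWrapper) fs).delegate);
        } else if (fs instanceof ClassLoaderFixingWrapper) {
            return lookup(((ClassLoaderFixingWrapper) fs).inner);
        }
        return null;
    }

    public static void main(String[] args) {
        Fs wrapped = new SafetyNetWrapper(new ClassLoaderFixingWrapper(new S3Fs()));
        System.out.println("without plugin case: " + lookupWithoutPluginCase(wrapped));
        System.out.println("with plugin case:    " + lookup(wrapped).entropyKey());
    }
}
```

In the model, the plugin wrapper sits between the safety net and the S3 file system, so the one-level check never sees the entropy-injecting instance and returns null, which is consistent with the null chenqin logged; the recursive version finds it at any depth.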


Re: Flink job cannot find recover path after using entropy injection for s3 file systems

chenqin
I also noticed that the actual state handles stored in _metadata still
contain the entropy marker after we fixed the metadata directory issue.
This issue seems to be related to a code refactoring, and it is not
covered by tests.


Re: Flink job cannot find recover path after using entropy injection for s3 file systems

Till Rohrmann
Thanks for looking into this issue, Chenqin. To me this looks like a bug
in Flink. I am not entirely sure, but the wrapping order might have changed
when using file systems from the plugin system. Maybe Arvid knows about
changes in this area. I think we should open a JIRA ticket for this
problem.

Cheers,
Till

In reply to chenqin's message of Wed, Mar 24, 2021 at 3:56 AM.

Re: Flink job cannot find recover path after using entropy injection for s3 file systems

Till Rohrmann
There is indeed a ticket that tried to fix this for the 1.11 release [1, 2].
Maybe the fix is not working properly.

[1] https://issues.apache.org/jira/browse/FLINK-17359
[2] https://github.com/apache/flink/pull/11891

In reply to Till Rohrmann's message of Wed, Mar 24, 2021 at 12:08 PM.

Re: Flink job cannot find recover path after using entropy injection for s3 file systems

chenqin
Here is the link to the fix PR: https://github.com/apache/flink/pull/15442
We might need someone to help review and merge it in the meantime.


Re: Flink job cannot find recover path after using entropy injection for s3 file systems

Till Rohrmann
Thanks for creating this PR. I think it would be good to re-open the issue
and post your analysis there together with the proposal for the fix.

Cheers,
Till

In reply to chenqin's message of Wed, Mar 31, 2021 at 3:41 AM.

> link fix pr here https://github.com/apache/flink/pull/15442
> we might need someone help review and merge meanwhile.
>
>
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink job cannot find recover path after using entropy injection for s3 file systems

chenqin
Friendly ping: the fix for the entropy marker is ready.
