Dear Flink community:
We have a use case where StreamingFileSink <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html> is used to persist bulk-encoded data to AWS S3. In our case, the data sources consist of mixed event types, and each type is uploaded to its own S3 prefix. Because event sizes are highly skewed, the uploaded file sizes can differ dramatically. To gain better control over the uploaded file size, we would like to adopt a rolling policy based on file size (e.g., roll the file every 100 MB). Yet it appears the bulk-encoding StreamingFileSink only supports checkpoint-based file rolling; quoting the documentation:

IMPORTANT: Bulk-encoding formats can only be combined with the `OnCheckpointRollingPolicy`, which rolls the in-progress part file on every checkpoint.

Checkpoint-based file rolling also appears to have side effects. For instance, much of the heavy lifting (e.g., uploading file parts) is performed at checkpoint time, so checkpointing takes longer when data volume is high.

A customized file rolling policy could be achieved with small adjustments to the BulkFormatBuilder interface in StreamingFileSink. When the S3RecoverableWriter is used, file rolling triggers the data upload, and the corresponding S3Committer is also constructed and stored. Hence, on the surface, adding a simple file-size-based rolling policy would NOT compromise the established exactly-once guarantee.

Any advice on whether the above idea makes sense? Or perhaps there are pitfalls one should pay attention to when introducing such a rolling policy. Thanks a lot!

-
Ying
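P.S. For concreteness, here is a rough sketch of the policy we have in mind. It assumes the existing RollingPolicy and PartFileInfo interfaces from flink-streaming-java; the class name is made up and nothing below has been tested, so please treat it as a sketch rather than a working implementation:

import java.io.IOException;

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;

/**
 * Untested sketch: roll the in-progress part file either when a checkpoint
 * happens (required so bulk writers can finalize their metadata) or as soon
 * as the part file exceeds a configured size.
 */
public class FileSizeOrCheckpointRollingPolicy<IN, BucketID>
        implements RollingPolicy<IN, BucketID> {

    private final long maxPartSizeBytes;

    public FileSizeOrCheckpointRollingPolicy(long maxPartSizeBytes) {
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
        // Always roll on checkpoint, exactly like OnCheckpointRollingPolicy does.
        return true;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element)
            throws IOException {
        // Additionally roll once the part file grows past the size limit.
        return partFileState.getSize() >= maxPartSizeBytes;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
        // No time-based rolling in this sketch.
        return false;
    }
}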
Hi Ying,
Thanks for using the StreamingFileSink.

The reason the StreamingFileSink only supports OnCheckpointRollingPolicy with bulk formats is that Flink currently relies on the Hadoop writer for Parquet. Bulk formats keep important details about how they write the actual data (such as compression schemes, offsets, etc.) in metadata, and they write this metadata with the file (e.g., Parquet writes it as a footer). The Hadoop writer gives no access to this metadata, so there is no way for Flink to checkpoint a part file safely without closing it.

The solution would be to write our own writer instead of going through the Hadoop one, but as far as I know there are no concrete plans for this.

I hope this explains a bit more why the StreamingFileSink has this limitation.

Cheers,
Kostas
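P.S. To make the constraint concrete, the BulkWriter contract that bulk formats implement looks roughly like the following (paraphrased from flink-core, comments mine):

import java.io.IOException;

// Paraphrase of org.apache.flink.api.common.serialization.BulkWriter, not the verbatim source.
public interface BulkWriter<T> {

    // Adds an element; the writer may buffer and encode it internally.
    void addElement(T element) throws IOException;

    // Flushes whatever can be flushed, but this does NOT yield a complete, readable
    // file: format-level metadata (e.g. the Parquet footer) is still missing.
    void flush() throws IOException;

    // Finalizes the file; this is where formats such as Parquet write their footer.
    // After finish() no further elements can be added, so the only safe point to
    // call it is when the part file is rolled -- hence rolling on every checkpoint.
    void finish() throws IOException;

    interface Factory<T> extends java.io.Serializable {
        BulkWriter<T> create(org.apache.flink.core.fs.FSDataOutputStream out) throws IOException;
    }
}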
Hi Kostas:
Thanks for the prompt reply.

The file rolling policy mentioned previously is meant to roll files EITHER when a size limit is reached OR when a checkpoint happens. It looks like every time a file is rolled, the part file is closed <https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L217-L218>, and closing it returns a committable <https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L239-L240>. I assume it is during closeForCommit() that the Parquet file metadata is written. At first glance, the code path of file rolling looks very similar to the one inside prepareBucketForCheckpointing() <https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L275>. Not sure if I am missing anything there.

-
Ying
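P.S. Paraphrasing the relevant Bucket.java code from my reading of the linked lines (not verbatim, so take this only as a sketch of the structure):

// Rolling a part file (e.g. when the rolling policy fires) closes the current
// in-progress part and opens a new one.
private void rollPartFile(long currentTime) throws IOException {
    closePartFile();
    inProgressPart = partFileFactory.openNew(bucketId, fsWriter, assembleNewPartPath(), currentTime);
    partCounter++;
}

// Closing the part file calls closeForCommit() -- for Parquet this should be where
// the footer gets written -- and stores the committable for the next checkpoint.
private CommitRecoverable closePartFile() throws IOException {
    CommitRecoverable committable = null;
    if (inProgressPart != null) {
        committable = inProgressPart.closeForCommit();
        pendingPartsForCurrentCheckpoint.add(committable);
        inProgressPart = null;
    }
    return committable;
}

// Checkpoint preparation goes through the same closePartFile() path when the
// rolling policy asks to roll on checkpoint.
private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
    if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
        closePartFile();
    }
    if (!pendingPartsForCurrentCheckpoint.isEmpty()) {
        pendingPartsPerCheckpoint.put(checkpointId, pendingPartsForCurrentCheckpoint);
        pendingPartsForCurrentCheckpoint = new ArrayList<>();
    }
}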
Hi Ying,
You are right! If it is either on checkpoint or on size, then this is doable even with the current state of things.
Could you open a JIRA so that we can keep track of the progress?

Cheers,
Kostas
Thanks Kostas for confirming!
I've filed an issue, FLINK-13027 <https://issues.apache.org/jira/browse/FLINK-13027>. We are actively working on the interface of such a file rolling policy, and will also run benchmarks once it is integrated with the StreamingFileSink. We are more than happy to contribute if there is no other plan to address this issue.

Thanks again.

-
Bests
Ying
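P.S. The usage we have in mind would look roughly like the following. Note that withRollingPolicy does not exist on BulkFormatBuilder today; it is the adjustment we are proposing, and FileSizeOrCheckpointRollingPolicy / MyEvent are hypothetical names used only for illustration:

// Sketch of the proposed builder adjustment -- not working code today.
StreamingFileSink<MyEvent> sink = StreamingFileSink
    .forBulkFormat(
        new Path("s3://my-bucket/events/"),
        ParquetAvroWriters.forReflectRecord(MyEvent.class))
    .withBucketAssigner(new DateTimeBucketAssigner<>())
    // Proposed addition: roll on checkpoint OR once the part file reaches 100 MB,
    // instead of the implicit OnCheckpointRollingPolicy.
    .withRollingPolicy(new FileSizeOrCheckpointRollingPolicy<>(100L * 1024 * 1024))
    .build();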
Hi Ying,
That sounds great!
Looking forward to your PR!

Btw don't you want to assign the issue to yourself if you are planning to work on it?

Kostas
Hi Kostas:
I'd like to. The account used to file the JIRA does not have contributor access yet. I have contributed a few Flink JIRAs in the past using a very similar but different account, and now I would like to consolidate and use a common account for Apache project contributions.

Would you mind granting contributor access to the following account? This way I can assign the JIRA to myself.

yxu-apache <https://issues.apache.org/jira/secure/ViewProfile.jspa?name=yxu-apache>

Many thanks!

-
Ying
Hi Kostas:
For simplicity, FLINK-13027 <https://issues.apache.org/jira/browse/FLINK-13027> has been assigned to my current user ID, and I will contribute using that ID.

I will circulate an update with the community once we have initial success with this new rolling policy!

Thank you again.

-
Ying
Thanks Ying!
Looking forward to your contribution.

Kostas