Rolling policy when using StreamingFileSink for bulk-encoded output

classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|

Rolling policy when using StreamingFileSink for bulk-encoded output

Ying Xu-2
Dear Flink community:

We have a use case where StreamingFileSink
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html>
is used for persisting bulk-encoded data to AWS s3. In our case, the data
sources consist of hybrid types of events, for which each type is uploaded
to an individual s3 prefix location. Because the event size is highly
skewed, the uploaded file size may differ dramatically.  In order to have a
better control over the uploaded file size, we would like to adopt a
rolling policy based on file sizes (e.g., roll the file every 100MB). Yet
it appears bulk-encoding StreamingFileSink only supports checkpoint-based
file rolling.

IMPORTANT: Bulk-encoding formats can only be combined with the
`OnCheckpointRollingPolicy`, which rolls the in-progress part file on every
checkpoint.

Checkpoint-based file rolling appears to have other side effects. For
instance, quite a lot of the heavy liftings (e.g file parts uploading) are
performed at the checkpointing time. As a result, checkpointing takes
longer duration when data volume is high.

Having a customized file rolling policy can be achieved by small
adjustments on the BulkFormatBuilder interface in StreamingFileSink. In the
case of using S3RecoverableWriter, file rolling triggers data uploading and
corresponding S3Committer is also constructed and stored. Hence on the
surface, adding a simple file-size based rolling policy would NOT
compromise the established exact-once guarantee.

Any advises on whether the above idea makes sense? Or perhaps there are
pitfalls that one might pay attention when introducing such rolling policy.
Thanks a lot!


-
Ying
Reply | Threaded
Open this post in threaded view
|

Re: Rolling policy when using StreamingFileSink for bulk-encoded output

Kostas Kloudas-4
Hi Ying,

Thanks for using the StreamingFileSink.

The reason why the StreamingFileSink only supports
OnCheckpointRollingPolicy with bulk
formats has to do with the fact that currently Flink relies on the Hadoop
writer for Parquet.

Bulk formats keep important details about how they write the actual data
(such as compression
schemes, offsets, etc) in metadata and they write this metadata with the
file (e.g. parquet writes
them as a footer). The hadoop writer gives no access to these metadata.
Given this, there is
no way for flink to be able to checkpoint a part file securely without
closing it.

The solution would be to write our own writer and not go through the hadoop
one, but there
are no concrete plans for this, as far as I know.

I hope this explains a bit more why the StreamingFileSink has this
limitation.

Cheers,
Kostas


On Mon, Jun 24, 2019 at 9:19 AM Ying Xu <[hidden email]> wrote:

> Dear Flink community:
>
> We have a use case where StreamingFileSink
> <
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> >
> is used for persisting bulk-encoded data to AWS s3. In our case, the data
> sources consist of hybrid types of events, for which each type is uploaded
> to an individual s3 prefix location. Because the event size is highly
> skewed, the uploaded file size may differ dramatically.  In order to have a
> better control over the uploaded file size, we would like to adopt a
> rolling policy based on file sizes (e.g., roll the file every 100MB). Yet
> it appears bulk-encoding StreamingFileSink only supports checkpoint-based
> file rolling.
>
> IMPORTANT: Bulk-encoding formats can only be combined with the
> `OnCheckpointRollingPolicy`, which rolls the in-progress part file on every
> checkpoint.
>
> Checkpoint-based file rolling appears to have other side effects. For
> instance, quite a lot of the heavy liftings (e.g file parts uploading) are
> performed at the checkpointing time. As a result, checkpointing takes
> longer duration when data volume is high.
>
> Having a customized file rolling policy can be achieved by small
> adjustments on the BulkFormatBuilder interface in StreamingFileSink. In the
> case of using S3RecoverableWriter, file rolling triggers data uploading and
> corresponding S3Committer is also constructed and stored. Hence on the
> surface, adding a simple file-size based rolling policy would NOT
> compromise the established exact-once guarantee.
>
> Any advises on whether the above idea makes sense? Or perhaps there are
> pitfalls that one might pay attention when introducing such rolling policy.
> Thanks a lot!
>
>
> -
> Ying
>
Reply | Threaded
Open this post in threaded view
|

Re: Rolling policy when using StreamingFileSink for bulk-encoded output

Ying Xu-2
HI Kostas:

Thanks for the prompt reply.

The file rolling policy mentioned previously is meant to roll files EITHER
when a size limited is reached, OR when a checkpoint happens.  Looks like
every time a file is rolled, the part file is closed
<https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L217-L218>,
during which file is closed with a committable returned
<https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L239-L240>.
I assume it is during closeForCommit() when the Parquet file metatdata is
written.  At a first glance, the code path of file rolling looks very
similar to that inside prepareBucketForCheckpointing()
<https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L275>.
Not sure if I miss anything there.


-
Ying


On Mon, Jun 24, 2019 at 2:01 AM Kostas Kloudas <[hidden email]> wrote:

> Hi Ying,
>
> Thanks for using the StreamingFileSink.
>
> The reason why the StreamingFileSink only supports
> OnCheckpointRollingPolicy with bulk
> formats has to do with the fact that currently Flink relies on the Hadoop
> writer for Parquet.
>
> Bulk formats keep important details about how they write the actual data
> (such as compression
> schemes, offsets, etc) in metadata and they write this metadata with the
> file (e.g. parquet writes
> them as a footer). The hadoop writer gives no access to these metadata.
> Given this, there is
> no way for flink to be able to checkpoint a part file securely without
> closing it.
>
> The solution would be to write our own writer and not go through the hadoop
> one, but there
> are no concrete plans for this, as far as I know.
>
> I hope this explains a bit more why the StreamingFileSink has this
> limitation.
>
> Cheers,
> Kostas
>
>
> On Mon, Jun 24, 2019 at 9:19 AM Ying Xu <[hidden email]> wrote:
>
> > Dear Flink community:
> >
> > We have a use case where StreamingFileSink
> > <
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> > >
> > is used for persisting bulk-encoded data to AWS s3. In our case, the data
> > sources consist of hybrid types of events, for which each type is
> uploaded
> > to an individual s3 prefix location. Because the event size is highly
> > skewed, the uploaded file size may differ dramatically.  In order to
> have a
> > better control over the uploaded file size, we would like to adopt a
> > rolling policy based on file sizes (e.g., roll the file every 100MB). Yet
> > it appears bulk-encoding StreamingFileSink only supports checkpoint-based
> > file rolling.
> >
> > IMPORTANT: Bulk-encoding formats can only be combined with the
> > `OnCheckpointRollingPolicy`, which rolls the in-progress part file on
> every
> > checkpoint.
> >
> > Checkpoint-based file rolling appears to have other side effects. For
> > instance, quite a lot of the heavy liftings (e.g file parts uploading)
> are
> > performed at the checkpointing time. As a result, checkpointing takes
> > longer duration when data volume is high.
> >
> > Having a customized file rolling policy can be achieved by small
> > adjustments on the BulkFormatBuilder interface in StreamingFileSink. In
> the
> > case of using S3RecoverableWriter, file rolling triggers data uploading
> and
> > corresponding S3Committer is also constructed and stored. Hence on the
> > surface, adding a simple file-size based rolling policy would NOT
> > compromise the established exact-once guarantee.
> >
> > Any advises on whether the above idea makes sense? Or perhaps there are
> > pitfalls that one might pay attention when introducing such rolling
> policy.
> > Thanks a lot!
> >
> >
> > -
> > Ying
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Rolling policy when using StreamingFileSink for bulk-encoded output

Kostas Kloudas-4
Hi Ying,

You are right! If it is either on checkpoint or on size, then this is
doable even with the current state of things.
Could you open a JIRA so that we can keep track of the progress?

Cheers,
Kostas

On Tue, Jun 25, 2019 at 9:49 AM Ying Xu <[hidden email]> wrote:

> HI Kostas:
>
> Thanks for the prompt reply.
>
> The file rolling policy mentioned previously is meant to roll files EITHER
> when a size limited is reached, OR when a checkpoint happens.  Looks like
> every time a file is rolled, the part file is closed
> <
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L217-L218
> >,
> during which file is closed with a committable returned
> <
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L239-L240
> >.
> I assume it is during closeForCommit() when the Parquet file metatdata is
> written.  At a first glance, the code path of file rolling looks very
> similar to that inside prepareBucketForCheckpointing()
> <
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L275
> >.
> Not sure if I miss anything there.
>
>
> -
> Ying
>
>
> On Mon, Jun 24, 2019 at 2:01 AM Kostas Kloudas <[hidden email]> wrote:
>
> > Hi Ying,
> >
> > Thanks for using the StreamingFileSink.
> >
> > The reason why the StreamingFileSink only supports
> > OnCheckpointRollingPolicy with bulk
> > formats has to do with the fact that currently Flink relies on the Hadoop
> > writer for Parquet.
> >
> > Bulk formats keep important details about how they write the actual data
> > (such as compression
> > schemes, offsets, etc) in metadata and they write this metadata with the
> > file (e.g. parquet writes
> > them as a footer). The hadoop writer gives no access to these metadata.
> > Given this, there is
> > no way for flink to be able to checkpoint a part file securely without
> > closing it.
> >
> > The solution would be to write our own writer and not go through the
> hadoop
> > one, but there
> > are no concrete plans for this, as far as I know.
> >
> > I hope this explains a bit more why the StreamingFileSink has this
> > limitation.
> >
> > Cheers,
> > Kostas
> >
> >
> > On Mon, Jun 24, 2019 at 9:19 AM Ying Xu <[hidden email]> wrote:
> >
> > > Dear Flink community:
> > >
> > > We have a use case where StreamingFileSink
> > > <
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> > > >
> > > is used for persisting bulk-encoded data to AWS s3. In our case, the
> data
> > > sources consist of hybrid types of events, for which each type is
> > uploaded
> > > to an individual s3 prefix location. Because the event size is highly
> > > skewed, the uploaded file size may differ dramatically.  In order to
> > have a
> > > better control over the uploaded file size, we would like to adopt a
> > > rolling policy based on file sizes (e.g., roll the file every 100MB).
> Yet
> > > it appears bulk-encoding StreamingFileSink only supports
> checkpoint-based
> > > file rolling.
> > >
> > > IMPORTANT: Bulk-encoding formats can only be combined with the
> > > `OnCheckpointRollingPolicy`, which rolls the in-progress part file on
> > every
> > > checkpoint.
> > >
> > > Checkpoint-based file rolling appears to have other side effects. For
> > > instance, quite a lot of the heavy liftings (e.g file parts uploading)
> > are
> > > performed at the checkpointing time. As a result, checkpointing takes
> > > longer duration when data volume is high.
> > >
> > > Having a customized file rolling policy can be achieved by small
> > > adjustments on the BulkFormatBuilder interface in StreamingFileSink. In
> > the
> > > case of using S3RecoverableWriter, file rolling triggers data uploading
> > and
> > > corresponding S3Committer is also constructed and stored. Hence on the
> > > surface, adding a simple file-size based rolling policy would NOT
> > > compromise the established exact-once guarantee.
> > >
> > > Any advises on whether the above idea makes sense? Or perhaps there are
> > > pitfalls that one might pay attention when introducing such rolling
> > policy.
> > > Thanks a lot!
> > >
> > >
> > > -
> > > Ying
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Rolling policy when using StreamingFileSink for bulk-encoded output

Ying Xu-2
Thanks Kostas for confirming!

I've filed a issue FLINK-13027
<https://issues.apache.org/jira/browse/FLINK-13027> .   We are actively
working on the interface of such a file rolling policy, and will also
perform benchmarks when it is integrated with a StreamingFileSink. We are
more than happy to contribute if there's no other plan to address this
issue.

Thanks again.

-
Bests
Ying


On Tue, Jun 25, 2019 at 2:24 AM Kostas Kloudas <[hidden email]> wrote:

> Hi Ying,
>
> You are right! If it is either on checkpoint or on size, then this is
> doable even with the current state of things.
> Could you open a JIRA so that we can keep track of the progress?
>
> Cheers,
> Kostas
>
> On Tue, Jun 25, 2019 at 9:49 AM Ying Xu <[hidden email]> wrote:
>
> > HI Kostas:
> >
> > Thanks for the prompt reply.
> >
> > The file rolling policy mentioned previously is meant to roll files
> EITHER
> > when a size limited is reached, OR when a checkpoint happens.  Looks like
> > every time a file is rolled, the part file is closed
> > <
> >
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L217-L218
> > >,
> > during which file is closed with a committable returned
> > <
> >
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L239-L240
> > >.
> > I assume it is during closeForCommit() when the Parquet file metatdata is
> > written.  At a first glance, the code path of file rolling looks very
> > similar to that inside prepareBucketForCheckpointing()
> > <
> >
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L275
> > >.
> > Not sure if I miss anything there.
> >
> >
> > -
> > Ying
> >
> >
> > On Mon, Jun 24, 2019 at 2:01 AM Kostas Kloudas <[hidden email]>
> wrote:
> >
> > > Hi Ying,
> > >
> > > Thanks for using the StreamingFileSink.
> > >
> > > The reason why the StreamingFileSink only supports
> > > OnCheckpointRollingPolicy with bulk
> > > formats has to do with the fact that currently Flink relies on the
> Hadoop
> > > writer for Parquet.
> > >
> > > Bulk formats keep important details about how they write the actual
> data
> > > (such as compression
> > > schemes, offsets, etc) in metadata and they write this metadata with
> the
> > > file (e.g. parquet writes
> > > them as a footer). The hadoop writer gives no access to these metadata.
> > > Given this, there is
> > > no way for flink to be able to checkpoint a part file securely without
> > > closing it.
> > >
> > > The solution would be to write our own writer and not go through the
> > hadoop
> > > one, but there
> > > are no concrete plans for this, as far as I know.
> > >
> > > I hope this explains a bit more why the StreamingFileSink has this
> > > limitation.
> > >
> > > Cheers,
> > > Kostas
> > >
> > >
> > > On Mon, Jun 24, 2019 at 9:19 AM Ying Xu <[hidden email]> wrote:
> > >
> > > > Dear Flink community:
> > > >
> > > > We have a use case where StreamingFileSink
> > > > <
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> > > > >
> > > > is used for persisting bulk-encoded data to AWS s3. In our case, the
> > data
> > > > sources consist of hybrid types of events, for which each type is
> > > uploaded
> > > > to an individual s3 prefix location. Because the event size is highly
> > > > skewed, the uploaded file size may differ dramatically.  In order to
> > > have a
> > > > better control over the uploaded file size, we would like to adopt a
> > > > rolling policy based on file sizes (e.g., roll the file every 100MB).
> > Yet
> > > > it appears bulk-encoding StreamingFileSink only supports
> > checkpoint-based
> > > > file rolling.
> > > >
> > > > IMPORTANT: Bulk-encoding formats can only be combined with the
> > > > `OnCheckpointRollingPolicy`, which rolls the in-progress part file on
> > > every
> > > > checkpoint.
> > > >
> > > > Checkpoint-based file rolling appears to have other side effects. For
> > > > instance, quite a lot of the heavy liftings (e.g file parts
> uploading)
> > > are
> > > > performed at the checkpointing time. As a result, checkpointing takes
> > > > longer duration when data volume is high.
> > > >
> > > > Having a customized file rolling policy can be achieved by small
> > > > adjustments on the BulkFormatBuilder interface in StreamingFileSink.
> In
> > > the
> > > > case of using S3RecoverableWriter, file rolling triggers data
> uploading
> > > and
> > > > corresponding S3Committer is also constructed and stored. Hence on
> the
> > > > surface, adding a simple file-size based rolling policy would NOT
> > > > compromise the established exact-once guarantee.
> > > >
> > > > Any advises on whether the above idea makes sense? Or perhaps there
> are
> > > > pitfalls that one might pay attention when introducing such rolling
> > > policy.
> > > > Thanks a lot!
> > > >
> > > >
> > > > -
> > > > Ying
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Rolling policy when using StreamingFileSink for bulk-encoded output

Kostas Kloudas-4
Hi Ying,

That sounds great!
Looking forward to your PR!

Btw don't you want to assign the issue to yourself if you are
planning to work on it?

Kostas

On Fri, Jun 28, 2019 at 9:54 AM Ying Xu <[hidden email]> wrote:

> Thanks Kostas for confirming!
>
> I've filed a issue FLINK-13027
> <https://issues.apache.org/jira/browse/FLINK-13027> .   We are actively
> working on the interface of such a file rolling policy, and will also
> perform benchmarks when it is integrated with a StreamingFileSink. We are
> more than happy to contribute if there's no other plan to address this
> issue.
>
> Thanks again.
>
> -
> Bests
> Ying
>
>
> On Tue, Jun 25, 2019 at 2:24 AM Kostas Kloudas <[hidden email]> wrote:
>
> > Hi Ying,
> >
> > You are right! If it is either on checkpoint or on size, then this is
> > doable even with the current state of things.
> > Could you open a JIRA so that we can keep track of the progress?
> >
> > Cheers,
> > Kostas
> >
> > On Tue, Jun 25, 2019 at 9:49 AM Ying Xu <[hidden email]> wrote:
> >
> > > HI Kostas:
> > >
> > > Thanks for the prompt reply.
> > >
> > > The file rolling policy mentioned previously is meant to roll files
> > EITHER
> > > when a size limited is reached, OR when a checkpoint happens.  Looks
> like
> > > every time a file is rolled, the part file is closed
> > > <
> > >
> >
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L217-L218
> > > >,
> > > during which file is closed with a committable returned
> > > <
> > >
> >
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L239-L240
> > > >.
> > > I assume it is during closeForCommit() when the Parquet file metatdata
> is
> > > written.  At a first glance, the code path of file rolling looks very
> > > similar to that inside prepareBucketForCheckpointing()
> > > <
> > >
> >
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L275
> > > >.
> > > Not sure if I miss anything there.
> > >
> > >
> > > -
> > > Ying
> > >
> > >
> > > On Mon, Jun 24, 2019 at 2:01 AM Kostas Kloudas <[hidden email]>
> > wrote:
> > >
> > > > Hi Ying,
> > > >
> > > > Thanks for using the StreamingFileSink.
> > > >
> > > > The reason why the StreamingFileSink only supports
> > > > OnCheckpointRollingPolicy with bulk
> > > > formats has to do with the fact that currently Flink relies on the
> > Hadoop
> > > > writer for Parquet.
> > > >
> > > > Bulk formats keep important details about how they write the actual
> > data
> > > > (such as compression
> > > > schemes, offsets, etc) in metadata and they write this metadata with
> > the
> > > > file (e.g. parquet writes
> > > > them as a footer). The hadoop writer gives no access to these
> metadata.
> > > > Given this, there is
> > > > no way for flink to be able to checkpoint a part file securely
> without
> > > > closing it.
> > > >
> > > > The solution would be to write our own writer and not go through the
> > > hadoop
> > > > one, but there
> > > > are no concrete plans for this, as far as I know.
> > > >
> > > > I hope this explains a bit more why the StreamingFileSink has this
> > > > limitation.
> > > >
> > > > Cheers,
> > > > Kostas
> > > >
> > > >
> > > > On Mon, Jun 24, 2019 at 9:19 AM Ying Xu <[hidden email]>
> wrote:
> > > >
> > > > > Dear Flink community:
> > > > >
> > > > > We have a use case where StreamingFileSink
> > > > > <
> > > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> > > > > >
> > > > > is used for persisting bulk-encoded data to AWS s3. In our case,
> the
> > > data
> > > > > sources consist of hybrid types of events, for which each type is
> > > > uploaded
> > > > > to an individual s3 prefix location. Because the event size is
> highly
> > > > > skewed, the uploaded file size may differ dramatically.  In order
> to
> > > > have a
> > > > > better control over the uploaded file size, we would like to adopt
> a
> > > > > rolling policy based on file sizes (e.g., roll the file every
> 100MB).
> > > Yet
> > > > > it appears bulk-encoding StreamingFileSink only supports
> > > checkpoint-based
> > > > > file rolling.
> > > > >
> > > > > IMPORTANT: Bulk-encoding formats can only be combined with the
> > > > > `OnCheckpointRollingPolicy`, which rolls the in-progress part file
> on
> > > > every
> > > > > checkpoint.
> > > > >
> > > > > Checkpoint-based file rolling appears to have other side effects.
> For
> > > > > instance, quite a lot of the heavy liftings (e.g file parts
> > uploading)
> > > > are
> > > > > performed at the checkpointing time. As a result, checkpointing
> takes
> > > > > longer duration when data volume is high.
> > > > >
> > > > > Having a customized file rolling policy can be achieved by small
> > > > > adjustments on the BulkFormatBuilder interface in
> StreamingFileSink.
> > In
> > > > the
> > > > > case of using S3RecoverableWriter, file rolling triggers data
> > uploading
> > > > and
> > > > > corresponding S3Committer is also constructed and stored. Hence on
> > the
> > > > > surface, adding a simple file-size based rolling policy would NOT
> > > > > compromise the established exact-once guarantee.
> > > > >
> > > > > Any advises on whether the above idea makes sense? Or perhaps there
> > are
> > > > > pitfalls that one might pay attention when introducing such rolling
> > > > policy.
> > > > > Thanks a lot!
> > > > >
> > > > >
> > > > > -
> > > > > Ying
> > > > >
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Rolling policy when using StreamingFileSink for bulk-encoded output

Ying Xu-2
Hi Kostas:

I'd like to.  The account used to file the JIRA does not have contributor
access yet .  I had contributed a few Flink JIRAs in the past, using a very
similar but different account.  Now I would like to consolidate and use a
common account for Apache projects contributions.

Would you mind granting me the contributor access for the following account
?  This way I can assign the JIRA to myself.
           *yxu-apache
<https://issues.apache.org/jira/secure/ViewProfile.jspa?name=yxu-apache>*

Many thanks!
<http://www.lyft.com/>
-
Ying


On Fri, Jun 28, 2019 at 2:23 AM Kostas Kloudas <[hidden email]> wrote:

> Hi Ying,
>
> That sounds great!
> Looking forward to your PR!
>
> Btw don't you want to assign the issue to yourself if you are
> planning to work on it?
>
> Kostas
>
> On Fri, Jun 28, 2019 at 9:54 AM Ying Xu <[hidden email]> wrote:
>
> > Thanks Kostas for confirming!
> >
> > I've filed a issue FLINK-13027
> > <https://issues.apache.org/jira/browse/FLINK-13027> .   We are actively
> > working on the interface of such a file rolling policy, and will also
> > perform benchmarks when it is integrated with a StreamingFileSink. We are
> > more than happy to contribute if there's no other plan to address this
> > issue.
> >
> > Thanks again.
> >
> > -
> > Bests
> > Ying
> >
> >
> > On Tue, Jun 25, 2019 at 2:24 AM Kostas Kloudas <[hidden email]>
> wrote:
> >
> > > Hi Ying,
> > >
> > > You are right! If it is either on checkpoint or on size, then this is
> > > doable even with the current state of things.
> > > Could you open a JIRA so that we can keep track of the progress?
> > >
> > > Cheers,
> > > Kostas
> > >
> > > On Tue, Jun 25, 2019 at 9:49 AM Ying Xu <[hidden email]> wrote:
> > >
> > > > HI Kostas:
> > > >
> > > > Thanks for the prompt reply.
> > > >
> > > > The file rolling policy mentioned previously is meant to roll files
> > > EITHER
> > > > when a size limited is reached, OR when a checkpoint happens.  Looks
> > like
> > > > every time a file is rolled, the part file is closed
> > > > <
> > > >
> > >
> >
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L217-L218
> > > > >,
> > > > during which file is closed with a committable returned
> > > > <
> > > >
> > >
> >
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L239-L240
> > > > >.
> > > > I assume it is during closeForCommit() when the Parquet file
> metatdata
> > is
> > > > written.  At a first glance, the code path of file rolling looks very
> > > > similar to that inside prepareBucketForCheckpointing()
> > > > <
> > > >
> > >
> >
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L275
> > > > >.
> > > > Not sure if I miss anything there.
> > > >
> > > >
> > > > -
> > > > Ying
> > > >
> > > >
> > > > On Mon, Jun 24, 2019 at 2:01 AM Kostas Kloudas <[hidden email]>
> > > wrote:
> > > >
> > > > > Hi Ying,
> > > > >
> > > > > Thanks for using the StreamingFileSink.
> > > > >
> > > > > The reason why the StreamingFileSink only supports
> > > > > OnCheckpointRollingPolicy with bulk
> > > > > formats has to do with the fact that currently Flink relies on the
> > > Hadoop
> > > > > writer for Parquet.
> > > > >
> > > > > Bulk formats keep important details about how they write the actual
> > > data
> > > > > (such as compression
> > > > > schemes, offsets, etc) in metadata and they write this metadata
> with
> > > the
> > > > > file (e.g. parquet writes
> > > > > them as a footer). The hadoop writer gives no access to these
> > metadata.
> > > > > Given this, there is
> > > > > no way for flink to be able to checkpoint a part file securely
> > without
> > > > > closing it.
> > > > >
> > > > > The solution would be to write our own writer and not go through
> the
> > > > hadoop
> > > > > one, but there
> > > > > are no concrete plans for this, as far as I know.
> > > > >
> > > > > I hope this explains a bit more why the StreamingFileSink has this
> > > > > limitation.
> > > > >
> > > > > Cheers,
> > > > > Kostas
> > > > >
> > > > >
> > > > > On Mon, Jun 24, 2019 at 9:19 AM Ying Xu <[hidden email]>
> > wrote:
> > > > >
> > > > > > Dear Flink community:
> > > > > >
> > > > > > We have a use case where StreamingFileSink
> > > > > > <
> > > > > >
> > > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> > > > > > >
> > > > > > is used for persisting bulk-encoded data to AWS s3. In our case,
> > the
> > > > data
> > > > > > sources consist of hybrid types of events, for which each type is
> > > > > uploaded
> > > > > > to an individual s3 prefix location. Because the event size is
> > highly
> > > > > > skewed, the uploaded file size may differ dramatically.  In order
> > to
> > > > > have a
> > > > > > better control over the uploaded file size, we would like to
> adopt
> > a
> > > > > > rolling policy based on file sizes (e.g., roll the file every
> > 100MB).
> > > > Yet
> > > > > > it appears bulk-encoding StreamingFileSink only supports
> > > > checkpoint-based
> > > > > > file rolling.
> > > > > >
> > > > > > IMPORTANT: Bulk-encoding formats can only be combined with the
> > > > > > `OnCheckpointRollingPolicy`, which rolls the in-progress part
> file
> > on
> > > > > every
> > > > > > checkpoint.
> > > > > >
> > > > > > Checkpoint-based file rolling appears to have other side effects.
> > For
> > > > > > instance, quite a lot of the heavy liftings (e.g file parts
> > > uploading)
> > > > > are
> > > > > > performed at the checkpointing time. As a result, checkpointing
> > takes
> > > > > > longer duration when data volume is high.
> > > > > >
> > > > > > Having a customized file rolling policy can be achieved by small
> > > > > > adjustments on the BulkFormatBuilder interface in
> > StreamingFileSink.
> > > In
> > > > > the
> > > > > > case of using S3RecoverableWriter, file rolling triggers data
> > > uploading
> > > > > and
> > > > > > corresponding S3Committer is also constructed and stored. Hence
> on
> > > the
> > > > > > surface, adding a simple file-size based rolling policy would NOT
> > > > > > compromise the established exact-once guarantee.
> > > > > >
> > > > > > Any advises on whether the above idea makes sense? Or perhaps
> there
> > > are
> > > > > > pitfalls that one might pay attention when introducing such
> rolling
> > > > > policy.
> > > > > > Thanks a lot!
> > > > > >
> > > > > >
> > > > > > -
> > > > > > Ying
> > > > > >
> > > > >
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Rolling policy when using StreamingFileSink for bulk-encoded output

Ying Xu-2
Hi Kostas:

For simplicity FLINK-13027
<https://issues.apache.org/jira/browse/FLINK-13027> has been assigned to my
current user ID. I will contribute using that ID.

Will circulate with the community once we have initial success with this
new rolling policy !

Thank you again.

-
Ying


On Fri, Jun 28, 2019 at 9:51 AM Ying Xu <[hidden email]> wrote:

> Hi Kostas:
>
> I'd like to.  The account used to file the JIRA does not have contributor
> access yet .  I had contributed a few Flink JIRAs in the past, using a very
> similar but different account.  Now I would like to consolidate and use a
> common account for Apache projects contributions.
>
> Would you mind granting me the contributor access for the following
> account ?  This way I can assign the JIRA to myself.
>            *yxu-apache
> <https://issues.apache.org/jira/secure/ViewProfile.jspa?name=yxu-apache>*
>
> Many thanks!
> <http://www.lyft.com/>
> -
> Ying
>
>
> On Fri, Jun 28, 2019 at 2:23 AM Kostas Kloudas <[hidden email]> wrote:
>
>> Hi Ying,
>>
>> That sounds great!
>> Looking forward to your PR!
>>
>> Btw don't you want to assign the issue to yourself if you are
>> planning to work on it?
>>
>> Kostas
>>
>> On Fri, Jun 28, 2019 at 9:54 AM Ying Xu <[hidden email]> wrote:
>>
>> > Thanks Kostas for confirming!
>> >
>> > I've filed a issue FLINK-13027
>> > <https://issues.apache.org/jira/browse/FLINK-13027> .   We are actively
>> > working on the interface of such a file rolling policy, and will also
>> > perform benchmarks when it is integrated with a StreamingFileSink. We
>> are
>> > more than happy to contribute if there's no other plan to address this
>> > issue.
>> >
>> > Thanks again.
>> >
>> > -
>> > Bests
>> > Ying
>> >
>> >
>> > On Tue, Jun 25, 2019 at 2:24 AM Kostas Kloudas <[hidden email]>
>> wrote:
>> >
>> > > Hi Ying,
>> > >
>> > > You are right! If it is either on checkpoint or on size, then this is
>> > > doable even with the current state of things.
>> > > Could you open a JIRA so that we can keep track of the progress?
>> > >
>> > > Cheers,
>> > > Kostas
>> > >
>> > > On Tue, Jun 25, 2019 at 9:49 AM Ying Xu <[hidden email]> wrote:
>> > >
>> > > > HI Kostas:
>> > > >
>> > > > Thanks for the prompt reply.
>> > > >
>> > > > The file rolling policy mentioned previously is meant to roll files
>> > > EITHER
>> > > > when a size limited is reached, OR when a checkpoint happens.  Looks
>> > like
>> > > > every time a file is rolled, the part file is closed
>> > > > <
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L217-L218
>> > > > >,
>> > > > during which file is closed with a committable returned
>> > > > <
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L239-L240
>> > > > >.
>> > > > I assume it is during closeForCommit() when the Parquet file
>> metatdata
>> > is
>> > > > written.  At a first glance, the code path of file rolling looks
>> very
>> > > > similar to that inside prepareBucketForCheckpointing()
>> > > > <
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L275
>> > > > >.
>> > > > Not sure if I miss anything there.
>> > > >
>> > > >
>> > > > -
>> > > > Ying
>> > > >
>> > > >
>> > > > On Mon, Jun 24, 2019 at 2:01 AM Kostas Kloudas <[hidden email]>
>> > > wrote:
>> > > >
>> > > > > Hi Ying,
>> > > > >
>> > > > > Thanks for using the StreamingFileSink.
>> > > > >
>> > > > > The reason why the StreamingFileSink only supports
>> > > > > OnCheckpointRollingPolicy with bulk
>> > > > > formats has to do with the fact that currently Flink relies on the
>> > > Hadoop
>> > > > > writer for Parquet.
>> > > > >
>> > > > > Bulk formats keep important details about how they write the
>> actual
>> > > data
>> > > > > (such as compression
>> > > > > schemes, offsets, etc) in metadata and they write this metadata
>> with
>> > > the
>> > > > > file (e.g. parquet writes
>> > > > > them as a footer). The hadoop writer gives no access to these
>> > metadata.
>> > > > > Given this, there is
>> > > > > no way for flink to be able to checkpoint a part file securely
>> > without
>> > > > > closing it.
>> > > > >
>> > > > > The solution would be to write our own writer and not go through
>> the
>> > > > hadoop
>> > > > > one, but there
>> > > > > are no concrete plans for this, as far as I know.
>> > > > >
>> > > > > I hope this explains a bit more why the StreamingFileSink has this
>> > > > > limitation.
>> > > > >
>> > > > > Cheers,
>> > > > > Kostas
>> > > > >
>> > > > >
>> > > > > On Mon, Jun 24, 2019 at 9:19 AM Ying Xu <[hidden email]>
>> > wrote:
>> > > > >
>> > > > > > Dear Flink community:
>> > > > > >
>> > > > > > We have a use case where StreamingFileSink
>> > > > > > <
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>> > > > > > >
>> > > > > > is used for persisting bulk-encoded data to AWS s3. In our case,
>> > the
>> > > > data
>> > > > > > sources consist of hybrid types of events, for which each type
>> is
>> > > > > uploaded
>> > > > > > to an individual s3 prefix location. Because the event size is
>> > highly
>> > > > > > skewed, the uploaded file size may differ dramatically.  In
>> order
>> > to
>> > > > > have a
>> > > > > > better control over the uploaded file size, we would like to
>> adopt
>> > a
>> > > > > > rolling policy based on file sizes (e.g., roll the file every
>> > 100MB).
>> > > > Yet
>> > > > > > it appears bulk-encoding StreamingFileSink only supports
>> > > > checkpoint-based
>> > > > > > file rolling.
>> > > > > >
>> > > > > > IMPORTANT: Bulk-encoding formats can only be combined with the
>> > > > > > `OnCheckpointRollingPolicy`, which rolls the in-progress part
>> file
>> > on
>> > > > > every
>> > > > > > checkpoint.
>> > > > > >
>> > > > > > Checkpoint-based file rolling appears to have other side
>> effects.
>> > For
>> > > > > > instance, quite a lot of the heavy liftings (e.g file parts
>> > > uploading)
>> > > > > are
>> > > > > > performed at the checkpointing time. As a result, checkpointing
>> > takes
>> > > > > > longer duration when data volume is high.
>> > > > > >
>> > > > > > Having a customized file rolling policy can be achieved by small
>> > > > > > adjustments on the BulkFormatBuilder interface in
>> > StreamingFileSink.
>> > > In
>> > > > > the
>> > > > > > case of using S3RecoverableWriter, file rolling triggers data
>> > > uploading
>> > > > > and
>> > > > > > corresponding S3Committer is also constructed and stored. Hence
>> on
>> > > the
>> > > > > > surface, adding a simple file-size based rolling policy would
>> NOT
>> > > > > > compromise the established exact-once guarantee.
>> > > > > >
>> > > > > > Any advises on whether the above idea makes sense? Or perhaps
>> there
>> > > are
>> > > > > > pitfalls that one might pay attention when introducing such
>> rolling
>> > > > > policy.
>> > > > > > Thanks a lot!
>> > > > > >
>> > > > > >
>> > > > > > -
>> > > > > > Ying
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: Rolling policy when using StreamingFileSink for bulk-encoded output

Kostas Kloudas-4
Thanks Ying!

Looking forward to your contribution.

Kostas

On Wed, Jul 3, 2019 at 6:48 PM Ying Xu <[hidden email]> wrote:

> Hi Kostas:
>
> For simplicity FLINK-13027
> <https://issues.apache.org/jira/browse/FLINK-13027> has been assigned to
> my
> current user ID. I will contribute using that ID.
>
> Will circulate with the community once we have initial success with this
> new rolling policy !
>
> Thank you again.
>
> -
> Ying
>
>
> On Fri, Jun 28, 2019 at 9:51 AM Ying Xu <[hidden email]> wrote:
>
> > Hi Kostas:
> >
> > I'd like to.  The account used to file the JIRA does not have contributor
> > access yet .  I had contributed a few Flink JIRAs in the past, using a
> very
> > similar but different account.  Now I would like to consolidate and use a
> > common account for Apache projects contributions.
> >
> > Would you mind granting me the contributor access for the following
> > account ?  This way I can assign the JIRA to myself.
> >            *yxu-apache
> > <https://issues.apache.org/jira/secure/ViewProfile.jspa?name=yxu-apache
> >*
> >
> > Many thanks!
> > <http://www.lyft.com/>
> > -
> > Ying
> >
> >
> > On Fri, Jun 28, 2019 at 2:23 AM Kostas Kloudas <[hidden email]>
> wrote:
> >
> >> Hi Ying,
> >>
> >> That sounds great!
> >> Looking forward to your PR!
> >>
> >> Btw don't you want to assign the issue to yourself if you are
> >> planning to work on it?
> >>
> >> Kostas
> >>
> >> On Fri, Jun 28, 2019 at 9:54 AM Ying Xu <[hidden email]> wrote:
> >>
> >> > Thanks Kostas for confirming!
> >> >
> >> > I've filed a issue FLINK-13027
> >> > <https://issues.apache.org/jira/browse/FLINK-13027> .   We are
> actively
> >> > working on the interface of such a file rolling policy, and will also
> >> > perform benchmarks when it is integrated with a StreamingFileSink. We
> >> are
> >> > more than happy to contribute if there's no other plan to address this
> >> > issue.
> >> >
> >> > Thanks again.
> >> >
> >> > -
> >> > Bests
> >> > Ying
> >> >
> >> >
> >> > On Tue, Jun 25, 2019 at 2:24 AM Kostas Kloudas <[hidden email]>
> >> wrote:
> >> >
> >> > > Hi Ying,
> >> > >
> >> > > You are right! If it is either on checkpoint or on size, then this
> is
> >> > > doable even with the current state of things.
> >> > > Could you open a JIRA so that we can keep track of the progress?
> >> > >
> >> > > Cheers,
> >> > > Kostas
> >> > >
> >> > > On Tue, Jun 25, 2019 at 9:49 AM Ying Xu <[hidden email]>
> wrote:
> >> > >
> >> > > > HI Kostas:
> >> > > >
> >> > > > Thanks for the prompt reply.
> >> > > >
> >> > > > The file rolling policy mentioned previously is meant to roll
> files
> >> > > EITHER
> >> > > > when a size limited is reached, OR when a checkpoint happens.
> Looks
> >> > like
> >> > > > every time a file is rolled, the part file is closed
> >> > > > <
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L217-L218
> >> > > > >,
> >> > > > during which file is closed with a committable returned
> >> > > > <
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L239-L240
> >> > > > >.
> >> > > > I assume it is during closeForCommit() when the Parquet file
> >> metatdata
> >> > is
> >> > > > written.  At a first glance, the code path of file rolling looks
> >> very
> >> > > > similar to that inside prepareBucketForCheckpointing()
> >> > > > <
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L275
> >> > > > >.
> >> > > > Not sure if I miss anything there.
> >> > > >
> >> > > >
> >> > > > -
> >> > > > Ying
> >> > > >
> >> > > >
> >> > > > On Mon, Jun 24, 2019 at 2:01 AM Kostas Kloudas <
> [hidden email]>
> >> > > wrote:
> >> > > >
> >> > > > > Hi Ying,
> >> > > > >
> >> > > > > Thanks for using the StreamingFileSink.
> >> > > > >
> >> > > > > The reason why the StreamingFileSink only supports
> >> > > > > OnCheckpointRollingPolicy with bulk
> >> > > > > formats has to do with the fact that currently Flink relies on
> the
> >> > > Hadoop
> >> > > > > writer for Parquet.
> >> > > > >
> >> > > > > Bulk formats keep important details about how they write the
> >> actual
> >> > > data
> >> > > > > (such as compression
> >> > > > > schemes, offsets, etc) in metadata and they write this metadata
> >> with
> >> > > the
> >> > > > > file (e.g. parquet writes
> >> > > > > them as a footer). The hadoop writer gives no access to these
> >> > metadata.
> >> > > > > Given this, there is
> >> > > > > no way for flink to be able to checkpoint a part file securely
> >> > without
> >> > > > > closing it.
> >> > > > >
> >> > > > > The solution would be to write our own writer and not go through
> >> the
> >> > > > hadoop
> >> > > > > one, but there
> >> > > > > are no concrete plans for this, as far as I know.
> >> > > > >
> >> > > > > I hope this explains a bit more why the StreamingFileSink has
> this
> >> > > > > limitation.
> >> > > > >
> >> > > > > Cheers,
> >> > > > > Kostas
> >> > > > >
> >> > > > >
> >> > > > > On Mon, Jun 24, 2019 at 9:19 AM Ying Xu <[hidden email]>
> >> > wrote:
> >> > > > >
> >> > > > > > Dear Flink community:
> >> > > > > >
> >> > > > > > We have a use case where StreamingFileSink
> >> > > > > > <
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> >> > > > > > >
> >> > > > > > is used for persisting bulk-encoded data to AWS s3. In our
> case,
> >> > the
> >> > > > data
> >> > > > > > sources consist of hybrid types of events, for which each type
> >> is
> >> > > > > uploaded
> >> > > > > > to an individual s3 prefix location. Because the event size is
> >> > highly
> >> > > > > > skewed, the uploaded file size may differ dramatically.  In
> >> order
> >> > to
> >> > > > > have a
> >> > > > > > better control over the uploaded file size, we would like to
> >> adopt
> >> > a
> >> > > > > > rolling policy based on file sizes (e.g., roll the file every
> >> > 100MB).
> >> > > > Yet
> >> > > > > > it appears bulk-encoding StreamingFileSink only supports
> >> > > > checkpoint-based
> >> > > > > > file rolling.
> >> > > > > >
> >> > > > > > IMPORTANT: Bulk-encoding formats can only be combined with the
> >> > > > > > `OnCheckpointRollingPolicy`, which rolls the in-progress part
> >> file
> >> > on
> >> > > > > every
> >> > > > > > checkpoint.
> >> > > > > >
> >> > > > > > Checkpoint-based file rolling appears to have other side
> >> effects.
> >> > For
> >> > > > > > instance, quite a lot of the heavy liftings (e.g file parts
> >> > > uploading)
> >> > > > > are
> >> > > > > > performed at the checkpointing time. As a result,
> checkpointing
> >> > takes
> >> > > > > > longer duration when data volume is high.
> >> > > > > >
> >> > > > > > Having a customized file rolling policy can be achieved by
> small
> >> > > > > > adjustments on the BulkFormatBuilder interface in
> >> > StreamingFileSink.
> >> > > In
> >> > > > > the
> >> > > > > > case of using S3RecoverableWriter, file rolling triggers data
> >> > > uploading
> >> > > > > and
> >> > > > > > corresponding S3Committer is also constructed and stored.
> Hence
> >> on
> >> > > the
> >> > > > > > surface, adding a simple file-size based rolling policy would
> >> NOT
> >> > > > > > compromise the established exact-once guarantee.
> >> > > > > >
> >> > > > > > Any advises on whether the above idea makes sense? Or perhaps
> >> there
> >> > > are
> >> > > > > > pitfalls that one might pay attention when introducing such
> >> rolling
> >> > > > > policy.
> >> > > > > > Thanks a lot!
> >> > > > > >
> >> > > > > >
> >> > > > > > -
> >> > > > > > Ying
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>