Will StreamingFileSink.forBulkFormat(...) support overriding OnCheckpointRollingPolicy?

Elkhan Dadashov
Hi Flink Dev team,

Will StreamingFileSink.forBulkFormat(...) support overriding
OnCheckpointRollingPolicy?

Does anyone use StreamingFileSink *with checkpointing disabled* for writing
Parquet output files?

The output Parquet files are generated, but they are empty and stay in the
*inprogress* state, even when the job completes:

.part-0-0.inprogress.3e31ba42-588c-48cc-ad6d-d0ebcf1d8632
.part-1-0.inprogress.78e1f1dc-3c1c-417b-8270-2bf0298f985a
.part-2-0.inprogress.087cf3f1-7e2d-4a03-a518-62f576ed7eea

Exactly-once semantics are not important for my use case. Is using
*BucketingSink* then the only option?

Thanks.

Re: Will StreamingFileSink.forBulkFormat(...) support overriding OnCheckpointRollingPolicy?

till.rohrmann
Hi,

As the documentation for the StreamingFileSink indicates [1], you currently
need to enable checkpointing if you want to use bulk-encoded output
formats.

I'm not sure whether there are concrete plans to change this behaviour in
the future, because doing so would break exactly-once processing guarantees.
Klou might know more.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html

Cheers,
Till

On Mon, Jul 22, 2019 at 10:21 PM Elkhan Dadashov <[hidden email]>
wrote:

> Hi Flink Dev team,
>
> Will StreamingFileSink.forBulkFormat(...) support overriding
> OnCheckpointRollingPolicy?
>
> Does anyone use StreamingFileSink *with checkpointing disabled* for writing
> Parquet output files?
>
> The output Parquet files are generated, but they are empty and stay in the
> *inprogress* state, even when the job completes:
>
> .part-0-0.inprogress.3e31ba42-588c-48cc-ad6d-d0ebcf1d8632
> .part-1-0.inprogress.78e1f1dc-3c1c-417b-8270-2bf0298f985a
> .part-2-0.inprogress.087cf3f1-7e2d-4a03-a518-62f576ed7eea
>
> Exactly-once semantics are not important for my use case. Is using
> *BucketingSink* then the only option?
>
> Thanks.
>

Re: Will StreamingFileSink.forBulkFormat(...) support overriding OnCheckpointRollingPolicy?

Kostas Kloudas-4
Hi Elkhan,

As Till pointed out, it is expected behaviour that your files remain
in-progress, because the StreamingFileSink assumes that checkpointing is
enabled in order to work. There are no plans to change this behaviour. An
issue that may be relevant for you is
https://issues.apache.org/jira/browse/FLINK-13027, but it still assumes that
checkpointing is enabled.
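
To illustrate why the part files never leave the in-progress state, here is a
toy model of the part file lifecycle. It is not Flink code and makes a
simplifying assumption: a bulk-format part file is only rolled when a
checkpoint is taken, and only finalized once that checkpoint is confirmed. The
class and method names (PartFileLifecycleSketch, Bucket, snapshotState,
notifyCheckpointComplete, count) are hypothetical stand-ins chosen for this
sketch. In a real job the relevant switch is
StreamExecutionEnvironment#enableCheckpointing(long).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a bulk-format part file lifecycle. Illustration only, not Flink code. */
class PartFileLifecycleSketch {

    enum State { IN_PROGRESS, PENDING, FINISHED }

    static final class PartFile {
        final String name;
        State state = State.IN_PROGRESS;
        int records = 0;

        PartFile(String name) {
            this.name = name;
        }
    }

    static final class Bucket {
        private final List<PartFile> files = new ArrayList<>();
        private PartFile current;
        private int counter = 0;

        /** Appends a record, opening a new in-progress part file if none is open. */
        void write(String record) {
            if (current == null) {
                current = new PartFile("part-0-" + counter++);
                files.add(current);
            }
            current.records++;
        }

        /** Assumed roll-on-checkpoint behaviour: the open file is closed and becomes pending. */
        void snapshotState() {
            if (current != null) {
                current.state = State.PENDING;
                current = null;
            }
        }

        /** Assumed commit step: pending files become finished once the checkpoint is confirmed. */
        void notifyCheckpointComplete() {
            for (PartFile f : files) {
                if (f.state == State.PENDING) {
                    f.state = State.FINISHED;
                }
            }
        }

        long count(State s) {
            return files.stream().filter(f -> f.state == s).count();
        }
    }

    public static void main(String[] args) {
        // Checkpointing disabled: nothing ever triggers the roll or the commit.
        Bucket withoutCheckpoints = new Bucket();
        withoutCheckpoints.write("a");
        withoutCheckpoints.write("b");
        System.out.println("no checkpoints, finished files: "
                + withoutCheckpoints.count(State.FINISHED));

        // Checkpointing enabled: each checkpoint rolls the file, then commits it.
        Bucket withCheckpoints = new Bucket();
        withCheckpoints.write("a");
        withCheckpoints.snapshotState();
        withCheckpoints.notifyCheckpointComplete();
        System.out.println("with checkpoints, finished files: "
                + withCheckpoints.count(State.FINISHED));
    }
}
```

In this model a job that ends without a single checkpoint leaves one
in-progress file and no finished files, which matches the symptom described
in the original question.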

Could you elaborate on your reason for not enabling checkpointing, even if
exactly-once is not a necessity?

Cheers,
Kostas


On Tue, Jul 23, 2019 at 11:58 AM Till Rohrmann <[hidden email]>
wrote:

> Hi,
>
> As the documentation for the StreamingFileSink indicates [1], you currently
> need to enable checkpointing if you want to use bulk-encoded output
> formats.
>
> I'm not sure whether there are concrete plans to change this behaviour in
> the future, because doing so would break exactly-once processing guarantees.
> Klou might know more.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>
> Cheers,
> Till