Changing parallelism on BucketingSink

Felix Cheung
Hi,

I'm implementing a custom sink. The job writes a DataStream into this custom sink.
I'd like to maximize the parallelism to use all available slots in the cluster, but
write to a smaller set of files in the final output.

When I implement this sink with DataStream.writeAsText, I get a DataStreamSink, which has the setParallelism() method.
However, when I implement it using BucketingSink, to leverage its ability to bucket output into paths and limit file sizes, there seems to be no option to change the parallelism.
It doesn't seem to be available in AbstractRichFunction, RichSinkFunction, or SinkFunction either?

Is the only way to change the default parallelism on the "current" ExecutionEnvironment before calling addSink on the DataStream?

Any suggestion would be appreciated!

Re: Changing parallelism on BucketingSink

Till Rohrmann
Hi Felix,

when calling dataStream.addSink(new BucketingSink<>(basePath)), it returns a
DataStreamSink object. On this object you can set the parallelism via
setParallelism.

Cheers,
Till
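A minimal sketch of the suggested approach, assuming a Flink 1.3-era Java API with the flink-connector-filesystem dependency on the classpath. The class name BucketingSinkParallelism, the helper attachSink, the base path, the hourly bucket format, the 128 MB part-file size and the parallelism values (16 for the job default, 4 for the sink) are hypothetical choices for illustration only. The point is that the parallelism is set on the DataStreamSink handle returned by addSink, not on the BucketingSink function itself.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class BucketingSinkParallelism {

    /** Hypothetical helper: attaches a BucketingSink with its own, smaller parallelism. */
    static DataStreamSink<String> attachSink(DataStream<String> stream, String basePath, int sinkParallelism) {
        BucketingSink<String> bucketingSink = new BucketingSink<>(basePath);
        // One bucket directory per hour (processing time).
        bucketingSink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH"));
        // Start a new part file once the current one exceeds ~128 MB.
        bucketingSink.setBatchSize(128L * 1024 * 1024);

        // addSink returns a DataStreamSink; the parallelism is configured on that
        // handle, so only the sink operator runs with sinkParallelism instances.
        return stream.addSink(bucketingSink)
                .name("bucketing-sink")
                .setParallelism(sinkParallelism);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Default for operators without an explicit parallelism (hypothetical: 16 slots).
        env.setParallelism(16);

        // fromElements is a non-parallel source; the map below runs with the default of 16.
        DataStream<String> upper = env.fromElements("a", "b", "c")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                });

        // Only the sink is scaled down, so fewer part files are written.
        DataStreamSink<String> sink = attachSink(upper, "/tmp/flink/bucketed-output", 4);
        System.out.println(sink.getTransformation().getParallelism());

        // Call env.execute("bucketing-sink-parallelism") to actually run the job;
        // it is omitted here so that only the job graph is built.
    }
}
```

With this, the default parallelism of the ExecutionEnvironment can stay at the cluster-wide value; the same setParallelism call is what the DataStreamSink returned by writeAsText offers as well.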


On Mon, Aug 21, 2017 at 7:04 PM, Felix Cheung <[hidden email]>
wrote:
