Add partitionedKeyBy to DataStream

Add partitionedKeyBy to DataStream

Xiaowei Jiang
After we do any interesting operation (e.g. reduce) on a KeyedStream, the
result becomes a DataStream. In many cases, the output still (logically) has
the same or compatible keys as the KeyedStream. But to do further operations
on these keys, we are forced to use keyBy again. This works semantically, but
is costly in two respects. First, it destroys the possibility of chaining,
which is one of the most important optimization techniques. Second, keyBy
greatly expands the connected components of tasks, which has implications for
failover optimization.

To address this shortcoming, we propose a new operator, partitionedKeyBy:

DataStream<T> {
    public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key);
}

Semantically, DataStream.partitionedKeyBy(key) is equivalent to
DataStream.keyBy(partitionedKey), where partitionedKey is key plus the taskid
as an extra field. This guarantees that records from different tasks will
never produce the same keys.
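
For illustration, here is how that compound key could be built by hand with
today's API (a rough sketch; the Tuple2<String, Long> record type, field
layout, and sum function are made up for the example):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;

// ds is a DataStream<Tuple2<String, Long>> whose logical key is field 0.
// Tag each record with the index of the subtask that produced it ...
DataStream<Tuple3<Integer, String, Long>> tagged = ds.map(
    new RichMapFunction<Tuple2<String, Long>, Tuple3<Integer, String, Long>>() {
        @Override
        public Tuple3<Integer, String, Long> map(Tuple2<String, Long> value) {
            return Tuple3.of(getRuntimeContext().getIndexOfThisSubtask(),
                             value.f0, value.f1);
        }
    });

// ... and key by (taskid, key). Records from different tasks can never
// collide on this compound key; partitionedKeyBy would provide the same
// keying without the extra network shuffle that keyBy performs here.
tagged.keyBy(0, 1).reduce(
    new ReduceFunction<Tuple3<Integer, String, Long>>() {
        @Override
        public Tuple3<Integer, String, Long> reduce(
                Tuple3<Integer, String, Long> a,
                Tuple3<Integer, String, Long> b) {
            return Tuple3.of(a.f0, a.f1, a.f2 + b.f2);
        }
    });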

With this, it's possible to do

ds.keyBy(key1).reduce(func1)
    .partitionedKeyBy(key1).reduce(func2)
    .partitionedKeyBy(key2).reduce(func3);

Most importantly, in certain cases, we will be able to chain these into a
single vertex.
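
Annotated with the physical behavior we would expect from each step
(illustrative, since partitionedKeyBy is only proposed here):

ds.keyBy(key1)              // full network shuffle, partitions by key1
    .reduce(func1)          // runs in the keyed tasks
    .partitionedKeyBy(key1) // no shuffle: the key1 partitioning is reused
    .reduce(func2)          // may chain with func1 in the same task
    .partitionedKeyBy(key2) // no shuffle: keys stay task-local via the taskid
    .reduce(func3);         // may chain as well, yielding a single vertex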

Please share your thoughts. The JIRA is at
https://issues.apache.org/jira/browse/FLINK-4855

Xiaowei

Re: Add partitionedKeyBy to DataStream

Till Rohrmann
Hi Xiaowei,

I like the idea of reusing a partitioning and thus saving a shuffle
operation. It would be great if we could fail at runtime in case the
partitioning has somehow changed. That way a logical user error won't go
unnoticed.
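
Something along these lines could work (a rough sketch, not a concrete
proposal; the PartitioningCheck class and its wiring are made up, and it
leans on Flink's key-group assignment):

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

// Hypothetical sanity check: verify that each record already sits on the
// subtask that keyBy(key) would route it to, and fail fast otherwise.
public class PartitioningCheck<T, K> extends RichFilterFunction<T> {

    private final KeySelector<T, K> keySelector;
    private final int maxParallelism;

    public PartitioningCheck(KeySelector<T, K> keySelector, int maxParallelism) {
        this.keySelector = keySelector;
        this.maxParallelism = maxParallelism;
    }

    @Override
    public boolean filter(T value) throws Exception {
        K key = keySelector.getKey(value);
        int expected = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                key, maxParallelism,
                getRuntimeContext().getNumberOfParallelSubtasks());
        if (expected != getRuntimeContext().getIndexOfThisSubtask()) {
            throw new IllegalStateException(
                    "Partitioning changed: key " + key
                    + " does not belong to subtask "
                    + getRuntimeContext().getIndexOfThisSubtask());
        }
        return true;
    }
}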

Would it make sense to name the method partitionedByKey(...) because the
data is already partitioned?

Cheers,
Till
