Request multiple subpartitions of one partition

Chris Miller
 

Hello,

let's imagine we do a hash join of two DataSources. For the join operation
we choose parallelism=5.

This way Flink uses 5 TaskManagerRunners to do the join job. In
particular, EACH DataSource task CREATES 5 SUBPARTITIONS.
Every worker now requests ONE SUBPARTITION from both DataSources (e.g.
worker 0 requests subpartition 0, worker 1 requests subpartition 1, ...
worker 4 requests subpartition 4).

_If I'm wrong up to this point, please correct me._

Now, for my special use case, I would like worker 0 to request not only
subpartition 0 BUT ALSO SUBPARTITION 1. (Of course, I would also have to
stop worker 1 from requesting subpartition 1.)
The problem is that I cannot just call requestSubpartition() on the
InputChannel again with another index, because the channel itself has to
be created first.

Can anyone help me find the best place to make these changes?

Thanks.

Chris
 
Re: Request multiple subpartitions of one partition

Zhijiang(wangzhijiang999)
Hi Chris,

I think your requirement amounts to this:
1. Determine the number of logical output partitions on the upstream side.
2. Determine the number of logical input channels on the downstream side.
3. Determine which input channel consumes which output partition.

I remember that Tez has a similar mechanism. In Flink, the number of partitions/channels and the mapping between them are determined by the parallelism and the ship/partitioner strategy during graph generation, so I think there is currently no way to change them from this side. But you might meet your requirement from another angle in Flink: you can change how data is distributed across the subpartitions. That means the data that previously went to sp1 would always be emitted into sp0, so worker0, which consumes sp0, gets the data of both. In terms of implementation, you might need to implement a custom partitioner that controls the data distribution across all subpartitions as you require.
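
As a rough illustration of the custom partitioner idea, here is a minimal sketch in plain Java. The class name MergingPartitioner and its two constructor parameters are hypothetical. The partition(key, numPartitions) method has the same shape as the method of Flink's org.apache.flink.api.common.functions.Partitioner interface, so the class could presumably be adapted to implement it. The use of hashCode() to pick the base index is an assumption made for illustration and is not how Flink's built-in hash partitioning computes its target.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch: redirects every record that would land in one
 * subpartition (mergedFrom) into another one (mergedInto).
 * In a real job this class would implement Flink's Partitioner<K>.
 */
final class MergingPartitioner<K> {
    private final int mergedFrom; // subpartition that should stay empty, e.g. 1
    private final int mergedInto; // subpartition that receives its data, e.g. 0

    MergingPartitioner(int mergedFrom, int mergedInto) {
        this.mergedFrom = mergedFrom;
        this.mergedInto = mergedInto;
    }

    /** Returns the target subpartition index for the given key. */
    int partition(K key, int numPartitions) {
        // Assumption: a plain hashCode-based base distribution.
        int base = Math.floorMod(key.hashCode(), numPartitions);
        return base == mergedFrom ? mergedInto : base;
    }

    public static void main(String[] args) {
        MergingPartitioner<Integer> p = new MergingPartitioner<>(1, 0);
        int[] counts = new int[5];
        for (int key = 0; key < 1000; key++) {
            counts[p.partition(key, 5)]++;
        }
        // Subpartition 1 receives nothing; subpartition 0 receives both shares.
        System.out.println(Arrays.toString(counts));
    }
}
```

With this approach worker 1 still exists and still requests subpartition 1, but that subpartition stays empty. For a join, the same partitioner would have to be applied to both inputs so that matching keys end up at the same worker.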

Best,
Zhijiang
------------------------------------------------------------------
From:Chris Miller <[hidden email]>
Send Time: January 9, 2019 (Wednesday) 20:46
To:dev <[hidden email]>
Subject:Request multiple subpartitions of one partition


