Hi Zhijiang,

thank you for your reply.

I have been experimenting over the last few days and ended up with a solution where I change the ResultPartitionView's subpartitionIndex as soon as it returns an EndOfPartition event. This way I can receive multiple subpartitions sequentially on a single InputChannel.

However, receiving the subpartitions sequentially performs very poorly. I would like to receive them concurrently.

As far as I can tell, we have to distinguish between data sent over LocalInputChannels and data sent over RemoteInputChannels (via Netty).

For remote transport, I figured out that the PartitionRequestQueue on the sender side is the place to tweak. So, first of all, I suppressed the processing of EndOfPartition events at the SingleInputGate and triggered them all together at the end. Then I tried to build a new reader within PartitionRequestQueue and request a subpartition by using an existing reader:

    public void addParallelReader(NetworkSequenceViewReader oldReader, int newIndex) {
        NetworkSequenceViewReader newReader =
            new SequenceNumberingViewReader(oldReader.getReceiverId(), this);

        // send request
        newReader.requestSubpartitionView(
            oldReader.getPartitionProvider(),
            oldReader.getResultPartitionId(),
            newIndex);

        // notify and add
        notifyReaderCreated(newReader);
        registerAvailableReader(newReader);
        availableReaders.add(newReader);
    }

Unfortunately, the data was not received.

The second place where a new subpartitionIndex has to be requested is the LocalInputChannel. Since the ResultSubpartitionViews are not wrapped in readers there, I wanted to handle multiple ResultSubpartitionViews in the channel itself. But it seems to me that the LocalInputChannel is too tightly intertwined with its view.

Do you have an idea for another approach, or would you say I'm on the right track?

Thank you.

Chris

On 2019/01/10 04:11:56, "zhijiang" <[hidden email]> wrote:
> Hi Chris,
>
> I think your requirement looks like this:
> 1. Determine the number of logical output partitions on the upstream side.
> 2. Determine the number of logical input channels on the downstream side.
> 3. Determine which input channel consumes which output partition.
>
> I remember that Tez has a similar mechanism. In Flink, the number of
> partitions/channels and the mapping between them are determined by the
> parallelism and the ship/partitioner strategy during graph generation,
> so I think there is currently no way to change them from this side.
> But you might be able to meet your requirement from another angle in
> Flink: you can change the data distribution across subpartitions. That
> means the data that previously went to sp1 can always be emitted into
> sp0, so worker0, which consumes sp0, gets the data of both. For the
> implementation, you might need to write a custom partitioner that
> controls the data distribution across all subpartitions as you require.
>
> Best,
> Zhijiang
>
> ------------------------------------------------------------------
> From: Chris Miller <[hidden email]>
> Send Time: 9 January 2019 (Wednesday) 20:46
> To: dev <[hidden email]>
> Subject: Request multiple subpartitions of one partition
>
> Hello,
>
> let's imagine we do a hash join of two DataSources. For the join
> operation we choose parallelism=5.
>
> This way Flink uses 5 TaskManagerRunners to do the join job. In
> particular, the DataSource tasks EACH CREATE 5 SUBPARTITIONS. Every
> worker now requests ONE SUBPARTITION from both DataSources (e.g.
> worker 0 requests subpartition 0, worker 1 requests subpartition 1, ...
> worker 4 requests subpartition 4).
>
> _In case I am wrong up to this point, please correct me._
>
> Now, for my special use case, I would like worker 0 to not only request
> subpartition 0 BUT ALSO REQUEST SUBPARTITION 1. (Sure, I also have to
> stop worker 1 from requesting subpartition 1.)
>
> The problem is that I cannot just trigger requestSubpartition() in the
> InputChannel again with another index, because the channel itself has
> to be created first.
>
> Can anyone help me find the best place to make the changes?
>
> Thanks.
>
> Chris

Regards,
Benjamin
Hi Chris,
I think I understand your approach, but it seems more complicated and rather tricky/hacky. You want to mock the partition request on the sender side, so that one partition request consumes multiple subpartitions, for both local and remote input channels. This approach is hard to get working: it involves many changes on both the sender and the receiver side and, most importantly, it breaks many existing mechanisms. Personally, I do not suggest adjusting the network stack for this special case.

The proper and lightweight way might be to implement a custom ChannelSelector, as I mentioned before. That would not change any existing mechanisms in the runtime, and it is easy to control.

Best,
Zhijiang

------------------------------------------------------------------
From: Chris Miller <[hidden email]>
Send Time: 24 January 2019 (Thursday) 03:04
To: wangzhijiang999 <[hidden email]>
Cc: dev <[hidden email]>
Subject: Re: Request multiple subpartitions of one partition
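A minimal sketch of the redirection idea behind the custom selector suggested above, assuming plain `hashCode` modulo partitioning as a stand-in for Flink's real hash partitioner. The class `MergingChannelSelector` and its methods are hypothetical and are not Flink's `ChannelSelector` interface, whose exact signature depends on the Flink version. The sketch only shows how records destined for one subpartition can be rerouted to another on the sender side, so the network stack stays untouched.

```java
/**
 * Illustration only: a standalone class, NOT Flink's ChannelSelector.
 * It picks an output channel (subpartition) by hash and then applies
 * a redirect table, so data for one channel can be merged into another.
 */
final class MergingChannelSelector {

    // redirect[i] is the channel that actually receives records hashed to i.
    private final int[] redirect;

    MergingChannelSelector(int numChannels) {
        if (numChannels <= 0) {
            throw new IllegalArgumentException("numChannels must be positive");
        }
        redirect = new int[numChannels];
        for (int i = 0; i < numChannels; i++) {
            redirect[i] = i; // identity mapping: the default behaviour
        }
    }

    /** Sends everything that would go to channel 'from' into channel 'into'. */
    MergingChannelSelector merge(int from, int into) {
        if (from < 0 || from >= redirect.length || into < 0 || into >= redirect.length) {
            throw new IllegalArgumentException("channel index out of range");
        }
        redirect[from] = into;
        return this;
    }

    /** Returns the target channel for a record key. */
    int selectChannel(Object key) {
        // floorMod keeps the result non-negative for negative hash codes.
        int hashed = Math.floorMod(key.hashCode(), redirect.length);
        return redirect[hashed];
    }
}
```

With five channels and `new MergingChannelSelector(5).merge(1, 0)`, every key that hashes to subpartition 1 is written to subpartition 0 instead, so worker 0 receives both shares while worker 1 still requests subpartition 1 but finds no records in it.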