Hello,
I am facing a problem where a KeyedStream is poorly parallelised across workers when the number of keys is close to the parallelism. Some workers process zero keys, some more than one. This is because of `KeyGroupRangeAssignment.assignKeyToParallelOperator()` in `KeyGroupStreamPartitioner`, as I found out in this post: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-keyBy-to-deterministically-hash-each-record-to-a-processor-task-slot-td16483.html

I would like to find out what my options are here:
* Is there a reason why a custom partitioner cannot be used on a keyed stream?
* Could there be API support for creating correct, KeyedStream-compatible keys? It would also make `DataStreamUtils.reinterpretAsKeyedStream()` more usable for certain scenarios.
* Any other option I have?

Many thanks in advance.

Best,
Jozef
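To make the skew concrete, here is a plain-Java sketch (no Flink dependency) of the key → key group → operator index mapping described above. The `murmurish` function is a simplified stand-in for Flink's `MathUtils.murmurHash`, so the concrete numbers will differ from a real job, but the key-group arithmetic mirrors `KeyGroupRangeAssignment`:

```java
import java.util.HashMap;
import java.util.Map;

public class KeyGroupSkewDemo {

    // Simplified stand-in for Flink's MathUtils.murmurHash(key.hashCode()).
    static int murmurish(int code) {
        int h = code * 0xcc9e2d51;
        h = Integer.rotateLeft(h, 15) * 0x1b873593;
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        return h & Integer.MAX_VALUE; // force non-negative
    }

    // key -> key group (mirrors KeyGroupRangeAssignment.assignToKeyGroup)
    static int keyGroup(Object key, int maxParallelism) {
        return murmurish(key.hashCode()) % maxParallelism;
    }

    // key group -> operator index (mirrors computeOperatorIndexForKeyGroup)
    static int operatorIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int parallelism = 4, maxParallelism = 128;
        Map<Integer, Integer> keysPerOperator = new HashMap<>();
        // As many keys as subtasks -- the situation described above.
        for (int key = 0; key < parallelism; key++) {
            int group = keyGroup(key, maxParallelism);
            int op = operatorIndex(group, maxParallelism, parallelism);
            keysPerOperator.merge(op, 1, Integer::sum);
        }
        // With so few keys, some operator indices usually get 0 keys
        // and others 2 or more.
        System.out.println("keys per operator index: " + keysPerOperator);
    }
}
```

With maxParallelism = 128 and parallelism = 4, each subtask owns 32 key groups, but 4 hashed keys will rarely spread across all 4 subtasks.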
FWIW, if you want exactly one record per operator, then this code <https://github.com/ScaleUnlimited/flink-crawler/blob/ba06aa87226b4c44e30aba6df68984d53cc15519/src/main/java/com/scaleunlimited/flinkcrawler/utils/FlinkUtils.java#L153> should generate key values that will be partitioned properly.
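For context, a hypothetical sketch of the idea behind that helper: brute-force search for an Integer key that the key-group assignment routes to a desired operator index. The hash here is a simplified stand-in for Flink's `MathUtils.murmurHash`, and `makeKeyForOperatorIndex` is reimplemented for illustration, so the keys it finds will not match the linked code's output:

```java
public class OperatorKeySearch {

    // Simplified stand-in for Flink's MathUtils.murmurHash.
    static int murmurish(int code) {
        int h = code * 0xcc9e2d51;
        h = Integer.rotateLeft(h, 15) * 0x1b873593;
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        return h & Integer.MAX_VALUE; // force non-negative
    }

    // key -> operator index, mirroring KeyGroupRangeAssignment.
    static int operatorIndexForKey(int key, int maxParallelism, int parallelism) {
        int keyGroup = murmurish(Integer.hashCode(key)) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    // Brute force: try successive integers until one lands on targetIndex.
    // On average this needs roughly `parallelism` attempts per target.
    static int makeKeyForOperatorIndex(int targetIndex, int maxParallelism, int parallelism) {
        for (int key = 0; key >= 0; key++) {
            if (operatorIndexForKey(key, maxParallelism, parallelism) == targetIndex) {
                return key;
            }
        }
        throw new IllegalStateException("no key found for operator " + targetIndex);
    }

    public static void main(String[] args) {
        int parallelism = 4, maxParallelism = 128;
        for (int op = 0; op < parallelism; op++) {
            int key = makeKeyForOperatorIndex(op, maxParallelism, parallelism);
            System.out.println("operator " + op + " <- key " + key);
        }
    }
}
```

Precomputing one such key per subtask, then keying records by it, gives the one-key-per-operator layout without changing the partitioner.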
— Ken

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
Thanks Ken. Yes, a similar approach is suggested in the post I shared in my question, but to me it feels a bit hack-ish. I would like to know whether this is the only solution with Flink, or am I missing something? Could there be more API-level support for such a use case from Flink? Is there a reason why there is none? Or is there?
Hi Jozef,
Processing just a few keys (# of keys ≅ # of operators) in Flink isn’t common, from what I’ve seen.

Another possible option is to broadcast all records, and then have each operator decide which records to process, based on the operator index and the key value.

Something like this in your operator's open() method:

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.operatorIndex = getRuntimeContext().getIndexOfThisSubtask();
        this.numOperators = getRuntimeContext().getNumberOfParallelSubtasks();
    }

And in your operator’s processing method...

    int hash = calcPositiveHashCode(key);
    if ((hash % this.numOperators) == this.operatorIndex) { … }

— Ken
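The claim that each broadcast record is processed by exactly one subtask can be checked with the predicate alone, in plain Java. Note the predicate is `hash % numOperators == operatorIndex`, and `calcPositiveHashCode` is assumed here to simply clear the sign bit (the real helper may differ):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BroadcastFilterDemo {

    // Assumed implementation: clear the sign bit so the hash is non-negative.
    static int calcPositiveHashCode(Object key) {
        return key.hashCode() & Integer.MAX_VALUE;
    }

    // Per-subtask predicate: subtask `operatorIndex` out of `numOperators`
    // processes a broadcast record iff this returns true.
    static boolean claims(Object key, int operatorIndex, int numOperators) {
        return calcPositiveHashCode(key) % numOperators == operatorIndex;
    }

    public static void main(String[] args) {
        int numOperators = 4;
        for (String key : Arrays.asList("shard-0", "shard-1", "shard-2", "shard-3")) {
            List<Integer> owners = new ArrayList<>();
            for (int op = 0; op < numOperators; op++) {
                if (claims(key, op, numOperators)) {
                    owners.add(op);
                }
            }
            // hash % numOperators is a single value in [0, numOperators),
            // so exactly one subtask claims each key.
            System.out.println(key + " -> subtask " + owners);
        }
    }
}
```

The trade-off is that every record is shipped to every subtask, which is fine for low-volume streams but expensive otherwise.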
What you could try is using a KeySelector that maps the input records to
a range of 0-N. This should effectively behave like partitionCustom, except that you get a KeyedStream back.
So, the context here is that I am running an Apache Beam application on
Flink, and a keyed stream is used e.g. to provide sharding for writing to a filesystem. E.g. each worker owns one or two shards and writes GBK-windowed values into files. So any custom IO which needs to shape parallelism could need this, e.g. to interact with / query external services.
Hi Chesnay, I do not think this will work. There will be a KeyGroupStreamPartitioner involved, which calls the key selector and then invokes KeyGroupRangeAssignment:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java#L60

KeyGroupRangeAssignment does murmurHash(key.hashCode()), so keys mapped to a 0-N range are still hashed and can still collide on the same operator.