I have a problem with load balancing and was wondering how to deal with this kind of problem in Flink. The input I have is a data set of grouped IDs that I join with metadata for each ID. Then I need to compare each item in a group with every other item in that group and, if necessary, split the group into different subgroups. In Flink this is a join followed by a group reduce.

The problem is that the groups differ a lot in size: 90% of the groups are done in 5 minutes, while the rest take 2 hours. To make this more efficient, I would need to distribute the N-to-N comparison that is currently done in the group-reduce function. Does anyone have an idea how I can do that in a simple way?

My current idea is to make the group-reduce step emit computation partitions and then do another flat-map step for the actual computation. Would this solve the problem?

cheers Martin
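For reference, a minimal sketch of such a pipeline in the Java DataSet API. The class name, toy data, field layout, and function bodies are illustrative assumptions, not the actual job:

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class GroupCompareJob {

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // toy data: (groupId, itemId) assignments and (itemId, metadata) records
    DataSet<Tuple2<Long, Long>> groupedIds = env.fromElements(
        new Tuple2<>(1L, 10L), new Tuple2<>(1L, 11L), new Tuple2<>(2L, 12L));
    DataSet<Tuple2<Long, String>> metadata = env.fromElements(
        new Tuple2<>(10L, "a"), new Tuple2<>(11L, "b"), new Tuple2<>(12L, "c"));

    // join every item with its metadata, keeping the group id
    DataSet<Tuple2<Long, String>> items = groupedIds
        .join(metadata).where(1).equalTo(0)
        .with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, String>, Tuple2<Long, String>>() {
          @Override
          public Tuple2<Long, String> join(Tuple2<Long, Long> id, Tuple2<Long, String> meta) {
            return new Tuple2<>(id.f0, meta.f1); // (groupId, metadata)
          }
        });

    // the skewed step: an all-pairs comparison inside each group
    items.groupBy(0)
        .reduceGroup(new GroupReduceFunction<Tuple2<Long, String>, Tuple2<Long, String>>() {
          @Override
          public void reduce(Iterable<Tuple2<Long, String>> group, Collector<Tuple2<Long, String>> out) {
            List<Tuple2<Long, String>> buf = new ArrayList<>();
            for (Tuple2<Long, String> item : group) {
              buf.add(item);
            }
            for (Tuple2<Long, String> a : buf) {
              for (Tuple2<Long, String> b : buf) {
                // compare a and b and decide on sub-groups here
              }
              out.collect(a);
            }
          }
        })
        .print(); // with older releases, add a sink and call env.execute() instead
  }
}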
Hi Martin,
Flink does not have features to mitigate data skew at the moment, such as dynamic partitioning. That would also "only" allow processing large groups as individual partitions and several smaller groups together in other partitions; the issue of a single large group would not be solved by it. Right now this is more of an application-level concern and could, for example, be addressed by adding something like a group-cross operator...

I think your approach of emitting multiple smaller partitions from a group-reduce, reshuffling (there is a rebalance operator [1]), and applying a flat-map sounds like a good idea to me. At least, I didn't come up with a better approach ;-)

Cheers, Fabian

[1] http://flink.incubator.apache.org/docs/0.7-incubating/programming_guide.html#transformations
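A rough sketch of that split, reusing the hypothetical `items` data set and imports from the sketch above (plus org.apache.flink.api.java.tuple.Tuple3 and org.apache.flink.api.common.functions.FlatMapFunction). The group-reduce only enumerates comparison tasks, rebalance() spreads them evenly across workers, and a flat-map does the expensive comparison. Emitting one task per pair is only an illustration; for very large groups you would emit coarser blocks of items instead:

// Split the heavy group-reduce into: (1) a cheap group-reduce that emits
// comparison tasks, (2) rebalance() for even redistribution, (3) a flat-map
// that runs the actual comparison. Tuple3 = (groupId, itemA, itemB).
DataSet<Tuple3<Long, String, String>> tasks = items
    .groupBy(0)
    .reduceGroup(new GroupReduceFunction<Tuple2<Long, String>, Tuple3<Long, String, String>>() {
      @Override
      public void reduce(Iterable<Tuple2<Long, String>> group, Collector<Tuple3<Long, String, String>> out) {
        List<Tuple2<Long, String>> buf = new ArrayList<>();
        for (Tuple2<Long, String> item : group) {
          buf.add(item);
        }
        // emit one task per pair instead of comparing here
        for (int i = 0; i < buf.size(); i++) {
          for (int j = i + 1; j < buf.size(); j++) {
            out.collect(new Tuple3<>(buf.get(i).f0, buf.get(i).f1, buf.get(j).f1));
          }
        }
      }
    });

DataSet<Tuple3<Long, String, String>> results = tasks
    .rebalance() // round-robin redistribution, independent of the group key
    .flatMap(new FlatMapFunction<Tuple3<Long, String, String>, Tuple3<Long, String, String>>() {
      @Override
      public void flatMap(Tuple3<Long, String, String> task, Collector<Tuple3<Long, String, String>> out) {
        // the expensive comparison of task.f1 and task.f2 goes here
        out.collect(task);
      }
    });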
Just had another idea.
The group-wise crossing that you are doing is actually a self-join on the grouping key. The system currently has no special strategy to deal with self-joins, which means the two inputs of the join (although they are identical) are treated as two independent inputs. If you force a broadcast of one side and hash-partition the other side, the following would happen:

The broadcasted input would be replicated and sent to every worker thread. The other input would remain local and still be partitioned, and therefore smaller on each node. That's why you would build the hash table from the partitioned input; the larger, replicated input would be streamed along the hash tables. Because the inputs are not partitioned on the key, there should be no load-balancing issues (depending on the previous partitioning, it can even be perfectly balanced...).

However, this might not work (well) if the input is too large to be replicated that much (or the smaller partitions are too large for in-memory hash tables).

Best, Fabian
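For illustration, the group-wise cross written as a self-join on the grouping key might look like this (again reusing the hypothetical `items` data set from the earlier sketch; which physical strategy the optimizer picks for it is a separate question):

// Group-wise cross expressed as a self-join on the grouping key (field 0):
// both join inputs are the same data set, so every item is paired with
// every other item of its group. Tuple3 = (groupId, itemA, itemB).
DataSet<Tuple3<Long, String, String>> pairs = items
    .join(items)
    .where(0).equalTo(0)
    .with(new JoinFunction<Tuple2<Long, String>, Tuple2<Long, String>, Tuple3<Long, String, String>>() {
      @Override
      public Tuple3<Long, String, String> join(Tuple2<Long, String> left, Tuple2<Long, String> right) {
        return new Tuple3<>(left.f0, left.f1, right.f1);
      }
    });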
Hmm, I just found that there is no JoinHint that would allow what I described above. Broadcasting one input and using the other one to build the hash tables is usually not a good thing to do, because the broadcasted side should be much smaller than the other one...