Hello flink-dev,

The aim is to achieve data locality for source connectors and for task/operator scaling (dynamic scaling, in our case).

Current version (Flink 1.8): some_hash(key_hash) % parallelism, which is effectively just a round-robin algorithm.

Proposal: control the key-group distribution at the level of the DataStream key object via a simple interface implementation. Please find attached a diff (for Flink 1.8.0) and an auxiliary Java interface class. By implementing this interface one can manage the key-group assignment. The approach is very flexible and needs no significant runtime changes.

For us it gives a speedup when reading from an already grouped (groupby*) Kafka source with a custom topic-partition distribution: there is no need to reshuffle, and the source topic-partition reader is chained with the next local operator (see the user mailing list archive [1] and issue [2]).

It can also help when the parallelism changes (rebalance): globally, when the whole task is stopped and then restored with a different parallelism value, and within task execution, say operator1().parallelism(4).operator2().parallelism(6).
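To make the proposal a bit more concrete, here is a rough, dependency-free sketch of what such a pluggable assignment could look like. It is not the interface from the attached diff: KeyGroupAssigner, hashBased, partitionAligned and PartitionedKey are hypothetical names used only for illustration, and the plain hashCode() stands in for whatever hash the runtime really applies.

```java
import java.util.function.ToIntFunction;

class KeyGroupSketch {

    /** Hypothetical hook: decides which key group a key belongs to. */
    interface KeyGroupAssigner<K> {
        int assignKeyGroup(K key, int maxParallelism);
    }

    /** Stand-in for the default behaviour: hash-based, ignores where the data came from. */
    static <K> KeyGroupAssigner<K> hashBased() {
        return (key, maxParallelism) -> Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Custom behaviour: keep a key in the group that matches its source partition. */
    static <K> KeyGroupAssigner<K> partitionAligned(ToIntFunction<K> partitionOf) {
        return (key, maxParallelism) -> Math.floorMod(partitionOf.applyAsInt(key), maxParallelism);
    }

    /** Hypothetical key object that remembers the Kafka partition it was read from. */
    static final class PartitionedKey {
        final String id;
        final int kafkaPartition;

        PartitionedKey(String id, int kafkaPartition) {
            this.id = id;
            this.kafkaPartition = kafkaPartition;
        }
    }

    public static void main(String[] args) {
        PartitionedKey key = new PartitionedKey("user-42", 3);
        KeyGroupAssigner<PartitionedKey> byHash = hashBased();
        KeyGroupAssigner<PartitionedKey> byPartition = partitionAligned(k -> k.kafkaPartition);
        System.out.println("hash-based group:        " + byHash.assignKeyGroup(key, 128));
        System.out.println("partition-aligned group: " + byPartition.assignKeyGroup(key, 128));
    }
}
```

With the partition-aligned variant, a record read from partition 3 stays in key group 3, so no reshuffle is needed as long as the groups are placed on the subtasks that read the matching partitions.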
In the 4-to-6 scenario, the current (Flink 1.8) implementation would move all the data across all task nodes (truth be told, 5/6 of the data would be moved). But we can save the current group assignment in the key object (old_parallelism = 4), and we know the new value, new_parallelism = 6. So we can move only part of the keys, 2/6 from each node, and the total data traffic (network, disk, memory) would be 2.5 times smaller; see the tables below.

Old parallelism = 4, 24 different keys

Node | Keys
0    | 00, 04, 08, 12, 16, 20
1    | 01, 05, 09, 13, 17, 21
2    | 02, 06, 10, 14, 18, 22
3    | 03, 07, 11, 15, 19, 23

New parallelism = 6, same keys

Node | Keys
0    | 00, 04, 12, 16
1    | 01, 05, 13, 17
2    | 02, 06, 14, 18
3    | 03, 07, 15, 19
4    | 08, 09, 10, 11
5    | 20, 21, 22, 23

Take the third item from every task node (0..3) and move it to the new node 4, then take the next third item and move it to node 5, then the next third item goes to node 4, and so on.

The gain is not only less data copied but also (depending on the implementation, of course) fewer clearing* Java operations (no need to resend all the data, save it to a new heap and clear everything that was there before), less garbage collection and less memory use, and there are other areas to tune/improve as well.

In addition: yes, the network is fast, 10GbE or even more, but in practice there are a number of virtual layers (OpenStack, VMware, etc.) with a limited TCP/IP stack, so the real speed and latency differ from those declared by the manufacturer.

Does this approach fit with the further evolution of Flink? Can it be merged into 1.8 or a later version?

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/kafka-partitions-data-locality-td27355.html
[2] https://issues.apache.org/jira/browse/FLINK-12294

Best,
Sergey
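P.S. A minimal sketch of the 4-to-6 redistribution from the tables, for illustration only. It is not the attached diff: the class and method names (RescaleSketch, initialLayout, rescale, countMoved) are hypothetical, keys are plain integers placed by key % parallelism, and the sketch only covers scaling up when the new parallelism is divisible by the number of added nodes.

```java
import java.util.ArrayList;
import java.util.List;

class RescaleSketch {

    /** Old layout from the first table: key k lives on node k % parallelism. */
    static List<List<Integer>> initialLayout(int keyCount, int parallelism) {
        List<List<Integer>> nodes = new ArrayList<>();
        for (int n = 0; n < parallelism; n++) nodes.add(new ArrayList<>());
        for (int k = 0; k < keyCount; k++) nodes.get(k % parallelism).add(k);
        return nodes;
    }

    /** Moves every stride-th key of each old node to the new nodes, cycling through them. */
    static List<List<Integer>> rescale(List<List<Integer>> oldLayout, int newParallelism) {
        int oldParallelism = oldLayout.size();
        int added = newParallelism - oldParallelism;
        if (added <= 0 || newParallelism % added != 0) {
            throw new IllegalArgumentException("sketch covers only simple scale-up cases");
        }
        int stride = newParallelism / added; // 6 / 2 = 3: every third key leaves its node
        List<List<Integer>> newLayout = new ArrayList<>();
        for (int n = 0; n < newParallelism; n++) newLayout.add(new ArrayList<>());
        for (int node = 0; node < oldParallelism; node++) {
            List<Integer> keys = oldLayout.get(node);
            int moved = 0;
            for (int i = 0; i < keys.size(); i++) {
                if ((i + 1) % stride == 0) {
                    // first moved key goes to the first new node, the next one to the second, ...
                    newLayout.get(oldParallelism + (moved % added)).add(keys.get(i));
                    moved++;
                } else {
                    newLayout.get(node).add(keys.get(i)); // key stays where it is
                }
            }
        }
        return newLayout;
    }

    /** Counts keys that are no longer on the node that held them before. */
    static int countMoved(List<List<Integer>> oldLayout, List<List<Integer>> newLayout) {
        int moved = 0;
        for (int node = 0; node < oldLayout.size(); node++) {
            for (Integer key : oldLayout.get(node)) {
                if (!newLayout.get(node).contains(key)) moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        List<List<Integer>> before = initialLayout(24, 4);
        List<List<Integer>> after = rescale(before, 6);
        for (int n = 0; n < after.size(); n++) System.out.println(n + " | " + after.get(n));
        System.out.println("moved " + countMoved(before, after) + " of 24 keys");
    }
}
```

For 24 keys this reproduces the second table and moves 8 keys (2/6 of the data), compared with roughly 20 keys (5/6) when every key is re-hashed.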