Hi,
A while ago we found that when you construct a Kafka producer, it always uses the FlinkFixedPartitioner to spread the data across the Kafka partitions, except when you give it a custom partitioner.

Because we want all our elements to be partitioned by the key of the records, we created this issue and put up a pull request with a simple FlinkKeyHashPartitioner:

https://issues.apache.org/jira/browse/FLINK-9610

A comment by one of the reviewers (Tzu-Li Tai) was essentially: "Kafka does this by default already, why this change?"

So I dug a lot deeper to understand how the partitioning decision and the data flow from the Flink API down into the Kafka producer client code.

My conclusions:
1) The Kafka producer code uses the provided partitioner; if it doesn't have one, it uses the hash of the key; if it doesn't have a key either, it does a round-robin distribution.
2) The Flink Kafka producer constructors are available in variants with and without a partitioner. Even if you provide a valid key for each record, it will still use the FlinkFixedPartitioner if no explicit partitioner has been specified.

Looking at the code (I haven't tried it), you can actually get the desired behavior without any code changes by using the constructor that requires a partitioner and giving it a null value there (see the sketch after this message).
Yuck!

In my opinion, providing a KeyedSerializationSchema is an implicit way of specifying that you want to partition the data by that key.

So to make this a workable situation I see three ways to handle it:
1) We merge something like the partitioner I proposed.
2) We change the constructors that get a KeyedSerializationSchema to use that key for partitioning.
3) We remove all constructors that have a KeyedSerializationSchema, because the key is never used anyway.

I think 3) is bad, 1) is OK, and 2), although it breaks backward compatibility, is the best solution.

So to clarify the change I propose here:
We change the behavior of all Flink producer constructors that have a KeyedSerializationSchema parameter and NO partitioner. Because we then HAVE a key and do NOT have a partitioner, the partitioning is done by the key-hash partitioning code that already exists in the underlying Kafka client.

For the rest of the constructors the behavior remains unchanged:
- with a non-keyed SerializationSchema
- with a provided partitioner

What do you guys think?

--
Best regards / Met vriendelijke groeten,

Niels Basjes
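For illustration, a minimal sketch of the workaround Niels describes, assuming the 0.11 connector's constructor variant that takes an Optional<FlinkKafkaPartitioner> (where Optional.empty() plays the role of the "null" partitioner). This is untested, and the exact constructor overloads vary per connector version:

```java
import java.util.Optional;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

public class KeyPartitionedProducer {

    public static <T> FlinkKafkaProducer011<T> create(
            String topic,
            KeyedSerializationSchema<T> schema,
            Properties producerConfig) {
        // Optional.empty() means no FlinkFixedPartitioner is attached, so each
        // record reaches the Kafka client carrying its serialized key, and the
        // client's own key-hash partitioning decides the target partition.
        return new FlinkKafkaProducer011<>(
                topic,
                schema,
                producerConfig,
                Optional.<FlinkKafkaPartitioner<T>>empty());
    }
}
```

With no Flink partitioner attached, the producer hands each record to the Kafka client with its serialized key, and the client's key-hash fallback described in conclusion 1) takes over.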
Hi Niels,
Your conclusions are accurate, and I also agree that the combination of the KeyedSerializationSchema, the provided partitioners, etc. is all a bit awkward in its current state.

As for the proposed solutions, I personally disagree with 1), since key partitioning, IMO, should be the default behavior. There were actually already discussions about making that happen once a Kafka connector remake happens in the future. And yes, 2) seems to be the best solution here.

To round up solution 2), we would have:
- A constructor that takes a KeyedSerializationSchema and NO partitioner. This implicitly uses Kafka's key partitioning.
- A constructor that takes a SerializationSchema and an Optional<FlinkKafkaPartitioner>. By default, the FlinkFixedPartitioner is used. If None is provided, then we use round-robin partitioning.

Though, this would be breaking, because the default partitioning behaviour for the KeyedSerializationSchema variant would change. I would vote against introducing a breaking change now, since key partitioning is still achievable right now (although admittedly in a very non-friendly way). Instead, we should only incorporate these ideas when the Kafka connector remake happens. There have already been thoughts about doing this, triggered by many other aspects (reworking Flink's source interface, having a common abstraction for efficient partition discovery / idleness detection in partition-based replayable sources, etc.).

Overall, I think this discussion also brings up another aspect of the KeyedSerializationSchema: it bundles too many concerns within a single interface (see the sketch after this message).
1. It defines the serialization.
2. It extracts the partitioning key for each record (though it may never be used), due to custom partitioning. This might have been better off as a separate KafkaKeyExtractor, for example.
3. It decides the target topic for each record, which may be more suitable in the FlinkKafkaPartitioner interface.

Cheers,
Gordon
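For reference, the interface being discussed looks roughly like this in the Flink sources of that era; its three methods map onto the three concerns Gordon lists. The KafkaKeyExtractor below is only the hypothetical split he suggests, not an existing Flink class:

```java
import java.io.Serializable;

// Paraphrased from the Flink 1.5-era sources: one interface, three concerns.
public interface KeyedSerializationSchema<T> extends Serializable {
    byte[] serializeKey(T element);    // 2) key extraction (may never be used)
    byte[] serializeValue(T element);  // 1) value serialization
    String getTargetTopic(T element);  // 3) target-topic selection
}

// Hypothetical split, as suggested above (not an existing Flink interface):
interface KafkaKeyExtractor<T> extends Serializable {
    byte[] extractKey(T element);
}
```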
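For completeness, option 1) from Niels' original mail (merging a key-hash partitioner) would presumably look something along these lines. This is a hypothetical sketch, not the code from the actual pull request, which may hash differently (e.g. with Kafka's murmur2 for byte-for-byte compatibility with the client's own key partitioning):

```java
import java.util.Arrays;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Hypothetical sketch of a key-hash partitioner along the lines of FLINK-9610.
public class FlinkKeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null) {
            // No key to hash: fall back to the first available partition.
            return partitions[0];
        }
        // Mask the sign bit so the modulo result is always non-negative.
        int hash = Arrays.hashCode(key) & 0x7fffffff;
        return partitions[hash % partitions.length];
    }
}
```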