Possible bug in Kafka producer partitioning logic

Possible bug in Kafka producer partitioning logic

Gyula Fóra
Hi all,

We had some problems with custom partitioning for the 0.8 Kafka producer, and now that I have checked the code it seems there might be a problem with the logic.

The producer determines the number of partitions in the open method and then passes that value to the custom partitioner for every record it produces.
When producing to multiple topics, however, this only works if the defaultTopicId (topic) has the same number of partitions as all the other topics in the Kafka cluster.

In our case the default topic has 16 partitions while newly created topics default to 3, so we get an out-of-range partition error.
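
To make the failure mode concrete, here is a small standalone sketch (topic names, keys, and the hashing scheme are illustrative only, not our actual job or the Flink classes):

import java.util.HashMap;
import java.util.Map;

// Standalone illustration (not the actual Flink classes) of why reusing the
// default topic's partition count breaks records sent to other topics.
public class PartitionCountMismatchDemo {

    // Typical key-based partitioning: hash the key into [0, numPartitions).
    static int partition(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        // Partition counts as in our setup: the default topic has 16 partitions,
        // newly created topics default to 3.
        Map<String, Integer> partitionCounts = new HashMap<>();
        partitionCounts.put("default-topic", 16);
        partitionCounts.put("new-topic", 3);

        // The producer learned numPartitions = 16 from the default topic in open()
        // and reuses it for every record, regardless of the record's target topic.
        int numPartitionsFromOpen = partitionCounts.get("default-topic");

        for (String key : new String[] {"a", "b", "c", "d"}) {
            int chosen = partition(key, numPartitionsFromOpen); // anywhere in [0, 16)
            boolean outOfRange = chosen >= partitionCounts.get("new-topic");
            System.out.printf("key=%s -> partition %d (%s for new-topic with %d partitions)%n",
                    key, chosen, outOfRange ? "OUT OF RANGE" : "ok",
                    partitionCounts.get("new-topic"));
        }
    }
}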

Is my understanding correct or am I overlooking something?

Thank you!
Gyula

Re: Possible bug in Kafka producer partitioning logic

Tzu-Li (Gordon) Tai
Hi Gyula,

Yes, I think the semantics of the Partitioner interface are a bit off.
The `numPartitions` value should ideally be the number of partitions of the `targetTopic`.
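
To illustrate the direction, a topic-aware contract could look roughly like this (just a sketch; these names are not existing Flink APIs, and the actual fix may end up looking different):

import java.io.Serializable;

// Hypothetical sketch only. Instead of a single numPartitions captured once in
// open(), partition() receives the partitions of the topic the record is
// actually written to.
public interface TopicAwarePartitioner<T> extends Serializable {

    /**
     * @param record      the record being written
     * @param key         the serialized key (may be null)
     * @param value       the serialized value
     * @param targetTopic the topic this record is written to
     * @param partitions  the partition IDs of targetTopic
     * @return one of the entries of {@code partitions}
     */
    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}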

Here’s a JIRA I just filed to track the issue: https://issues.apache.org/jira/browse/FLINK-6288.

Cheers,
Gordon


Re: Possible bug in Kafka producer partitioning logic

Gyula Fóra
Thanks for checking this out.

I would say this is definitely a blocking issue for the bugfix release; what do you think?

Gyula


Re: Possible bug in Kafka producer partitioning logic

Tzu-Li (Gordon) Tai
Actually, I would prefer to make this a blocker for a future bugfix release rather than for 1.2.1.

The reason is that, to fix this properly, we might need to look again into (and possibly change) how partitioners are provided.
The main problem is that the `open` method can only be called once, with the partitions of a single topic.
So we might need the user to provide multiple partitioners, one for each of the possible topics that will be written to.
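
For example, the user-facing side might end up looking vaguely like this (purely illustrative; none of these names are real Flink APIs):

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Purely illustrative: one way to let users supply a partitioner per topic,
// since a single open() call can only carry the partitions of one topic.
public class PerTopicPartitioners<T> implements Serializable {

    // Minimal per-topic partitioner contract used by this sketch.
    public interface SimplePartitioner<R> extends Serializable {
        int partition(R record, int[] partitions);
    }

    private final Map<String, SimplePartitioner<T>> partitioners = new HashMap<>();

    public void register(String topic, SimplePartitioner<T> partitioner) {
        partitioners.put(topic, partitioner);
    }

    // Look up the partitioner registered for the record's target topic.
    public int partition(T record, String targetTopic, int[] partitionsOfTargetTopic) {
        SimplePartitioner<T> p = partitioners.get(targetTopic);
        if (p == null) {
            throw new IllegalStateException("No partitioner registered for topic " + targetTopic);
        }
        return p.partition(record, partitionsOfTargetTopic);
    }
}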

One way or another, my gut feeling is that this would need a slight change to the Kafka producer APIs,
and I'm wary of rushing API changes into releases.

Re: Possible bug in Kafka producer partitioning logic

Gyula Fóra
I understand the reasoning; on the other hand, this creates a problem that is very hard to work around. :/

Do you have any suggestions for how to get around this?

Gyula


Re: Possible bug in Kafka producer partitioning logic

Gyula Fóra
In the worst-case scenario we will run a custom build that just caches the per-topic partition counts in a map (but still calls partitioner.open only once).
I think this simple intermediate fix would actually be good enough for most people who are blocked by this in the short run.
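
Roughly along these lines (only a sketch of the caching idea; the class name and the partition-count lookup are placeholders, not our actual patch or any Flink/Kafka API):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.ToIntFunction;

// Sketch of the caching workaround: keep a per-topic partition count in a map
// so partitioning always works against the target topic's own range.
public class CachedPartitionCounts {

    private final Map<String, Integer> cache = new ConcurrentHashMap<>();
    private final ToIntFunction<String> partitionCountLookup;

    public CachedPartitionCounts(ToIntFunction<String> partitionCountLookup) {
        // e.g. a function that queries the cluster metadata for a topic's partition count
        this.partitionCountLookup = partitionCountLookup;
    }

    // Pick a partition for (topic, key) using the cached count for that topic.
    public int partitionFor(String topic, Object key) {
        int numPartitions = cache.computeIfAbsent(topic, partitionCountLookup::applyAsInt);
        return Math.abs(key.hashCode() % numPartitions);
    }
}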

Gyula


Re: Possible bug in Kafka producer partitioning logic

Tzu-Li (Gordon) Tai
That workaround should work, yes.
I guess the proper fix would be something similar, except that it would also expose extra APIs to properly provide different partitioners for different topics.

Btw, sorry for the slow responses; I'm currently traveling for the Flink Forward conference in San Francisco.

Cheers,
Gordon


Re: Possible bug in Kafka producer partitioning logic

Gyula Fóra
No worries, we have already pushed the fix for our jobs as a custom build :)

Have a safe trip!
Gyula
