[DISCUSS] Support specifying custom transactional.id prefix in FlinkKafkaProducer

zetaplusae
Hi everyone,

Currently, the "transactional.id"s of the Kafka producers in
FlinkKafkaProducer are generated based on the task name. This mechanism has
some limitations:

 - The generated id can exceed Kafka's length limit if the task name is
too long. (resolved in FLINK-17691)
 - Ids from different jobs are very likely to clash with each other if the
job topologies are similar. (discussed in FLINK-11654)
 - Only "transactional.id"s with certain prefixes may be authorized by
prefixed ACLs on the target Kafka cluster.
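
To illustrate the last point: a prefixed ACL authorizes every resource name
that starts with the configured prefix. The sketch below only mimics that
matching rule; the class, the method and the example prefix are made up for
illustration and are not part of Kafka or Flink.

```java
import java.util.List;

// Hypothetical helper, not a Kafka or Flink class.
final class PrefixedAclSketch {

    /** Returns true if the id starts with at least one authorized prefix. */
    static boolean isAuthorized(String transactionalId, List<String> allowedPrefixes) {
        return allowedPrefixes.stream().anyMatch(transactionalId::startsWith);
    }

    public static void main(String[] args) {
        // Assumed example: the cluster only authorizes ids starting with "team-a.".
        List<String> allowed = List.of("team-a.");
        // An id derived from a task name does not carry the authorized prefix.
        System.out.println(isAuthorized("Sink: orders-op1-0", allowed));
        // An id built from a user-chosen prefix does.
        System.out.println(isAuthorized("team-a.orders-0", allowed));
    }
}
```

With ids derived from the task name, the only way to pass such a check is to
rename the task, which is why a configurable prefix helps here.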

Besides, I have seen that many other open-source Kafka connectors already
support specifying a custom prefix during creation. For instance, the
Spring community has introduced the `setTransactionIdPrefix` method to
their Kafka client.

So I propose this improvement and hope it can be developed and released
soon.

This is actually a follow-up discussion of FLINK-11654
<https://issues.apache.org/jira/browse/FLINK-11654>. And I have also raised
FLINK-22452 <https://issues.apache.org/jira/browse/FLINK-22452> to track
this issue.

As discussed, here are the possible solutions:
- either introduce an additional method called
`setTransactionalIdPrefix(String)` in the FlinkKafkaProducer (which I
prefer),
- or use the existing "transactional.id" property as the prefix.

The "transactional.id" generation would then behave as follows:
 - keep the current behavior if no prefix is specified,
 - use the specified value, if present, as the prefix for the
TransactionalIdsGenerator.
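
A minimal sketch of this fallback rule, to make the intended behavior
concrete. Everything here is hypothetical: the class, the method names, the
"taskName-operatorId" default and the id numbering are assumptions for
illustration, not the actual FlinkKafkaProducer or TransactionalIdsGenerator
code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch, not Flink code.
final class TransactionalIdPrefixSketch {

    /** Uses the custom prefix if one is set, otherwise a task-name-based default. */
    static String resolvePrefix(Optional<String> customPrefix, String taskName, String operatorId) {
        return customPrefix
                .filter(p -> !p.isEmpty())
                // Assumed default format; the real default may differ.
                .orElse(taskName + "-" + operatorId);
    }

    /** Builds poolSize ids for one subtask; the numbering scheme is an assumption. */
    static List<String> generateIds(String prefix, int subtaskIndex, int poolSize) {
        List<String> ids = new ArrayList<>(poolSize);
        long first = (long) subtaskIndex * poolSize;
        for (int i = 0; i < poolSize; i++) {
            ids.add(prefix + "-" + (first + i));
        }
        return ids;
    }

    public static void main(String[] args) {
        // No prefix configured: behavior stays task-name based.
        String byDefault = resolvePrefix(Optional.empty(), "Sink: orders", "op1");
        // Prefix configured: it replaces the task-name-based part.
        String custom = resolvePrefix(Optional.of("team-a.orders"), "Sink: orders", "op1");
        System.out.println(generateIds(byDefault, 0, 2));
        System.out.println(generateIds(custom, 1, 2));
    }
}
```

Either proposed option (a dedicated setter or reusing the "transactional.id"
property) would only change where the optional prefix comes from; the
fallback itself stays the same.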

As Jiangjie Qin suggested in FLINK-11654, we still need a FLIP for this.
I would love to work on this and create the FLIP. Could somebody grant me
(username: zetaplusae
<https://cwiki.apache.org/confluence/display/~zetaplusae>) the
permissions on Confluence and also assign the ticket to me?

Thanks,

Wenhao

Re: [DISCUSS] Support specifying custom transactional.id prefix in FlinkKafkaProducer

Till Rohrmann
Thanks for starting this discussion Wenhao. I've given you permission to
create a FLIP.

Cheers,
Till

On Sat, Jun 5, 2021 at 9:48 AM Wenhao Ji <[hidden email]> wrote:


Re: [DISCUSS] Support specifying custom transactional.id prefix in FlinkKafkaProducer

zetaplusae
Thanks Till!

I've opened FLIP-172 and started a new discussion thread whose title
follows the pattern suggested on the FLIP main page:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-172-Support-custom-transactional-id-prefix-in-FlinkKafkaProducer-td51355.html

Let's discuss this feature in that thread. I hope you can share your
ideas and suggestions there.

Thanks,
Wenhao

On Mon, Jun 7, 2021 at 5:01 PM Till Rohrmann <[hidden email]> wrote:
