[DISCUSS] FLIP-128: Enhanced Fan Out for AWS Kinesis Consumers

[DISCUSS] FLIP-128: Enhanced Fan Out for AWS Kinesis Consumers

Cranmer, Danny
Hello everyone,
This is a discussion thread for the FLIP [1] regarding Enhanced Fan Out for AWS Kinesis Consumers.

Enhanced Fan Out (EFO) gives each AWS Kinesis Data Streams (KDS) consumer dedicated read throughput, rather than a share of a common quota. Records are delivered over HTTP/2, which reduces latency and typically gives a 65% performance boost [2]. EFO is not currently supported by the Flink Kinesis Consumer. Adding EFO support would allow Flink applications to reap these benefits, widening Flink adoption. Existing applications would be able to opt in through a backwards-compatible library upgrade and a configuration tweak.
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers
[2] https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/
I look forward to your feedback,
Thanks,

Re: [DISCUSS] FLIP-128: Enhanced Fan Out for AWS Kinesis Consumers

hey_wxl
Hi, Cranmer.
   
   I'm Roland Wang. I've read the FLIP you wrote, and I agree with your
design.
   I have recently been working on this feature too, and have made some progress:

   1. I added two methods, getOrRegisterConsumer and subscribeToShard, to
KinesisProxyInterface.
   2. I re-implemented the KinesisProxy using AWS SDK V2.x.
   3. I used the new KinesisProxy to implement ShardConsumer.

   Although my design is not fully thought through, I hope we can discuss
this feature a little. I would like to contribute to the community.
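
The two proxy additions listed above might look roughly like the following. This is a minimal sketch, not the code from either implementation: only the method names getOrRegisterConsumer and subscribeToShard come from the message. The interface name FanOutProxySketch, the parameter lists, the in-memory stand-in, and the fake ARN format are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical shape of the two additions; not the actual Flink interface. */
interface FanOutProxySketch {
    /** Returns the ARN of the named stream consumer, registering it first if absent. */
    String getOrRegisterConsumer(String streamArn, String consumerName);

    /** Delivers the records of one shard to the given handler. */
    void subscribeToShard(String consumerArn, String shardId, Consumer<String> recordHandler);
}

/** In-memory stand-in used only to exercise the interface; it makes no AWS calls. */
class InMemoryFanOutProxy implements FanOutProxySketch {
    private final Map<String, String> consumerArns = new HashMap<>();

    @Override
    public String getOrRegisterConsumer(String streamArn, String consumerName) {
        // Idempotent: a second call with the same arguments returns the same (fake) ARN.
        return consumerArns.computeIfAbsent(
                streamArn + "|" + consumerName,
                key -> streamArn + "/consumer/" + consumerName);
    }

    @Override
    public void subscribeToShard(String consumerArn, String shardId, Consumer<String> recordHandler) {
        // Canned records stand in for the events a real subscription would push.
        for (String record : List.of(shardId + "-record-1", shardId + "-record-2")) {
            recordHandler.accept(record);
        }
    }
}
```

Making registration idempotent in one call is a design choice of this sketch: callers do not need to know whether another task has already registered the consumer.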
 




Re: [DISCUSS] FLIP-128: Enhanced Fan Out for AWS Kinesis Consumers

Tzu-Li (Gordon) Tai
Hi Cranmer,

Thank you for proposing the feature and starting the discussion thread.
This is really great work!

Overall, +1 to adding EFO support to the Kinesis connector.
I can see that having a dedicated throughput quota for each consuming Flink
application is definitely a requirement for AWS users.
In the past, we worked around this by using adaptive polling to avoid
exceeding the quotas with multiple consumers; that workaround would probably
no longer be needed once this is implemented.

There are a few things I like about the current proposal:
- EFO is an opt-in feature for now. Once we decide to reimplement the
Kinesis connector on top of the new source interface (FLIP-27), we can
probably consider enabling EFO by default to match the defaults of the
higher-level KCL library.
- From the design, it seems like the changes can indeed be fairly
consolidated in the Kinesis connector. The change should be fairly safe as
well, since we're essentially abstracting record publishing concerns only,
which is transparent to the exactly-once semantics / watermark components.

Concerning competing stream consumer de-/registration:
this would most likely go away with the new source interface, where this
can be done on the source split enumerator.
I'm personally okay with the proposed strategy of competing with backoff.
As food for thought, have you considered an opt-in / opt-out, where the
user knows that the client has access to KDS, and can choose to register
once only on the client side instead of competing in TMs?
I'm not sure if this is worth the extra configuration complexity though.

For the concrete next steps for implementation after this FLIP has passed:
From the FLIP I can conclude that, implementation-wise, this would come in a
few steps:
1. Abstract away record publishing behind a new interface. Initially we
only have one implementation (the current polling mechanism).
2. Add in AWS SDK 2.x dependency, with a new FanOutKinesisProxy
implementation.
3. Add FanOutRecordPublisher to finalize EFO support.
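
Step 1 above can be sketched as follows. This is only an illustration of the idea of hiding record publishing behind an interface: the names RecordPublisherSketch, PollingRecordPublisherSketch and ShardConsumerSketch, the run method, and the use of String records are assumptions, not the FLIP's actual design.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical abstraction over how records reach the shard consumer. */
interface RecordPublisherSketch {
    /** Hands the next batch to the handler; returns false once nothing is left. */
    boolean run(Consumer<List<String>> batchHandler);
}

/** Polling flavour: each call to run() stands in for one GetRecords request. */
class PollingRecordPublisherSketch implements RecordPublisherSketch {
    private final Queue<List<String>> pendingBatches;

    PollingRecordPublisherSketch(List<List<String>> batches) {
        this.pendingBatches = new ArrayDeque<>(batches);
    }

    @Override
    public boolean run(Consumer<List<String>> batchHandler) {
        List<String> batch = pendingBatches.poll();
        if (batch == null) {
            return false; // no more batches in this simulated shard
        }
        batchHandler.accept(batch);
        return true;
    }
}

/** The consumer loop depends only on the interface, so a fan-out publisher could be swapped in. */
class ShardConsumerSketch {
    static List<String> drain(RecordPublisherSketch publisher) {
        List<String> seen = new ArrayList<>();
        while (publisher.run(seen::addAll)) {
            // keep pulling until the publisher reports that it is exhausted
        }
        return seen;
    }
}
```

Because the loop in ShardConsumerSketch never mentions polling, a second implementation of the interface (step 3) would not need to touch it.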

I think step 2 wasn't explicitly mentioned in the FLIP, but I strongly
suggest consolidating that step into a single PR, as we would need to do a
license check for the dependency changes, and it would be nice to move forward
with that with as little interference from code changes as possible.

Pushing this FLIP forward for approval:
Since this FLIP is a fairly consolidated change, we should be safe to
proceed with a vote soon.
That usually happens in a separate vote thread, linking to this discussion
thread.

cc'ing Thomas Weise as well, as he has also worked substantially on the
Kinesis connector in the past.

Cheers,
Gordon

On Tue, Jun 23, 2020 at 6:02 PM hey_wxl <[hidden email]> wrote:


Re: [DISCUSS] FLIP-128: Enhanced Fan Out for AWS Kinesis Consumers

Tzu-Li (Gordon) Tai
Also, if it wasn't clear, I'll be happy to provide committer support on
reviewing and merging this FLIP, if it gets approved :)

On Mon, Jun 29, 2020 at 12:12 PM Tzu-Li (Gordon) Tai <[hidden email]>
wrote:


Re: [DISCUSS] FLIP-128: Enhanced Fan Out for AWS Kinesis Consumers

Cranmer, Danny
Hey Gordon,

Thank you for your review and feedback.

I agree with your suggestion for the contribution plan. I have updated the FLIP to include the additional step with the FanOutKinesisProxy/AWS SDK 2.x dependency. I have also added another precursor step to generally improve test coverage. While working in the connector code, I discovered that restarting consumption from an aggregated record is not covered by unit/integration tests. I have written some additional tests with simulated Kinesis behaviour; pushing these tests in advance would increase confidence in the next contribution step.

Regarding the consumer de-/registration: I had not considered making it optional via configuration as you propose. I agree with your observation on the additional configuration complexity, and it would also expose internal implementation details to the user. That being said, if a user application has a very high parallelism, it could quite easily exceed the ListStreamConsumers quota (5 TPS [1]) and increase the application start-up time substantially through back-off delays. The client could conditionally (based on config) register the stream consumer and add the ConsumerARN(s) to the consumer properties, eliminating the parallel calls to ListStreamConsumers that the Flink tasks would otherwise need to make. I will update the FLIP to include this change and reply to this thread once it is done.

[1] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreamConsumers.html
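
To put a rough number on the quota concern: if each of P parallel tasks makes at least one ListStreamConsumers call and the quota is 5 calls per second, the calls need at least ceil(P / 5) one-second quota windows, for example 200 windows for P = 1000, before any retries are counted. The sketch below shows that arithmetic together with a capped exponential back-off with jitter. The class name, the method names, and the back-off parameters are assumptions for illustration; the FLIP may use a different back-off scheme.

```java
import java.util.Random;

class RegistrationBackoffSketch {
    /** Number of one-second quota windows needed for every task to make one call. */
    static long quotaWindows(int parallelism, int callsPerSecond) {
        return (parallelism + callsPerSecond - 1L) / callsPerSecond; // ceiling division
    }

    /** Capped exponential back-off ceiling: baseMillis * 2^attempt, never above maxMillis. */
    static long backoffCeilingMillis(int attempt, long baseMillis, long maxMillis) {
        long factor = 1L << Math.min(attempt, 30); // bound the shift to avoid overflow
        return Math.min(maxMillis, baseMillis * factor);
    }

    /** Full jitter: a uniformly random delay between 0 and the ceiling, inclusive. */
    static long jitteredBackoffMillis(int attempt, long baseMillis, long maxMillis, Random random) {
        long ceiling = backoffCeilingMillis(attempt, baseMillis, maxMillis);
        return (long) (random.nextDouble() * (ceiling + 1));
    }
}
```

Jitter spreads the retries of competing tasks over time, so they are less likely to hit the quota again in the same instant.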

Thanks,
Danny

On 29/06/2020, 05:19, "Tzu-Li (Gordon) Tai" <[hidden email]> wrote:


Re: [DISCUSS] FLIP-128: Enhanced Fan Out for AWS Kinesis Consumers

Aljoscha Krettek-2
Wow, that is one thorough FLIP! I didn't fully go into all the technical
details but I think the general direction of this is good. If no one
objects I'd say we can proceed to voting and figure out the technical
details during implementation/review (if any remain unclear).

Best,
Aljoscha

On 29.06.20 16:27, Cranmer, Danny wrote:


Re: [DISCUSS] FLIP-128: Enhanced Fan Out for AWS Kinesis Consumers

Cranmer, Danny
In reply to this post by Cranmer, Danny
Hey Gordon,

I have updated the FLIP [1] to include support for configurable registration strategies:
- Added 2 additional configuration keys
- Added a Registration/De-registration Configuration section
- Updated the Stream Consumer Registration/Tear Down section
- Removed the rejected alternative (since we are now optionally supporting it)

One slight tweak: I have added a third option, "none". This disables registration/de-registration and allows the user to pass in the ConsumerARN directly. This option adds overhead/complexity to the user configuration, but it removes all start-up and teardown AWS SDK calls.
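
How a consumer might interpret such a setting can be sketched as below. Only the "none" option and the idea of passing in a ConsumerARN come from this message. The other two strategy names (TASK_SIDE for competing in the tasks, CLIENT_SIDE for registering once on the client) and both configuration keys are placeholders invented for this example; the FLIP [1] defines the real names.

```java
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;

/** Placeholder strategy names; only NONE is named in the thread. */
enum RegistrationStrategySketch { TASK_SIDE, CLIENT_SIDE, NONE }

class EfoRegistrationConfigSketch {
    // Placeholder keys for illustration; the FLIP defines the real ones.
    static final String REGISTRATION_KEY = "example.efo.registration";
    static final String CONSUMER_ARN_KEY = "example.efo.consumerarn";

    /** Reads the strategy, defaulting to task-side registration when the key is unset. */
    static RegistrationStrategySketch strategy(Properties config) {
        String raw = config.getProperty(REGISTRATION_KEY, "task_side");
        return RegistrationStrategySketch.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    /** With NONE the ARN must be supplied up front; otherwise it is resolved at start-up. */
    static Optional<String> preconfiguredConsumerArn(Properties config) {
        if (strategy(config) != RegistrationStrategySketch.NONE) {
            return Optional.empty();
        }
        String arn = config.getProperty(CONSUMER_ARN_KEY);
        if (arn == null || arn.isEmpty()) {
            throw new IllegalArgumentException(
                    CONSUMER_ARN_KEY + " is required when registration is disabled");
        }
        return Optional.of(arn);
    }
}
```

Failing fast on a missing ARN is a choice made for this sketch: with registration disabled there is no SDK call left that could discover the ARN later.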

Let me know if you are happy to proceed to a vote or have any further feedback.

Thanks,
Danny Cranmer

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers

On 29/06/2020, 15:28, "Cranmer, Danny" <[hidden email]> wrote:


Re: [DISCUSS] FLIP-128: Enhanced Fan Out for AWS Kinesis Consumers

Tzu-Li (Gordon) Tai
Thanks for updating the FLIP, Danny. The changes look good to me.
Please feel free to proceed with a vote soon.

On Tue, Jun 30, 2020 at 11:19 PM Cranmer, Danny <[hidden email]>
wrote:

> Hey Gordon,
>
> I have updated the FLIP [1] to include support for configurable
> registration strategies:
> - Added 2 additional configuration keys
> - Added Registration/De-registration Configuration section
> - Updated Stream Consumer Registration/Tear Down section
> - Remove rejected alternative (since we are now optionally supporting it)
>
> A slight tweak. I have added a third option, "none". This will disable
> registration/de-registration and allow the user to directly pass in the
> ConsumerARN. This option adds overhead/complexity to the user configuration
> but it will remove all start-up and teardown AWS SDK calls.
>
> Let me know if you are happy to proceed to a vote or have any further
> feedback.
>
> Thanks,
> Danny Cranmer
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers
>
> On 29/06/2020, 15:28, "Cranmer, Danny" <[hidden email]>
> wrote:
>
>     Hey Gordon,
>
>     Thank-you for you review and feedback.
>
>     I agree with your suggestion for the contribution plan. I have updated
> the FLIP to include the additional step with the FanOutKinesisProxy/AWS SDK
> 2.x dependency. I have also added another precursor step to generally
> improve test coverage. I have been playing around in the connector code and
> discovered that restarting consumption from an aggregated record is not
> covered by unit/integration tests. I have written some additional tests
> with simulated Kinesis behaviour, pushing these tests in advance would
> increase confidence in the next contribution step.
>
>     Regarding the consumer de-/registration. I had not considered making
> it optional via configuration as you propose. I agree with your observation
> on the additional configuration complexity and it would also expose
> internal implementation details to the user. That being said, if a user
> application has a very high parallelism they could quite easily exceed the
> ListStreamConsumers quota (5 TPS [1]), and increase the application
> start-up time substantially with back-off delays. The client could
> conditionally (based on config) register the stream consumer and add the
> ConsumerARN(s) to the consumer properties, eliminating the required
> parallel calls to ListStreamConsumers by the Flink tasks. I will update the
> FLIP to include this change, and reply to this thread once it is done.
>
>     [1]
> https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreamConsumers.html
>
>     Thanks,
>     Danny,
>
>     On 29/06/2020, 05:19, "Tzu-Li (Gordon) Tai" <[hidden email]>
> wrote:
>
>         Also, if it wasn't clear, I'll be happy to provide committer
> support on
>         reviewing and merging this FLIP, if it gets approved :)
>
>         On Mon, Jun 29, 2020 at 12:12 PM Tzu-Li (Gordon) Tai <
> [hidden email]>
>         wrote:
>
>         > Hi Cranmer,
>         >
>         > Thank you for proposing the feature and starting the discussion
> thread.
>         > This is really great work!
>         >
>         > Overall, +1 to adding EFO support to the Kinesis connector.
>         > I can see that having a dedicated throughput quota for each
> consuming
>         > Flink application is definitely a requirement for AWS users.
>         > In the past, we worked around this by using adaptive polling to
> avoid
>         > exceeding the quotas with multiple consumers, this would
> probably go away
>         > with this implemented.
>         >
>         > There are a few things I like about the current proposal:
>         > - EFO is an opt-in feature for now. Once we decide to
> reimplement the
>         > Kinesis connector on top of the new source interface (FLIP-27),
> we can
>         > probably consider enabling EFO by default to match the defaults
> of the
>         > higher-level KCL library.
>         > - From the design, it seems like the changes can indeed be fairly
>         > consolidated in the Kinesis connector. The change should be
> fairly safe as
>         > well, since we're essentially abstracting record publishing
> concerns only,
>         > which is transparent to the exactly-once semantics / watermark
> components.
>         >
>         > Concerning competing stream consumer de-/registration:
>         > this would most likely go away with the new source interface,
> where this
>         > can be done on the source split enumerator.
>         > I'm personally okay with the proposed strategy of competing with
> backoff.
>         > As food for thought, have you considered an opt-in / opt-out,
> where the
>         > user knows that the client has access to KDS, and can choose to
> register
>         > once only on the client side instead of competing in TMs?
>         > I'm not sure if this is worth the extra configuration complexity
> though.
>         >
>         > For the concrete next steps for implementation after this FLIP
> has passed:
>         > From the FLIP I can conclude that implementation wise, this
> would come in
>         > a few steps -
>         > 1. Abstract away record publishing behind a new interface.
> Initially we
>         > only have one implementation (the current polling mechanism).
>         > 2. Add in AWS SDK 2.x dependency, with a new FanOutKinesisProxy
>         > implementation.
>         > 3. Add FanOutRecordPublisher to finalize EFO support.
>         >
>         > I think step 2. wasn't explicitly mentioned in the FLIP, but I
> strongly
>         > suggest to consolidate that step as a single PR, as we would
> need to do a
>         > license check for dependency changes and it would be nice to
> move forward
>         > with that with as little interference from code changes as possible.
>         >
>         > Pushing this FLIP forward for approval:
>         > Since this FLIP is a fairly consolidated change, we should be
> safe to
>         > proceed with a vote soon.
>         > That usually happens in a separate vote thread, linking to this
> discussion
>         > thread.
>         >
>         > cc'ing Thomas Weise as well, as he has also worked substantially
> on the
>         > Kinesis connector in the past.
>         >
>         > Cheers,
>         > Gordon
>         >
>         > On Tue, Jun 23, 2020 at 6:02 PM hey_wxl <
> [hidden email]>
>         > wrote:
>         >
>         >> Hi, Cranmer.
>         >>
>         >>    I'm Roland Wang. I've read the FLIP you wrote, and agree
> with your
>         >> design.
>         >>    Recently, I'm working on this feature too, and have made
> some progress:
>         >>
>         >>    1. I add two methods: getOrRegisterConsumer &
> subscribeToShard on
>         >> KinesisProxyInterface.
>         >>    2. I re-implement the KinesisProxy using AWS SDK V2.x.
>         >>    3. I use the new KinesisProxy to implement ShardConsumer.
>         >>
>         >>    Though my design is not fully considered, I hope we can
> discuss a
>         >> little
>         >> bit about this feature. I wish to make some contribution to the
> community.
>         >>
>         >>
>         >>
>         >>
>         >> --
>         >> Sent from:
>         >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>         >>
>         >
>
>
>

Re: [DISCUSS] FLIP-128: Enhanced Fan Out for AWS Kinesis Consumers

Thomas Weise
In reply to this post by Cranmer, Danny
Thanks for the excellent proposal!

Big +1 for introducing EFO as an incremental feature while retaining
backward compatibility! This will make it easier for users to adopt.

Thanks for mentioning the reasons why one might not want to use EFO.
Regarding "Streams with a single consumer would not benefit from the
dedicated throughput (they already have the full quota)": Do you see the
polling (non-EFO) mode as a permanent option going forward?

Regarding "Registration/De-registration Configuration":

The limit for "ListStreamConsumers" is 5 TPS per [1], which is even lower
than that for "DescribeStream". That limit could cause significant issues
during large scale job startup and the only solution was to switch to
ListShards. Perhaps elaborate a bit more on limitations and reasoning for
the different registration options?
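
To put a rough number on the start-up concern, here is a back-of-the-envelope sketch in Java. It assumes that every parallel subtask issues exactly one ListStreamConsumers call at start-up and that the 5 TPS limit is shared by all of them. Retries and back-off delays are ignored, so a real job would take longer. The class and method names are made up for illustration and are not part of the connector.

```java
/** Back-of-the-envelope estimate; illustrative only, not connector code. */
final class ListStreamConsumersEstimate {

    /** Limit quoted in this thread for ListStreamConsumers. */
    static final int QUOTA_TPS = 5;

    /**
     * Lower bound, in seconds, for all subtasks to complete one call each
     * when at most quotaTps calls succeed per second.
     */
    static long minSecondsToServe(int parallelism, int quotaTps) {
        if (parallelism < 0 || quotaTps <= 0) {
            throw new IllegalArgumentException("parallelism must be >= 0 and quotaTps > 0");
        }
        // Ceiling division: 11 calls at 5 TPS need 3 one-second windows.
        return (parallelism + (long) quotaTps - 1) / quotaTps;
    }

    public static void main(String[] args) {
        for (int parallelism : new int[] {5, 100, 1000}) {
            System.out.printf("parallelism=%d -> at least %d s%n",
                    parallelism, minSecondsToServe(parallelism, QUOTA_TPS));
        }
    }
}
```

Under these assumptions a job with parallelism 1000 spends at least 200 seconds on this one call alone, which is the motivation for letting the client register once and pass the ConsumerARN through the consumer properties.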

De-registration may never happen when task managers go into a bad state and
are forcefully terminated. That won't cause an issue because a stale
registration will be overridden/removed by a new job with the same name?

Thanks,
Thomas


[1]
https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html

On Mon, Jun 22, 2020 at 2:42 AM Cranmer, Danny <[hidden email]>
wrote:

> Hello everyone,
> This is a discussion thread for the FLIP [1] regarding Enhanced Fan Out
> for AWS Kinesis Consumers.
>
> Enhanced Fan Out (EFO) allows AWS Kinesis Data Stream (KDS) consumers to
> utilise a dedicated read throughput, rather than a shared quota. HTTP/2
> reduces latency and typically gives a 65% performance boost [2]. EFO is not
> currently supported by the Flink Kinesis Consumer. Adding EFO support would
> allow Flink applications to reap the benefits, widening Flink adoption.
> Existing applications will be able to optionally perform a backwards
> compatible library upgrade and configuration tweak to inherit the
> performance benefits.
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers
> [2] https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/
> I look forward to your feedback,
> Thanks,
>

Re: [DISCUSS] FLIP-128: Enhanced Fan Out for AWS Kinesis Consumers

Cranmer, Danny
In reply to this post by Cranmer, Danny
Hello Thomas,

Thank you for your vote and feedback on the FLIP.

Q: Do you see the polling (non-EFO) mode as a permanent option going forward?
A: I have forwarded the question on and will follow up. Generally speaking, AWS do not usually deprecate APIs, so KDS (Kinesis Data Streams) will likely always support the polling mechanism. The question is whether we want to keep supporting it within the Flink connector. I will get back to you.

Q: Perhaps elaborate a bit more on limitations and reasoning for the different registration options?
A: I was planning on elaborating in the updated documentation that will be published to the Flink website. Would you like me to update the FLIP to include this information in advance?

Q: That won't cause an issue because a stale registration will be overridden/removed by a new job with the same name?
A: Yes, exactly. When the consumer name is already registered, either by the user or left over from a previous shutdown that ended in error, the first ListStreamConsumers call will find the consumer in an ACTIVE state and retrieve the ConsumerARN. Tasks will subsequently use that ARN to obtain a subscription (this new subscription will invalidate any existing ones).
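
As a minimal sketch of that reuse-or-register behaviour (not the FLIP implementation), the Java below runs against an in-memory stand-in instead of the AWS SDK. The StreamConsumerApi interface, the fake, the ARN format and all method names are hypothetical. The sketch also ignores consumer status (for example waiting for a newly registered consumer to become ACTIVE) and the race between parallel tasks.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for the Kinesis calls discussed above; not the AWS SDK. */
interface StreamConsumerApi {
    /** Models a ListStreamConsumers lookup: the ARN if the name is registered. */
    Optional<String> findConsumerArn(String streamArn, String consumerName);

    /** Models RegisterStreamConsumer: returns the ARN of the new consumer. */
    String registerConsumer(String streamArn, String consumerName);
}

/** In-memory fake so the sketch runs without AWS access. */
final class InMemoryStreamConsumerApi implements StreamConsumerApi {
    private final Map<String, String> arnsByName = new HashMap<>();
    int registerCalls = 0;

    @Override
    public Optional<String> findConsumerArn(String streamArn, String consumerName) {
        return Optional.ofNullable(arnsByName.get(streamArn + "/" + consumerName));
    }

    @Override
    public String registerConsumer(String streamArn, String consumerName) {
        registerCalls++;
        // Made-up ARN format, for illustration only.
        String arn = streamArn + "/consumer/" + consumerName;
        arnsByName.put(streamArn + "/" + consumerName, arn);
        return arn;
    }
}

final class ConsumerRegistration {
    /** Reuse an existing registration (e.g. left by a killed job), else register. */
    static String getOrRegister(StreamConsumerApi api, String streamArn, String consumerName) {
        return api.findConsumerArn(streamArn, consumerName)
                .orElseGet(() -> api.registerConsumer(streamArn, consumerName));
    }

    public static void main(String[] args) {
        InMemoryStreamConsumerApi api = new InMemoryStreamConsumerApi();
        // First job registers; a restarted job with the same name reuses the ARN.
        String first = getOrRegister(api, "stream-a", "my-flink-app");
        String second = getOrRegister(api, "stream-a", "my-flink-app");
        System.out.println(first.equals(second) + ", register calls: " + api.registerCalls);
    }
}
```

The point of the sketch is that a stale registration is harmless: the restarted job finds the same name, reuses the ARN, and never issues a second registration.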

Thanks,
Danny

On 06/07/2020, 20:36, "Thomas Weise" <[hidden email]> wrote:

    Thanks for the excellent proposal!

    Big +1 for introducing EFO as an incremental feature while retaining
    backward compatibility! This will make it easier for users to adopt.

    Thanks for mentioning the reasons why one might not want to use EFO.
    Regarding "Streams with a single consumer would not benefit from the
    dedicated throughput (they already have the full quota)": Do you see the
    polling (non-EFO) mode as a permanent option going forward?

    Regarding "Registration/De-registration Configuration":

    The limit for "ListStreamConsumers" is 5 TPS per [1], which is even lower
    than that for "DescribeStream". That limit could cause significant issues
    during large scale job startup and the only solution was to switch to
    ListShards. Perhaps elaborate a bit more on limitations and reasoning for
    the different registration options?

    De-registration may never happen when task managers go into a bad state and
    are forcefully terminated. That won't cause an issue because a stale
    registration will be overridden/removed by a new job with the same name?

    Thanks,
    Thomas


    [1]
    https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html

    On Mon, Jun 22, 2020 at 2:42 AM Cranmer, Danny <[hidden email]>
    wrote:

    > Hello everyone,
    > This is a discussion thread for the FLIP [1] regarding Enhanced Fan Out
    > for AWS Kinesis Consumers.
    >
    > Enhanced Fan Out (EFO) allows AWS Kinesis Data Stream (KDS) consumers to
    > utilise a dedicated read throughput, rather than a shared quota. HTTP/2
    > reduces latency and typically gives a 65% performance boost [2]. EFO is not
    > currently supported by the Flink Kinesis Consumer. Adding EFO support would
    > allow Flink applications to reap the benefits, widening Flink adoption.
    > Existing applications will be able to optionally perform a backwards
    > compatible library upgrade and configuration tweak to inherit the
    > performance benefits.
    > [1]
    > https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers
    > [2] https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/
    > I look forward to your feedback,
    > Thanks,
    >