[DISCUSS] FLIP-171: Async Sink


[DISCUSS] FLIP-171: Async Sink

Hausmann, Steffen
Hi there,

We would like to start a discussion thread on "FLIP-171: Async Sink" [1], where we propose to create a common abstraction for destinations that support async requests. This abstraction will make it easier to add destinations to Flink by implementing a lightweight shim, while avoiding the need to maintain dozens of independent sinks.

Looking forward to your feedback.

Cheers, Steffen

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink



Amazon Web Services EMEA SARL
38 avenue John F. Kennedy, L-1855 Luxembourg
Sitz der Gesellschaft: L-1855 Luxemburg
eingetragen im Luxemburgischen Handelsregister unter R.C.S. B186284

Amazon Web Services EMEA SARL, Niederlassung Deutschland
Marcel-Breuer-Str. 12, D-80807 Muenchen
Sitz der Zweigniederlassung: Muenchen
eingetragen im Handelsregister des Amtsgerichts Muenchen unter HRB 242240, USt-ID DE317013094




Re: [DISCUSS] FLIP-171: Async Sink

Piotr Nowojski
Hi Steffen,

Thanks for writing down the proposal. Back when the new Sink API was being
discussed, I was proposing to add our usual `CompletableFuture<Void>
isAvailable()` pattern to make sinks non-blocking. You can see the
discussion starting here [1] and continuing for a couple more posts
until here [2]. Back then, the outcome was that it would give very little
benefit, at the expense of making the API more complicated. Could you maybe
relate your proposal to that discussion from last year?
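For readers less familiar with the pattern mentioned above, here is a minimal sketch of the non-blocking `isAvailable()` idea. It assumes a hypothetical bounded writer; `AvailabilityAwareWriter` and `onRequestCompleted` are illustrative names, not Flink's Sink API. While the writer has spare capacity, `isAvailable()` returns an already completed future; once the buffer is full, it hands out an incomplete future that completes when an in-flight request frees space, so the caller can wait without blocking a thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical writer illustrating the non-blocking isAvailable() pattern:
 * instead of blocking inside write(), it exposes a future that is complete
 * while more records can be accepted and incomplete while the writer is full.
 */
class AvailabilityAwareWriter<T> {
    // Shared, already completed future meaning "available right now".
    private static final CompletableFuture<Void> AVAILABLE =
            CompletableFuture.completedFuture(null);

    private final int capacity;
    private final Deque<T> pending = new ArrayDeque<>();
    private CompletableFuture<Void> availability = AVAILABLE;

    AvailabilityAwareWriter(int capacity) {
        this.capacity = capacity;
    }

    /** Complete when write() may be called without exceeding capacity. */
    synchronized CompletableFuture<Void> isAvailable() {
        return availability;
    }

    /** Never blocks; callers are expected to wait on isAvailable() first. */
    synchronized void write(T element) {
        if (pending.size() >= capacity) {
            throw new IllegalStateException("write() called while unavailable");
        }
        pending.addLast(element);
        if (pending.size() >= capacity) {
            // Now full: hand out a fresh, incomplete future (backpressure).
            availability = new CompletableFuture<>();
        }
    }

    /** Hypothetical callback: an async request finished and freed capacity. */
    void onRequestCompleted(int numCompleted) {
        CompletableFuture<Void> toComplete = null;
        synchronized (this) {
            for (int i = 0; i < numCompleted && !pending.isEmpty(); i++) {
                pending.pollFirst();
            }
            if (pending.size() < capacity && !availability.isDone()) {
                toComplete = availability;
                availability = AVAILABLE;
            }
        }
        // Complete outside the lock so dependent callbacks don't run while holding it.
        if (toComplete != null) {
            toComplete.complete(null);
        }
    }

    synchronized int buffered() {
        return pending.size();
    }
}
```

A caller would chain the next write onto the future, for example `writer.isAvailable().thenRun(() -> writer.write(next))`, instead of calling `write` and blocking inside it.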

I see that your proposal goes much further than just adding the
availability method. Could you also motivate this a bit further? Could you
maybe reference/show some sinks that:
1. are already implemented using FLIP-143,
2. have some code duplication...
3. ...where this duplication would be solved by FLIP-171?

Best,
Piotrek

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44872.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44930.html

On Wed, 9 Jun 2021 at 09:49, Hausmann, Steffen <[hidden email]>
wrote:


Re: [DISCUSS] FLIP-171: Async Sink

Hausmann, Steffen
Hey Piotrek,

Thanks for your comments on the FLIP. I'll address your second question first, as I think it's more central to this FLIP. Just looking at the AWS ecosystem, there are several sinks with overlapping functionality. I've chosen AWS sinks here because I'm most familiar with those, but a similar argument applies more generally to destinations that support async ingestion.

There is, for instance, a sink for Amazon Kinesis Data Streams that is part of Apache Flink [1], a sink for Amazon Kinesis Data Firehose [2], a sink for Amazon DynamoDB [3], and a sink for Amazon Timestream [4]. All these sinks have implemented their own mechanisms for batching, persisting, and retrying events. And I'm not sure if all of them properly participate in checkpointing. [3] even seems to closely mirror [1] as it contains references to the Kinesis Producer Library, which is unrelated to Amazon DynamoDB.

These sinks predate FLIP-143. But as batching, persisting, and retrying capabilities do not seem to be part of FLIP-143, I'd argue that we would end up with similar duplication even if these sinks were rewritten today based on FLIP-143. That's the idea of FLIP-171: abstract away these commonly required capabilities so that it becomes easy to support a wide range of destinations without having to think about batching, retries, checkpointing, etc. I've included an example in the FLIP [5] that shows that it takes only a couple of lines of code to implement a sink with exactly-once semantics. To be fair, the example lacks robust failure handling and some of the more advanced capabilities of [1], but I think it still supports this point.
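The division of labor described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the FLIP's actual API: `SketchAsyncSinkWriter`, `submitRequestEntries`, and `InMemoryShimWriter` are hypothetical names. The base class owns buffering, batching, retrying of failed entries, and snapshotting of buffered entries; the destination-specific shim only converts elements and submits a batch asynchronously.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
 * Hypothetical base class: buffering, batching, retries, and snapshotting live
 * here. Not thread-safe: assumes write(), flush(), and future callbacks all run
 * on one thread (a real writer would hand completions back to the task thread).
 */
abstract class SketchAsyncSinkWriter<InputT, EntryT> {
    private final Function<InputT, EntryT> elementConverter;
    private final int maxBatchSize;
    private final Deque<EntryT> buffer = new ArrayDeque<>();

    protected SketchAsyncSinkWriter(Function<InputT, EntryT> elementConverter, int maxBatchSize) {
        this.elementConverter = elementConverter;
        this.maxBatchSize = maxBatchSize;
    }

    /** Shim hook: send the batch; complete with the entries to retry (empty on success). */
    protected abstract CompletableFuture<List<EntryT>> submitRequestEntries(List<EntryT> batch);

    public void write(InputT element) {
        buffer.addLast(elementConverter.apply(element));
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Sends one batch; failed entries go back to the head of the buffer, in order. */
    public CompletableFuture<Void> flush() {
        List<EntryT> batch = new ArrayList<>();
        while (batch.size() < maxBatchSize && !buffer.isEmpty()) {
            batch.add(buffer.pollFirst());
        }
        if (batch.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        return submitRequestEntries(batch).thenAccept(failed -> {
            for (int i = failed.size() - 1; i >= 0; i--) {
                buffer.addFirst(failed.get(i));
            }
        });
    }

    /** Buffered entries go into the checkpoint instead of being flushed.
     *  A real implementation must also account for in-flight requests. */
    public List<EntryT> snapshotState() {
        return new ArrayList<>(buffer);
    }
}

/** Hypothetical shim: an in-memory "destination" that throttles record-2 once. */
class InMemoryShimWriter extends SketchAsyncSinkWriter<Integer, String> {
    final List<String> delivered = new ArrayList<>();
    private boolean rejectedOnce = false;

    InMemoryShimWriter(int maxBatchSize) {
        super(i -> "record-" + i, maxBatchSize);
    }

    @Override
    protected CompletableFuture<List<String>> submitRequestEntries(List<String> batch) {
        List<String> failed = new ArrayList<>();
        for (String entry : batch) {
            if (entry.equals("record-2") && !rejectedOnce) {
                rejectedOnce = true; // simulate a throttled entry
                failed.add(entry);
            } else {
                delivered.add(entry);
            }
        }
        return CompletableFuture.completedFuture(failed);
    }
}
```

Because failed entries are re-queued rather than dropped, and buffered entries are returned from `snapshotState()`, the shim author does not have to reimplement either concern.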

Regarding your point on the isAvailable pattern: we need some way for the sink to propagate backpressure, and we would also like to support time-based buffering hints. I currently see two options and would appreciate input on which one is preferable. The first option is to use the non-blocking isAvailable pattern. Internally, the sink persists buffered events in the snapshot state, which avoids having to flush buffered records on a checkpoint. This seems to align well with the non-blocking isAvailable pattern. The second option is to make calls to `write` blocking and leverage an internal thread to trigger flushes based on the time-based buffering hints. We've discussed these options with Arvid, and the suggestion was to assume that the `isAvailable` pattern will become available for sinks through an additional FLIP.
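To make the second option concrete, here is a minimal sketch. It assumes a hypothetical `BlockingTimedWriter` whose `sender` callback stands in for the destination client and whose `maxBufferTimeMs` plays the role of the time-based buffering hint. `write` blocks the caller while the buffer is full, which is how backpressure reaches upstream, and a single scheduler thread flushes the buffer periodically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical writer for the "blocking write + internal flush thread" option.
 * The sender is called from the timer thread (and from close()).
 */
class BlockingTimedWriter<T> implements AutoCloseable {
    private final int capacity;
    private final Consumer<List<T>> sender; // stands in for the destination client
    private final List<T> buffer = new ArrayList<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    BlockingTimedWriter(int capacity, long maxBufferTimeMs, Consumer<List<T>> sender) {
        this.capacity = capacity;
        this.sender = sender;
        // Time-based buffering hint: flush at least every maxBufferTimeMs.
        timer.scheduleAtFixedRate(this::flush, maxBufferTimeMs, maxBufferTimeMs, TimeUnit.MILLISECONDS);
    }

    /** Blocks the calling (task) thread until there is room in the buffer. */
    synchronized void write(T element) throws InterruptedException {
        while (buffer.size() >= capacity) {
            wait(); // released by flush(); this is the backpressure
        }
        buffer.add(element);
    }

    /** Runs on the timer thread; could also be invoked on a checkpoint. */
    void flush() {
        List<T> batch;
        synchronized (this) {
            if (buffer.isEmpty()) {
                return;
            }
            batch = new ArrayList<>(buffer);
            buffer.clear();
            notifyAll(); // wake writers blocked on a full buffer
        }
        sender.accept(batch); // send outside the lock so write() is not held up
    }

    @Override
    public void close() {
        timer.shutdownNow();
        flush(); // drain whatever is left
    }
}
```

The trade-off shows up in the code: the blocking variant needs an extra thread and parks the calling thread in `wait()`, whereas the `isAvailable` variant lets the caller decide when to write again.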

I think it is an important discussion to have. My understanding of the implications for Flink in general is very naïve, so I'd be happy to get further guidance. However, I don't want to make this discussion part of FLIP-171. For FLIP-171, we'll use whatever is available.

Does that make sense?

Cheers, Steffen


[1] https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kinesis 
[2] https://github.com/aws/aws-kinesisanalytics-flink-connectors
[3] https://github.com/klarna-incubator/flink-connector-dynamodb
[4] https://github.com/awslabs/amazon-timestream-tools/
[5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink#FLIP171:AsyncSink-SimplifiedAsyncSinkWriterforKinesisDataStreams


On 09.06.21, 19:44, "Piotr Nowojski" <[hidden email]> wrote:


Re: [DISCUSS] FLIP-171: Async Sink

Cranmer, Danny
Hey Steffen,

I have a few questions regarding the FLIP:
1. Where do you expect the core code to live? Would it be in an existing module (say flink-clients), or would you introduce a new module?
2. Which destination implementations do you intend to ship with this FLIP? I see an example with Kinesis, but you also list a bunch of other candidates.
3. For the Kinesis implementation, would you add the sink to the existing flink-connector-kinesis module or create a new module? The reason I ask is that the existing Kinesis sink depends on the KPL and has a heavy transitive dependency chain; removing it would substantially reduce application size and clean up the dependency chain.

Thanks,

On 10/06/2021, 09:09, "Hausmann, Steffen" <[hidden email]> wrote:


Re: [DISCUSS] FLIP-171: Async Sink

Hausmann, Steffen
Hi Danny,

Right now, I'd expect the core of the Async Sink (without third party dependencies) to live in its own submodule. For instance `flink-connector-async` as part of `flink-connectors`.

I'm currently planning to implement three different sinks to verify that the design of the sink is flexible enough to support different services: Amazon Kinesis Data Streams, Amazon Kinesis Data Firehose, and Amazon DynamoDB. But I'm not sure where to actually put them. To keep it simple, I'd start with a module that contains all AWS-specific connectors. However, this has the obvious disadvantage that anyone who wants to use a single sink would need to pull in the dependencies for all services included in the module (mainly the AWS SDK for these services). I don't know how much of a problem that's going to be in practice. If the resulting jar grows too big because of all the included dependencies, that's certainly not going to work. But for now I'd just give it a try and then start a discussion once I have more data to share.

What's more interesting is whether that module should be part of the Flink code base or live somewhere else. It'd be great to get some feedback from the community on this.

Regarding the Kinesis Data Streams sink, I fully agree that it would be nice to remove the dependency on the KPL. So it seems desirable to keep the existing and the new FLIP-171-based implementations in separate modules; otherwise, people would be forced to pull in the KPL dependencies even if they only use the new implementation. In addition, the new implementation will not support exactly the same functionality as the existing one: the KPL implements a highly optimized form of aggregation on the shard level [1] by maintaining a mapping of shards to their respective key spaces. The new implementation can, in principle, support aggregation as well, but only on the partition-key level, which may lead to less efficient aggregation and higher latencies.
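The difference between the two aggregation levels can be illustrated with a small sketch. It relies on the documented Kinesis behavior that a partition key is mapped to a shard by MD5-hashing it into an unsigned 128-bit integer and choosing the shard whose hash key range contains that value. The class and method names, and the simplified shard map (a sorted list of starting hash keys), are hypothetical. With a single shard, every record can be aggregated together at the shard level, while partition-key-level grouping still yields one group per distinct key.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical comparison of partition-key-level vs shard-level grouping. */
class AggregationSketch {

    /** MD5 of the partition key, interpreted as an unsigned 128-bit integer. */
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] md5 = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, md5); // signum 1: treat bytes as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 must be supported by every JVM", e);
        }
    }

    /** Partition-key level: only records with the same key share a group. */
    static Map<String, List<String>> byPartitionKey(List<String> keys) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String key : keys) {
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(key);
        }
        return groups;
    }

    /**
     * Shard level: shardStarts holds each shard's starting hash key, sorted
     * ascending with the first entry 0; a record goes to the last shard whose
     * start is <= its hash key.
     */
    static Map<Integer, List<String>> byShard(List<String> keys, List<BigInteger> shardStarts) {
        Map<Integer, List<String>> groups = new LinkedHashMap<>();
        for (String key : keys) {
            BigInteger h = hashKey(key);
            int shard = 0;
            for (int i = 0; i < shardStarts.size(); i++) {
                if (shardStarts.get(i).compareTo(h) <= 0) {
                    shard = i;
                }
            }
            groups.computeIfAbsent(shard, s -> new ArrayList<>()).add(key);
        }
        return groups;
    }
}
```

A real shard-level aggregator would also have to keep its shard map up to date when the stream is resharded, which is the extra bookkeeping the partition-key approach avoids.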

Cheers, Steffen

[1] https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation



On 15.06.21, 19:52, "Cranmer, Danny" <[hidden email]> wrote:


Re: [DISCUSS] FLIP-171: Async Sink

Arvid Heise
Hi Danny,

To add to that, I'd propose using the flink-connector-base module, which
already contains the rough source-side equivalent, SourceReaderBase [1].
Since it's such a handy base implementation, I'd like to see it directly in
the main Flink repository.

For the actual connectors, I'm currently working on a proposal for a common
connector repository under the Flink umbrella.

[1]
https://github.com/AHeise/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L58-L58

On Wed, Jun 16, 2021 at 6:06 PM Hausmann, Steffen <[hidden email]>
wrote:

> Hi Danny,
>
> Right now, I'd expect the core of the Async Sink (without third party
> dependencies) to live in its own submodule. For instance
> `flink-connector-async` as part of `flink-connectors`.
>
> I'm currently planning to implement three different sinks to verify that
> the design of the sink if flexible enough to support different services:
> Amazon Kinesis Data Streams, Amazon Kinesis Data Firehose, and Amazon
> DynamoDB. But I'm not sure where to actually put them. To keep is simple,
> I'd start with a module that contains all AWS specific connectors. However,
> it has the obvious disadvantage that if someone wants to use a single sink,
> they would need to pull in all dependencies for all supported services that
> are included in this module (mainly the AWS SDK for these services). But I
> don't know how much of a problem that's going to be in practice. If the
> respective jar grows too big because all the included dependencies, that's
> certainly not going to work. But for now I'd just give it a try and then
> start a discussion once I have more data to share.
>
> What's more interesting is whether that module should be part of the Flink
> code base or live somewhere else. I'd be great to get some feedback from
> the community on this.
>
> Regarding the Kinesis Data Streams sink, I fully agree that it would be
> nice to remove the dependency to the KPL. So it seems to be desirable to
> keep the existing and the new FLIP-171  based implementation in separate
> modules. Otherwise people would be forced to pull in the KPL dependencies,
> even if they are only using the new implementation. In addition, the new
> implementation will not support the exact same functionality as the
> existing one: the KPL implements a very optimized form of aggregation on a
> shard level [1] by maintaining a mapping of shards and their respective key
> spaces. The new implementation can in principle support aggregation as
> well, but only on a partition key level, which may lead to less efficient
> aggregation and higher latencies.
>
> Cheers, Steffen
>
> [1]
> https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
>
>
>
> On 15.06.21, 19:52, "Cranmer, Danny" <[hidden email]>
> wrote:
>
>     CAUTION: This email originated from outside of the organization. Do
> not click links or open attachments unless you can confirm the sender and
> know the content is safe.
>
>
>
>     Hey Steffen,
>
>     I have a few questions regarding the FLIP:
>     1. Where do you expect the core code to live, would it be in an
> existing module (say flink-clients) or would you introduce a new module?
>     2. Which destination implementations do you intend to ship with this
> FLIP? I see an example with Kinesis but you also list a bunch of other
> candidates.
>     3. For the Kinesis implementation, would you add the Sink to the
> existing flink-connector-kinesis repo, or create a new module? Reason I ask
> is that the existing Kinesis Sink depends on KPL and has a heavy transitive
> dependency chain, removing this would substantially reduce application size
> and clean the dependency chain
>
>     Thanks,
>
>     On 10/06/2021, 09:09, "Hausmann, Steffen" <[hidden email]>
> wrote:
>
>
>         Hey Piotrek,
>
>         Thanks for your comments on the FLIP. I'll address your second
>         question first, as I think it's more central to this FLIP. Just
>         looking at the AWS ecosystem, there are several sinks with
>         overlapping functionality. I've chosen AWS sinks here because I'm
>         most familiar with those, but a similar argument applies more
>         generally to destinations that support async ingest.
>
>         There is, for instance, a sink for Amazon Kinesis Data Streams
>         that is part of Apache Flink [1], a sink for Amazon Kinesis Data
>         Firehose [2], a sink for Amazon DynamoDB [3], and a sink for Amazon
>         Timestream [4]. Each of these sinks implements its own mechanisms
>         for batching, persisting, and retrying events, and I'm not sure
>         whether all of them participate properly in checkpointing. [3] even
>         seems to closely mirror [1], as it contains references to the
>         Kinesis Producer Library, which is unrelated to Amazon DynamoDB.
>
>         These sinks predate FLIP-143. But since batching, persisting, and
>         retrying capabilities do not seem to be part of FLIP-143, I'd argue
>         that we would end up with similar duplication even if these sinks
>         were rewritten today on top of FLIP-143. That's the idea of
>         FLIP-171: abstract away these commonly required capabilities so
>         that it becomes easy to add support for a wide range of
>         destinations without having to think about batching, retries,
>         checkpointing, etc. I've included an example in the FLIP [5] that
>         shows that it only takes a couple of lines of code to implement a
>         sink with exactly-once semantics. To be fair, the example lacks
>         robust failure handling and some of the more advanced capabilities
>         of [1], but I think it still supports this point.
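A minimal Java sketch of the kind of shared logic described above. A base class owns buffering, batching, retrying failed entries, and snapshotting the buffer, while a destination-specific subclass implements only a single submit method. All names here (`BatchingAsyncWriter`, `submitBatch`, `FakeDestinationWriter`) are hypothetical and are not the API proposed in the FLIP. For brevity, the sketch waits for each request instead of tracking in-flight requests, and it makes no attempt to provide exactly-once guarantees.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical base class: buffering, batching, retries and snapshots live here.
abstract class BatchingAsyncWriter<T> {
    private final Deque<T> buffer = new ArrayDeque<>();
    private final int maxBatchSize;

    BatchingAsyncWriter(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    // The only destination-specific part: send a batch and complete the
    // future with the entries that failed and should be retried.
    protected abstract CompletableFuture<List<T>> submitBatch(List<T> batch);

    void write(T element) {
        buffer.addLast(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Sends up to maxBatchSize entries; failed ones return to the front of the
    // buffer in their original order. Simplification: waits for the response.
    void flush() {
        List<T> batch = new ArrayList<>();
        while (!buffer.isEmpty() && batch.size() < maxBatchSize) {
            batch.add(buffer.pollFirst());
        }
        if (batch.isEmpty()) {
            return;
        }
        List<T> failed = submitBatch(batch).join();
        for (int i = failed.size() - 1; i >= 0; i--) {
            buffer.addFirst(failed.get(i));
        }
    }

    // On a checkpoint, buffered entries are persisted rather than flushed.
    List<T> snapshotState() {
        return new ArrayList<>(buffer);
    }

    int bufferedCount() {
        return buffer.size();
    }
}

// Hypothetical shim: a fake client that throttles the last entry of every
// odd-numbered request so that the retry path is exercised.
class FakeDestinationWriter extends BatchingAsyncWriter<String> {
    final List<String> delivered = new ArrayList<>();
    private int requests = 0;

    FakeDestinationWriter(int maxBatchSize) {
        super(maxBatchSize);
    }

    @Override
    protected CompletableFuture<List<String>> submitBatch(List<String> batch) {
        requests++;
        List<String> failed = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            boolean throttled = requests % 2 == 1 && i == batch.size() - 1;
            if (throttled) {
                failed.add(batch.get(i));
            } else {
                delivered.add(batch.get(i));
            }
        }
        return CompletableFuture.completedFuture(failed);
    }

    public static void main(String[] args) {
        FakeDestinationWriter writer = new FakeDestinationWriter(3);
        for (String s : List.of("a", "b", "c", "d", "e", "f", "g")) {
            writer.write(s);
        }
        System.out.println("snapshot: " + writer.snapshotState());
        while (writer.bufferedCount() > 0) {
            writer.flush();
        }
        System.out.println("delivered: " + writer.delivered);
    }
}
```

Running `main` prints `snapshot: [f, g]` and then `delivered: [a, b, c, d, e, f, g]`. The throttled entries `c` and `g` are retried without the shim containing any retry code of its own.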
>
>         Regarding your point on the isAvailable pattern: we need some way
>         for the sink to propagate backpressure, and we would also like to
>         support time-based buffering hints. I currently see two options and
>         would need additional input on which one is better or more
>         desirable. The first option is to use the non-blocking isAvailable
>         pattern. Internally, the sink persists buffered events in the
>         snapshot state, which avoids having to flush buffered records on a
>         checkpoint. This seems to align well with the non-blocking
>         isAvailable pattern. The second option is to make calls to `write`
>         blocking and leverage an internal thread to trigger flushes based
>         on time-based buffering hints. We've discussed these options with
>         Arvid, and the suggestion was to assume that the `isAvailable`
>         pattern will become available for sinks through an additional
>         FLIP.
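For the first option, here is a minimal Java sketch of the non-blocking availability pattern. The writer caps the number of in-flight requests. Once the cap is reached, `isAvailable()` returns an incomplete `CompletableFuture<Void>` that completes as soon as a request finishes. The class name and the `maxInFlight` parameter are hypothetical, and the future passed to `write` stands in for an async destination client call. In this sketch, failed requests simply free their slot instead of being retried.

```java
import java.util.concurrent.CompletableFuture;

class AvailabilityAwareWriter {
    private final int maxInFlight;
    private int inFlight = 0;
    // Completed while there is capacity; replaced by an incomplete future
    // once the in-flight limit is reached (backpressure).
    private CompletableFuture<Void> available = CompletableFuture.completedFuture(null);

    AvailabilityAwareWriter(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    // The caller only invokes write() once this future is done, so the task
    // thread never blocks inside the sink.
    synchronized CompletableFuture<Void> isAvailable() {
        return available;
    }

    // 'request' stands in for the future returned by an async client call.
    synchronized void write(CompletableFuture<Void> request) {
        if (inFlight >= maxInFlight) {
            throw new IllegalStateException("write() called while the writer is unavailable");
        }
        inFlight++;
        if (inFlight == maxInFlight) {
            available = new CompletableFuture<>(); // signal backpressure
        }
        // Success or failure frees a slot (a real sink would re-queue failures).
        request.whenComplete((ignored, error) -> onRequestCompleted());
    }

    private synchronized void onRequestCompleted() {
        inFlight--;
        available.complete(null); // no-op if already complete
    }

    public static void main(String[] args) {
        AvailabilityAwareWriter writer = new AvailabilityAwareWriter(2);
        CompletableFuture<Void> first = new CompletableFuture<>();
        writer.write(first);
        writer.write(new CompletableFuture<>());
        CompletableFuture<Void> gate = writer.isAvailable();
        System.out.println("available before completion: " + gate.isDone());
        first.complete(null); // the destination acknowledges the first request
        System.out.println("available after completion: " + gate.isDone());
    }
}
```

Running `main` prints `available before completion: false` followed by `available after completion: true`. Backpressure is exposed as a future that the caller can wait on without blocking a thread. With the second option, `write` itself would block instead.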
>
>         I think it is an important discussion to have. My understanding of
>         the implications for Flink in general is very naïve, so I'd be
>         happy to get further guidance. However, I don't want to make this
>         discussion part of FLIP-171. For FLIP-171, we'll use whatever is
>         available.
>
>         Does that make sense?
>
>         Cheers, Steffen
>
>
>         [1] https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kinesis
>         [2] https://github.com/aws/aws-kinesisanalytics-flink-connectors
>         [3] https://github.com/klarna-incubator/flink-connector-dynamodb
>         [4] https://github.com/awslabs/amazon-timestream-tools/
>         [5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink#FLIP171:AsyncSink-SimplifiedAsyncSinkWriterforKinesisDataStreams
>
>