[DISCUSS] Introducing Backpressure for connected streams


[DISCUSS] Introducing Backpressure for connected streams

Robin KC
Hi all,

This issue has been discussed before in this thread:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html

Our use case requires an event-time join of two streams, for which we use
ConnectedStreams. Within the CoProcessFunction, we buffer records until the
watermark arrives and then perform the join and business logic based on the
watermark. The issue is that if one stream is slower than the other, the
buffer (RocksDB state) fills up unnecessarily because the operator keeps
reading from the fast stream.
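
A minimal sketch of the buffering pattern described above, in plain Java with no Flink dependency. All names here (`WatermarkBufferSketch`, `onRecord`, `onWatermark`, `bufferedCount`) are hypothetical, an in-memory `TreeMap` stands in for the RocksDB state, and only the fast stream's records are buffered to keep it short. It is an illustration of why the buffer grows, not the actual CoProcessFunction:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

final class WatermarkBufferSketch {
    // Hypothetical stand-in for RocksDB state: timestamp -> number of buffered records.
    private final TreeMap<Long, Integer> buffered = new TreeMap<>();
    private long watermark1 = Long.MIN_VALUE;
    private long watermark2 = Long.MIN_VALUE;

    /** Buffers one record of the fast stream under its event timestamp. */
    void onRecord(long timestamp) {
        buffered.merge(timestamp, 1, Integer::sum);
    }

    /** Advances the watermark of input 1 or 2 and returns how many records became joinable. */
    int onWatermark(int input, long watermark) {
        if (input == 1) {
            watermark1 = Math.max(watermark1, watermark);
        } else {
            watermark2 = Math.max(watermark2, watermark);
        }
        // The operator's watermark is the minimum of both inputs,
        // so the slower input decides when buffered records can be released.
        long combined = Math.min(watermark1, watermark2);
        NavigableMap<Long, Integer> ready = buffered.headMap(combined, true);
        int released = ready.values().stream().mapToInt(Integer::intValue).sum();
        ready.clear(); // headMap is a view, so this also removes the entries from the buffer
        return released;
    }

    int bufferedCount() {
        return buffered.values().stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        WatermarkBufferSketch op = new WatermarkBufferSketch();
        for (long ts = 1; ts <= 1000; ts++) {
            op.onRecord(ts);                  // fast stream has already delivered 1000 records
        }
        op.onWatermark(1, 1000);              // fast stream watermark is at 1000
        int released = op.onWatermark(2, 10); // slow stream watermark is only at 10
        System.out.println("released=" + released + ", still buffered=" + op.bufferedCount());
        // prints "released=10, still buffered=990"
    }
}
```

In this toy run, 990 records stay in state only because the slow input's watermark has not caught up.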

I took inspiration from a response on the same thread
<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-tp24489p24564.html>
by Elias Levy:

The idea I was suggesting is not for operators to block an input.  Rather,
it is that they selectively choose from which input to process the next
message from based on their timestamp, so long as there are buffered
messages waiting to be processed.  That is a best-effort alignment
strategy.  Seems to work relatively well in practice, at least within Kafka
Streams.

E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for both
its inputs.  Instead, it could keep them separate and selectively consume
from the one that had a buffer available, and if both have buffers
available, from the buffer with the messages with a lower timestamp.
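
The selection rule in the quote above can be sketched in plain Java, with no Flink dependency. The names `TimestampAlignedMerge`, `selectInput` and `drain` are hypothetical, and each input is modelled as a queue of event timestamps. This only illustrates the best-effort rule and is not how StreamTwoInputProcessor is implemented:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class TimestampAlignedMerge {
    /** Returns 0 or 1 for the input to consume from next, or -1 if neither has a buffered record. */
    static int selectInput(Deque<Long> input1, Deque<Long> input2) {
        if (input1.isEmpty() && input2.isEmpty()) {
            return -1;
        }
        // Never block: if only one input has data, consume from it.
        if (input1.isEmpty()) {
            return 1;
        }
        if (input2.isEmpty()) {
            return 0;
        }
        // Both inputs have data: prefer the one whose head record has the lower timestamp.
        return input1.peekFirst() <= input2.peekFirst() ? 0 : 1;
    }

    /** Consumes both inputs completely and returns the timestamps in processing order. */
    static List<Long> drain(Deque<Long> input1, Deque<Long> input2) {
        List<Long> processed = new ArrayList<>();
        int next;
        while ((next = selectInput(input1, input2)) >= 0) {
            processed.add(next == 0 ? input1.pollFirst() : input2.pollFirst());
        }
        return processed;
    }

    public static void main(String[] args) {
        Deque<Long> fast = new ArrayDeque<>(List.of(1L, 2L, 3L, 10L));
        Deque<Long> slow = new ArrayDeque<>(List.of(4L, 5L));
        System.out.println(drain(fast, slow)); // prints [1, 2, 3, 4, 5, 10]
    }
}
```

Because an empty input is never waited on, this rule aligns the inputs only while both have buffered records, which is what makes it best-effort.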

Based on this idea, I attempted a POC implementation of CoBackpressure for
the case where two streams are connected. It is committed in a branch in my
own fork:
https://github.com/apache/flink/commit/65eef7a5baeec9db588a6ba1f8fe339b3b436fb1

The approach is:

   1. Provide a new method, setCoBackpressureThreshold, on ConnectedStreams.
   2. Pass the user-provided CoBackpressureThreshold through the intermediate
   classes down to StreamTwoInputProcessorFactory.
   3. Implement a WatermarkHandler in StreamTwoInputProcessorFactory that
   pauses input1 or input2 if the difference between their watermarks is
   greater than the threshold. In other words, it selectively reads next from
   the input that is lagging behind.
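
The decision made in step 3 can be sketched as a pure function in plain Java. This is not the code from the POC commit: `CoBackpressureSketch`, `Selection` and `select` are hypothetical names, and the sketch assumes both watermarks are already initialized to ordinary epoch-millisecond values so that the subtraction cannot overflow:

```java
final class CoBackpressureSketch {
    /** Which inputs the operator should read from next (hypothetical type). */
    enum Selection { BOTH, FIRST_ONLY, SECOND_ONLY }

    /**
     * Pauses the input whose watermark is ahead by more than the threshold.
     * Assumes both watermarks are ordinary epoch-millisecond values, not Long.MIN_VALUE.
     */
    static Selection select(long watermark1, long watermark2, long thresholdMillis) {
        long lead = watermark1 - watermark2; // positive when input1 is ahead
        if (lead > thresholdMillis) {
            return Selection.SECOND_ONLY;    // input1 is too far ahead: pause it
        }
        if (-lead > thresholdMillis) {
            return Selection.FIRST_ONLY;     // input2 is too far ahead: pause it
        }
        return Selection.BOTH;               // within the threshold: read from both
    }

    public static void main(String[] args) {
        System.out.println(select(10_000L, 4_000L, 5_000L)); // prints SECOND_ONLY
        System.out.println(select(4_000L, 10_000L, 5_000L)); // prints FIRST_ONLY
        System.out.println(select(10_000L, 8_000L, 5_000L)); // prints BOTH
    }
}
```

A real handler would re-evaluate this on every watermark update from either input and would also need to handle inputs that have not emitted a watermark yet.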

Some key points:

   1. One benefit of this approach is that a user can configure
   CoBackpressureThreshold on any individual join; it is not a global config.
   2. IntervalJoins internally use ConnectedStreams, so this will work for
   IntervalJoins as well.
   3. Other window joins do not use ConnectedStreams but use UnionedStreams.
   A solution for those still has to be found.
   4. I believe MultipleInputStreams will also need similar functionality.
   5. Important: this approach does not solve the problem of event-time skew
   between different partitions/shards of the same input source. It only
   addresses event-time alignment across different sources.

Looking forward to your input on this. If this seems like a feasible
approach, I can take it forward and implement it, with fixes for any
identified gaps and appropriate test cases.

Thanks,
Robin

Re: [DISCUSS] Introducing Backpressure for connected streams

Guowei Ma
Hi Robin,

Thank you for bringing up this discussion. AFAIK many users have the same
requirement. But it might lead to a deadlock if we depend on pausing one of
the two inputs to align the watermarks.
After FLIP-27, Flink would introduce a new mechanism for aligning the
watermarks of different sources. Maybe @Becket could give some input or
share the plans for this.

Best,
Guowei



Re: [DISCUSS] Introducing Backpressure for connected streams

Piotr Nowojski-5
Hi Robin,

Flink has functionality for blocking reads from an input. It's not
documented, as it's not fully working in Streaming. Take a look at the
`org.apache.flink.streaming.api.operators.InputSelectable` class and how
it's being used (you have to implement your own operator to use it). It has
two limitations:

1. As Guowei Ma mentioned, if there is a diamond pattern in the job graph,
this can lead to deadlocks in the job.
2. When blocking reads from an input, you are blocking checkpoints from
progressing, so currently there is a safety check that disallows using
`InputSelectable` when checkpointing is enabled.

Having said that, and as Guowei mentioned, this doesn't seem to be the
correct solution for your problem. Throttling FLIP-27 sources based on
watermark progression is the correct approach here, as it avoids the issues
above.

Best,
Piotrek


Re: [DISCUSS] Introducing Backpressure for connected streams

Robin KC
Thanks Piotr and Guowei for the input. I understand that this can lead to
deadlocks and that the right solution should be based on FLIP-27.
I will look further into FLIP-27 for this.

Thanks,
Robin

