Hi all,
The issue has been discussed before here -
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html

Our use case requires an event-time join of two streams, and we use ConnectedStreams for it. Within the CoProcessFunction, we buffer records until the watermark arrives and then perform the join and business logic based on the watermark. The issue is that if one stream is slower than the other, the buffer (a RocksDB state) is unnecessarily filled by continuously reading from the fast stream.

I took inspiration from a response on the same thread
<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-tp24489p24564.html>
by Elias Levy -

> The idea I was suggesting is not for operators to block an input. Rather, it is that they selectively choose from which input to process the next message from based on their timestamp, so long as there are buffered messages waiting to be processed. That is a best-effort alignment strategy. Seems to work relatively well in practice, at least within Kafka Streams.
>
> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for both its inputs. Instead, it could keep them separate and selectively consume from the one that had a buffer available, and if both have buffers available, from the buffer with the messages with a lower timestamp.

Based on that, I attempted a POC implementation of CoBackpressure that applies whenever two streams are connected. It is committed in a branch in my own fork -
https://github.com/apache/flink/commit/65eef7a5baeec9db588a6ba1f8fe339b3b436fb1

The approach is

1. Provide a new method setCoBackpressureThreshold on ConnectedStream.
2. Pass the user-provided CoBackpressureThreshold through various classes down to StreamTwoInputProcessorFactory.
3. Implement a WatermarkHandler in StreamTwoInputProcessorFactory that pauses input1 or input2 if the difference between their watermarks is greater than the threshold. In other words, it selectively chooses the input that is lagging behind as the next one to read.

Some key points

1. One benefit of this approach is that a user can configure CoBackpressureThreshold at any join; it is not a global config.
2. IntervalJoins internally use ConnectedStreams, and therefore this will work for IntervalJoins as well.
3. Other window joins do not use ConnectedStreams but use UnionedStreams. I will have to find a solution for that.
4. I believe MultipleInputStreams will also need similar functionality.
5. Important: this approach does not solve the problem of event-time skew between different partitions/shards of the same input source. It only solves event-time alignment of different sources.

Looking forward to inputs on this. If this seems like a feasible approach, I can take it forward and implement the code with fixes for the identified gaps and appropriate test cases.

Thanks,
Robin
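To make step 3 of the approach concrete, here is a minimal plain-Java sketch of the threshold check. It is an illustration only, not the code from the POC commit and not a Flink API: the class `CoBackpressureSketch`, the enum `NextInput`, and all method names are hypothetical, and watermarks are assumed to be epoch-millisecond `long` values.

```java
/**
 * Hypothetical model of the watermark-difference check. Not a Flink class
 * and not the POC implementation; all names are assumptions.
 */
final class CoBackpressureSketch {

    /** Which input a two-input task should read next. */
    enum NextInput { FIRST, SECOND, ANY }

    private static final long UNSET = Long.MIN_VALUE;

    private final long thresholdMillis;
    private long watermark1 = UNSET;
    private long watermark2 = UNSET;

    CoBackpressureSketch(long thresholdMillis) {
        if (thresholdMillis < 0) {
            throw new IllegalArgumentException("threshold must be non-negative");
        }
        this.thresholdMillis = thresholdMillis;
    }

    /** Watermarks never move backwards, so keep the maximum seen per input. */
    void onWatermark1(long timestamp) {
        watermark1 = Math.max(watermark1, timestamp);
    }

    void onWatermark2(long timestamp) {
        watermark2 = Math.max(watermark2, timestamp);
    }

    /** Returns the lagging input when the skew exceeds the threshold, else ANY. */
    NextInput next() {
        // Until both inputs have reported a watermark there is no skew to measure.
        if (watermark1 == UNSET || watermark2 == UNSET) {
            return NextInput.ANY;
        }
        if (isAheadBeyondThreshold(watermark1, watermark2)) {
            return NextInput.SECOND; // input 1 is too far ahead: pause it, read input 2
        }
        if (isAheadBeyondThreshold(watermark2, watermark1)) {
            return NextInput.FIRST; // input 2 is too far ahead: pause it, read input 1
        }
        return NextInput.ANY;
    }

    private boolean isAheadBeyondThreshold(long ahead, long behind) {
        if (ahead <= behind) {
            return false;
        }
        long diff = ahead - behind;
        // A negative diff here means the subtraction overflowed, so the gap is huge.
        return diff < 0 || diff > thresholdMillis;
    }

    public static void main(String[] args) {
        CoBackpressureSketch sketch = new CoBackpressureSketch(1_000L);
        sketch.onWatermark1(6_000L);
        sketch.onWatermark2(4_500L);
        System.out.println("next input to read: " + sketch.next());
    }
}
```

Returning `ANY` while the skew is within the threshold keeps the default behaviour of reading whichever input has data, so the check only intervenes once one side has run ahead.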
Hi Robin
Thank you for bringing up this discussion. AFAIK there are many similar requirements. But it might lead to a deadlock if we depend on pausing one of the two inputs to align the watermarks.

After FLIP-27, Flink would introduce a new mechanism for aligning the watermarks of different sources. Maybe @Becket could give some input or share plans for this.

Best,
Guowei

On Wed, Mar 24, 2021 at 1:46 PM Robin KC <[hidden email]> wrote:
> [...]
Hi Robin,
Flink has functionality to block reading from an input. It's not documented, as it doesn't fully work in streaming. Take a look at the `org.apache.flink.streaming.api.operators.InputSelectable` class and how it's being used (you have to implement your own operator to use it). It has two limitations:

1. As Guowei Ma mentioned, if there is a diamond pattern in the job graph, this can lead to deadlocks in the job.
2. When blocking reads from an input, you are blocking checkpoints from progressing, so currently there is a safety check that disallows using `InputSelectable` when checkpointing is enabled.

Having said that, and as Guowei mentioned, this doesn't seem to be the correct solution for your problem. Throttling FLIP-27 sources based on watermark progression is the correct approach here, as it avoids the issues above.

Best,
Piotrek

On Wed, Mar 24, 2021 at 08:23 Guowei Ma <[hidden email]> wrote:
> [...]
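As a rough illustration of what throttling sources based on watermark progression could look like, here is a minimal plain-Java sketch. It is a model of the idea only and does not use or represent the FLIP-27 interfaces: the class `SourceAlignmentSketch`, its methods, the string source ids, and the `maxDriftMillis` parameter are all hypothetical. Each source reports its watermark, and a source is asked to pause while it is ahead of the slowest source by more than the allowed drift.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of source-side watermark alignment. Not a Flink API;
 * all names and the coordination scheme are assumptions.
 */
final class SourceAlignmentSketch {

    private final long maxDriftMillis;
    private final Map<String, Long> watermarks = new HashMap<>();

    SourceAlignmentSketch(long maxDriftMillis) {
        if (maxDriftMillis < 0) {
            throw new IllegalArgumentException("max drift must be non-negative");
        }
        this.maxDriftMillis = maxDriftMillis;
    }

    /** Records the latest watermark of a source; watermarks never move backwards. */
    void report(String sourceId, long watermark) {
        watermarks.merge(sourceId, watermark, Math::max);
    }

    /** True if the source is ahead of the slowest source by more than the drift. */
    boolean shouldPause(String sourceId) {
        Long own = watermarks.get(sourceId);
        // Unknown sources, or a single source on its own, are never throttled.
        if (own == null || watermarks.size() < 2) {
            return false;
        }
        long min = Collections.min(watermarks.values());
        long drift = own - min; // own >= min, so a negative value means overflow
        return drift < 0 || drift > maxDriftMillis;
    }

    public static void main(String[] args) {
        SourceAlignmentSketch alignment = new SourceAlignmentSketch(1_000L);
        alignment.report("fast", 10_000L);
        alignment.report("slow", 2_000L);
        System.out.println("pause fast source: " + alignment.shouldPause("fast"));
        System.out.println("pause slow source: " + alignment.shouldPause("slow"));
    }
}
```

Because the pause decision is made at the source rather than inside a two-input operator, no operator input is blocked, which is the reason this style of alignment sidesteps the deadlock and checkpointing limitations listed above.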
Thanks Piotr and Guowei for the inputs. I understand that this can lead to deadlocks and that the right solution should be based on FLIP-27. I will look further into FLIP-27 for this.

Thanks,
Robin

On Wed, Mar 24, 2021 at 2:21 PM Piotr Nowojski <[hidden email]> wrote:
> [...]