[DISCUSS] Definition of idle partitions

[DISCUSS] Definition of idle partitions

Arvid Heise-4
Hi devs,

While discussing "Watermark propagation with Sink API" and while working on
"[FLINK-18934] Idle stream does not advance watermark in connected stream",
we noticed some drawbacks in how Flink currently defines idle partitions.

To recap, idleness was always considered a means to achieve progress in
window operators when a partition in the source is idle, at the risk of
losing a bit of correctness. In particular, records could be considered late
simply because of the idleness timeout and not because they arrived out of
order. Reprocessing the same data might not consider these records late, so
we may end up with a different (correct) result.
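
To make the lateness effect concrete, here is a small, self-contained Java
sketch (not Flink code). It assumes two partitions A and B, a per-partition
watermark equal to the highest timestamp seen so far, and a combined
watermark that is the minimum over the non-idle partitions. All names
(IdleLatenessDemo, countLate, sampleArrivals) are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: a toy model of lateness caused by an idleness timeout. */
final class IdleLatenessDemo {

    /** One event-time record coming from partition "A" or "B". */
    static final class Event {
        final String partition;
        final long timestamp;

        Event(String partition, long timestamp) {
            this.partition = partition;
            this.timestamp = timestamp;
        }
    }

    /**
     * Counts records whose timestamp is below the combined watermark at arrival.
     * bTimedOut = true models partition B having been declared idle by a timeout,
     * so B is excluded from the minimum until its first record arrives.
     */
    static int countLate(List<Event> arrivals, boolean bTimedOut) {
        long maxA = Long.MIN_VALUE;
        long maxB = Long.MIN_VALUE;
        boolean bIdle = bTimedOut;
        int late = 0;
        for (Event e : arrivals) {
            // Assumption: the watermark simply tracks the highest timestamp seen.
            long watermark = bIdle ? maxA : Math.min(maxA, maxB);
            if (e.timestamp < watermark) {
                late++;
            }
            if (e.partition.equals("A")) {
                maxA = Math.max(maxA, e.timestamp);
            } else {
                maxB = Math.max(maxB, e.timestamp);
                bIdle = false; // B is no longer idle once it delivers data
            }
        }
        return late;
    }

    /** A runs ahead to timestamp 100 before B delivers a record at timestamp 40. */
    static List<Event> sampleArrivals() {
        return Arrays.asList(
                new Event("A", 10), new Event("A", 60),
                new Event("A", 100), new Event("B", 40));
    }

    public static void main(String[] args) {
        System.out.println("late with idleness timeout: " + countLate(sampleArrivals(), true));
        System.out.println("late without idleness:      " + countLate(sampleArrivals(), false));
    }
}
```

With the same arrival order, the B record at timestamp 40 is counted as late
only when B was excluded by the idleness timeout (1 late record versus 0).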

The drawbacks that we discovered are as follows:
- We currently only use idleness to exclude the respective upstream tasks
from participating in watermark generation.
- However, the definition is bound to records [1]. In particular, while a
partition is idle, no records should be produced.
- That leads to quite a few edge cases where operators emit records while
they are actually idle: think of timers, async I/O operators, window
operators based on timeouts, etc. that fire in an operator ingesting an
idle partition.
- The proper solution would be to turn the operator active while emitting
and to return to being idle afterwards (but when?). However, this has some
unintended side effects depending on when you switch back:
  - If you toggle the stream status for each record, you get a huge overhead
from stream status records and quite a bit of processing in downstream
operators (that code path is not well optimized since switching is
considered rare).
  - If you toggle after a certain time, you may get delays greater than the
idleness timeout in the downstream window operators.
  - You could switch back once you have processed all pending mails, but if
you have a self-replicating mail, that would be never. A self-enqueueing
timer with a short interval would also produce a flood similar to the first
case.

All in all, the situation is quite unsatisfying because idleness implies no
records. However, there is currently no need for that implication: since we
only use idleness for watermarks, we can easily allow records to be emitted
(in fact, that was the old behavior before FLINK-18934 in many cases) and
still get the intended behavior with respect to watermarks:
- A channel that is active is providing watermarks.
- An idle channel is not providing any watermarks but can deliver records.
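
The two rules above can be sketched as a tiny watermark aggregator. This is
a hypothetical model, not Flink's actual implementation: the class
WatermarkValveSketch and its methods are invented here, and the only
behavior it assumes is "minimum over channels that provide watermarks,
records always pass".

```java
import java.util.Arrays;

/** Illustrative only: combines per-channel watermarks, skipping channels that provide none. */
final class WatermarkValveSketch {
    private final long[] channelWatermarks;
    private final boolean[] providesWatermarks;
    private long lastEmitted = Long.MIN_VALUE;
    private int recordsForwarded = 0;

    WatermarkValveSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        providesWatermarks = new boolean[numChannels];
        Arrays.fill(providesWatermarks, true);
    }

    /** The channel stops influencing the combined watermark ("idle" in today's naming). */
    void markNotProvidingWatermarks(int channel) {
        providesWatermarks[channel] = false;
    }

    /** Only a watermark makes the channel count again. */
    void onWatermark(int channel, long watermark) {
        providesWatermarks[channel] = true;
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    /** Records are forwarded regardless of the status and do not change it. */
    void onRecord(int channel) {
        recordsForwarded++;
    }

    boolean isProvidingWatermarks(int channel) {
        return providesWatermarks[channel];
    }

    int recordsForwarded() {
        return recordsForwarded;
    }

    /** Minimum over all channels that provide watermarks; never moves backwards. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        boolean any = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (providesWatermarks[i]) {
                any = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        if (any) {
            lastEmitted = Math.max(lastEmitted, min);
        }
        return lastEmitted;
    }

    public static void main(String[] args) {
        WatermarkValveSketch valve = new WatermarkValveSketch(2);
        valve.onWatermark(0, 100L);
        valve.markNotProvidingWatermarks(1);
        valve.onRecord(1); // allowed, and channel 1 stays excluded
        System.out.println("combined watermark: " + valve.currentWatermark());
    }
}
```

In this model a record on channel 1 neither blocks nor advances the combined
watermark; only channel 0 determines it until channel 1 emits a watermark
again.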

Ultimately, that would mean that we are no longer really talking about
idle/active partitions. We are talking about whether a particular subtask
should influence downstream watermark calculation or not. This leads to the
following questions:
1. Do we want to change the definition as outlined?
2. Do you see any problem with emitting records on a subtask without
explicit watermarks?
3. If we want to go this way, we may need to refine the names/definitions.
Any ideas?

I think idle partition should translate into something like
automatic/implicit/passive watermarks; active partition into
explicit/active watermarks. Then StreamStatus is more about WatermarkMode
(not really happy with this one).

Best,

Arvid

[1]
https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86

Re: [DISCUSS] Definition of idle partitions

Piotr Nowojski-5
Hi Arvid,

Thanks for writing down this summary and proposal. I think this was the
foundation of the disagreement in the FLIP-167 discussion. Dawid was arguing
that idleness is an intermittent, strictly task-local concept and as such
shouldn't be exposed in, for example, sinks, while Eron and I thought that
it's a concept strictly connected to watermarks.

1. I'm a big +1 for changing the StreamStatus definition to a stream
"providing watermarks" and "not providing watermarks". With respect to that,
I agree with Dawid that record-bound idleness *(if we would ever need to
define/expose it)* should be an intermittent concept, like, for example, the
input availability that already exists in the Task/runtime
(StreamTaskInput#isAvailable).
3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But
I also don't have any good ideas.
`WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?

Best,
Piotrek

On Tue, Jun 8, 2021 at 16:35, Arvid Heise <[hidden email]> wrote:

Re: [DISCUSS] Definition of idle partitions

Eron Wright-3
It seems to me that idleness was introduced to deal with a very specific
issue.  In the pipeline, watermarks are aggregated not on a per-split basis
but on a per-subtask basis.  This works well when each subtask has exactly
one split.  When a sub-task has multiple splits, various complications
occur involving the commingling of watermarks.  And when a sub-task has no
splits, the pipeline stalls altogether.  To deal with the latter problem,
idleness was introduced.  The sub-task simply declares itself to be idle to
be taken out of consideration for purposes of watermark aggregation.

If we're looking for a more general solution, I would suggest we discuss
how to track watermarks on a per-split basis.  Or, as Till mentioned
recently, an alternate solution may be to dynamically adjust the
parallelism of the task.

I don't agree with the notion that idleness involves a correctness
tradeoff.  The facility I described above has no impact on correctness.
Meanwhile, various watermark strategies rely on heuristics involving the
processing-time domain, and the term idleness seems to have found purchase
there too.  The connection among the concepts seems tenuous.

-E



On Tue, Jun 8, 2021 at 8:58 AM Piotr Nowojski <[hidden email]> wrote:

Re: [DISCUSS] Definition of idle partitions

Piotr Nowojski-4
Hi Eron,

Can you elaborate a bit more on what you mean? I don’t understand what you mean by a more general solution.

As of now, a stream is marked idle by a source/watermark generator, which has the effect of temporarily excluding this stream/partition from the min watermark calculation in the downstream tasks. However, the stream switches back to active when any record is emitted. This is what’s causing the problems described by Arvid.

The core of our proposal is very simple: keep everything as it is, except state that a stream changes back to active only once a watermark is emitted again, not a record. In other words, disconnect idleness from the presence of records, connect it only to the presence or lack of watermarks, and allow records to be emitted while the “stream status” is “idle”.
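
As a rough illustration of the difference, the following Java sketch counts how many status changes would be sent downstream under each rule. It is a simplified model rather than Flink code: the names StatusTransitionSketch, Policy and Event are invented, and IDLE_TIMEOUT stands for whatever marks the stream idle.

```java
/** Illustrative only: compares "a record reactivates" with "only a watermark reactivates". */
final class StatusTransitionSketch {

    enum Event { RECORD, WATERMARK, IDLE_TIMEOUT }

    enum Policy { RECORD_REACTIVATES, WATERMARK_REACTIVATES }

    /** An idle stream that still emits three records (e.g. from timers), then a watermark. */
    static final Event[] SAMPLE = {
        Event.IDLE_TIMEOUT, Event.RECORD,
        Event.IDLE_TIMEOUT, Event.RECORD,
        Event.IDLE_TIMEOUT, Event.RECORD,
        Event.WATERMARK
    };

    /** Returns the number of active/idle status changes that would be sent downstream. */
    static int countStatusChanges(Policy policy, Event[] events) {
        boolean active = true;
        int changes = 0;
        for (Event e : events) {
            boolean next = active;
            switch (e) {
                case IDLE_TIMEOUT:
                    next = false;
                    break;
                case WATERMARK:
                    next = true;
                    break;
                case RECORD:
                    // Current behavior: any record switches the stream back to active.
                    if (policy == Policy.RECORD_REACTIVATES) {
                        next = true;
                    }
                    break;
            }
            if (next != active) {
                changes++;
                active = next;
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        for (Policy p : Policy.values()) {
            System.out.println(p + ": " + countStatusChanges(p, SAMPLE) + " status changes");
        }
    }
}
```

For the sample sequence, the record-based rule produces six status changes, while the watermark-based rule produces two (idle once, active again on the watermark).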

Piotrek


On Jun 9, 2021, at 06:01, Eron Wright <[hidden email]> wrote:

Re: [DISCUSS] Definition of idle partitions

Arvid Heise-4
Hi Eron,

Again, to recap from the other thread:
- You are right that idleness is correct with static assignment and fully
active partitions. In this case, the source defines idleness. (case A)
- For the more pressing use cases of idle, assigned partitions, the user
defines an idleness threshold, and it becomes potentially incorrect when
the partition becomes active again. (case B)
- The same holds for dynamic assignment of splits. If a source without a
split gets a split assigned dynamically, there is a realistic chance that
the watermark has already advanced past the first record of the newly
assigned split. (case C)
You can certainly insist that only the first case is valid (as it's
correct), but we know that users use it in other ways and that was also the
intent of the devs.

Now the question could be if it makes sense to distinguish these cases.
Would you treat the idleness information differently (especially in the
sink/source that motivated FLIP-167) if you knew that the idleness is
guaranteed correct?
We could have some WatermarkStatus with ACTIVE, IDLE (case A), TIMEOUT
(case B).
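
Just to sketch what such a three-valued status could look like (purely
hypothetical; the name WatermarkStatusSketch and both accessors are made up
here and are not an existing Flink API):

```java
/** Illustrative only: a possible three-valued status distinguishing cases A and B. */
enum WatermarkStatusSketch {
    /** The channel provides watermarks and takes part in the minimum. */
    ACTIVE(true, true),
    /** Case A: the source itself knows nothing is coming, so excluding it is safe. */
    IDLE(false, true),
    /** Case B: a user-defined idleness threshold elapsed; later records may be late. */
    TIMEOUT(false, false);

    private final boolean providesWatermarks;
    private final boolean guaranteedCorrect;

    WatermarkStatusSketch(boolean providesWatermarks, boolean guaranteedCorrect) {
        this.providesWatermarks = providesWatermarks;
        this.guaranteedCorrect = guaranteedCorrect;
    }

    /** Whether downstream should include this channel in watermark aggregation. */
    boolean providesWatermarks() {
        return providesWatermarks;
    }

    /** Whether ignoring this channel cannot turn later records into late records. */
    boolean guaranteedCorrect() {
        return guaranteedCorrect;
    }
}
```

A sink or source could then treat IDLE and TIMEOUT the same for aggregation
but differently when deciding how much to trust the resulting watermark.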

However, that would still leave case C, which probably would need to be
solved completely differently. I could imagine that a source with dynamic
assignments should never have IDLE subtasks and rather manage the idleness
itself. For example, it could emit a watermark per second/minute that is
directly fetched from the source system. I'm just not sure if the current
WatermarkAssigner interface suffices in that regard...


On Wed, Jun 9, 2021 at 7:31 AM Piotr Nowojski <[hidden email]>
wrote:

Re: [DISCUSS] Definition of idle partitions

Tzu-Li (Gordon) Tai
Hi everyone,

Sorry for chiming in late here.

Regarding the topic of changing the definition of StreamStatus and changing
the name as well:
After digging into some of the roots of this implementation [1], initially
the StreamStatus was actually defined to mark "watermark idleness", and not
"record idleness" (in fact, the alternative name "WatermarkStatus" was
considered at the time).

The concern at the time causing us to alter the definition to be "record
idleness" in the final implementation was due to the existence of periodic
timestamp / watermark generators within the pipeline. Those would continue
to generate non-increasing watermarks in the absence of any input records
from upstream. In this scenario, downstream operators would not be able to
consider that channel as idle and therefore watermark progress is locked.
We could consider a timeout-based approach on those specific operators to
toggle watermark idleness if the values remain constant for a period of
time, but then again, this is very ill-defined and most likely wrong.

I have not followed the newest changes to the watermark generator operators
and am not sure if this issue is still relevant.
Otherwise, I don't see other problems with changing the definition here.

Thanks,
Gordon


Re: [DISCUSS] Definition of idle partitions

Tzu-Li (Gordon) Tai
Forgot to provide the link to the [1] reference:

[1] https://issues.apache.org/jira/browse/FLINK-5017


Re: [DISCUSS] Definition of idle partitions

Till Rohrmann
Thanks for providing these details, Gordon. I have to admit that I do not
fully follow why periodic watermark generators forced us to define idleness
in terms of records. Is it because idleness was determined in the sources,
based on no more data being available, and not in the watermark generators,
which are executed after the records have been read from the external
system? So, in the end, was the problem where the stream status was decided?

If there is a periodic watermark generator somewhere in the pipeline, then
we don't have to mark its output channels as watermark idle, because
watermarks are being sent. Hence, provided that the watermark generation
logic makes sense, the overall job should be able to make progress. If the
watermark generator is informed about the status of its input channels, it
could even decide whether to propagate the watermark idleness and stop
generating watermarks, or to keep generating them. Of course, this leaves
room for people to shoot themselves in the foot.
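
As a rough illustration of the last point, the following Python sketch models
a periodic generator that is told about its input status and then chooses
whether to pass the idleness on. Everything here is an assumption made for
the example: `StatusAwarePeriodicGenerator`, its `propagate_idleness` flag
and the event tuples are hypothetical and do not correspond to any existing
Flink interface.

```python
from enum import Enum
from typing import List, Optional, Tuple


class Status(Enum):
    ACTIVE = "active"
    IDLE = "idle"


class StatusAwarePeriodicGenerator:
    """Toy periodic watermark generator (hypothetical, not Flink API)."""

    def __init__(self, propagate_idleness: bool) -> None:
        self.propagate_idleness = propagate_idleness
        self.input_status = Status.ACTIVE
        self.output_status = Status.ACTIVE
        self.max_timestamp: Optional[int] = None

    def on_record(self, timestamp: int) -> None:
        # Track the highest event timestamp seen so far.
        if self.max_timestamp is None or timestamp > self.max_timestamp:
            self.max_timestamp = timestamp

    def on_input_status(self, status: Status) -> None:
        # The generator is informed about the status of its input.
        self.input_status = status

    def on_periodic_emit(self) -> List[Tuple[str, object]]:
        """Return the events sent downstream on one timer tick."""
        out: List[Tuple[str, object]] = []
        if self.input_status is Status.IDLE and self.propagate_idleness:
            # Pass the idleness on once, then stay silent.
            if self.output_status is Status.ACTIVE:
                self.output_status = Status.IDLE
                out.append(("status", Status.IDLE))
            return out
        if self.max_timestamp is None:
            return out  # nothing to report yet
        if self.output_status is Status.IDLE:
            # Emitting a watermark again implies the output is active again.
            self.output_status = Status.ACTIVE
            out.append(("status", Status.ACTIVE))
        out.append(("watermark", self.max_timestamp))
        return out
```

With `propagate_idleness=False` this toy generator keeps re-sending its last
watermark while the input is idle. That is one way to read the foot-gun: a
generator that keeps its output active has to advance the watermark by some
other means, or downstream progress stalls.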

Cheers,
Till


Re: [DISCUSS] Definition of idle partitions

Eron Wright-3
Regarding records vs watermarks, I feel it is wrong to include records in
the considerations, because the clearest definition of idleness (IMO) is
'active participation in advancing the event-time clock', and records don't
directly affect the clock.  Of course, records indirectly influence the
clock by stimulating a generator.

Let's focus on the problem that Arvid mentioned about the need to briefly
toggle idleness (as implemented by the AnnouncedStatus class).  Seems to me
that the idleness of an operator's inputs need not strictly determine
whether its output is idle.  The operator should be able to react to status
changes on a given input (implemented in FLINK-18934), and this MAY cause a
change to the output status at the operator's discretion.  The default
behavior would be passthrough.  Meanwhile, when a given operator emits a
watermark, it is re-asserting itself as a participant in advancing the
downstream event time clock, and its output channel should transition to
active and remain active.  An operator should also be able to mark its
output channel(s) as idle, to complete the framework.
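
A minimal sketch of these output-status rules, to make them concrete. All
names here (OutputStatusTracker, onInputStatus, markIdle, and so on) are
hypothetical and chosen for illustration; this is not Flink's actual API, and
the list of emitted elements merely stands in for the output channel.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of an operator's output status; not Flink's actual API. */
class OutputStatusTracker {
    enum Status { ACTIVE, IDLE }

    private Status output = Status.ACTIVE;
    // Stand-in for the output channel: everything sent downstream, in order.
    private final List<String> emitted = new ArrayList<>();

    /** Default behavior: an input status change is passed through unchanged. */
    void onInputStatus(Status inputStatus) {
        setOutput(inputStatus);
    }

    /** Records never touch the status, so they may flow while the output is idle. */
    void emitRecord(String record) {
        emitted.add("record:" + record);
    }

    /** Emitting a watermark re-asserts participation: the output becomes active first. */
    void emitWatermark(long timestamp) {
        setOutput(Status.ACTIVE);
        emitted.add("watermark:" + timestamp);
    }

    /** Lets the operator declare its own output idle. */
    void markIdle() {
        setOutput(Status.IDLE);
    }

    /** Sends a status element only when the status actually changes. */
    private void setOutput(Status newStatus) {
        if (newStatus != output) {
            output = newStatus;
            emitted.add("status:" + newStatus);
        }
    }

    Status status() {
        return output;
    }

    List<String> emitted() {
        return emitted;
    }
}
```

In this sketch a record emitted while idle produces no status element at all,
which avoids the per-record toggling overhead; only a watermark or an explicit
markIdle() call changes the output status.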

In concept, a watermark generator somewhere in the pipeline could 'take
control' of the event time clock when its input channel transitions to
idle.  The upstream source is relinquishing control of the clock in that
situation.

BTW, I recommend looking at the PR of FLINK-18934 because it lays bare the
whole pipeline.  Nice work there Dawid!  To better reflect the decoupling
of input from output idleness, "AbstractStreamOperator::emitStreamStatus"
should be renamed to "processStreamStatus" and call an overridable method to
emit the status change whenever the combined idleness flips.  This would
facilitate an idleness-aware watermark generator and an idleness-aware sink.
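
A rough sketch of what that shape could look like. The class names are
hypothetical stand-ins rather than the real AbstractStreamOperator, the hook
name onCombinedStatusChanged is invented for illustration, and the sketch
assumes the combined status is idle only when every input is idle.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an operator base class; not Flink's actual code. */
class StatusAwareOperator {
    private final boolean[] inputIdle;   // one flag per input; all start active
    private boolean combinedIdle = false;
    // Stand-in for the output channel: status elements sent downstream (true = idle).
    private final List<Boolean> emittedStatuses = new ArrayList<>();

    StatusAwareOperator(int numInputs) {
        if (numInputs < 1) {
            throw new IllegalArgumentException("need at least one input");
        }
        this.inputIdle = new boolean[numInputs];
    }

    /** Records the new status of one input and fires the hook only on a flip. */
    final void processStreamStatus(int inputIndex, boolean idle) {
        inputIdle[inputIndex] = idle;
        boolean allIdle = true;          // assumption: idle only if all inputs are idle
        for (boolean flag : inputIdle) {
            allIdle &= flag;
        }
        if (allIdle != combinedIdle) {
            combinedIdle = allIdle;
            onCombinedStatusChanged(allIdle);
        }
    }

    /** Overridable hook. Default behavior: pass the combined status through. */
    protected void onCombinedStatusChanged(boolean idle) {
        emitStreamStatus(idle);
    }

    /** Stand-in for sending a status element downstream. */
    protected final void emitStreamStatus(boolean idle) {
        emittedStatuses.add(idle);
    }

    List<Boolean> emittedStatuses() {
        return emittedStatuses;
    }
}

/** Example override: a generator that notes upstream idleness but keeps its output active. */
class ClockOwningGenerator extends StatusAwareOperator {
    boolean upstreamIdle = false;

    ClockOwningGenerator(int numInputs) {
        super(numInputs);
    }

    @Override
    protected void onCombinedStatusChanged(boolean idle) {
        upstreamIdle = idle;   // remember it, but do not forward idleness downstream
    }
}
```

With two inputs, the default operator emits nothing when only the first input
goes idle and emits a single idle status once the second follows; the
overriding generator emits no status at all and can keep producing watermarks.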



Re: [DISCUSS] Definition of idle partitions

Eron Wright-3
For illustration purposes, I quickly updated the draft PR that propagates
idleness information to the sink function, building on the recent
improvement provided by FLINK-18934:
https://github.com/streamnative/flink/pull/2
