http://deprecated-apache-flink-mailing-list-archive.368.s1.nabble.com/DISCUSS-Definition-of-idle-partitions-tp51236p51267.html
active partitions. In this case, the source defines idleness. (case A)
the partition becomes active again. (case B)
- Same holds for dynamic assignment of splits. If a source without a split
watermark advanced past the first record of the newly assigned split. (case
solved completely differently. I could imagine that a source with dynamic
itself. For example, it could emit a watermark per second/minute that is
directly fetched from the source system. I'm just not sure if the current
> Hi Eron,
>
> Can you elaborate a bit more on what you mean? I don't understand what you
> mean by a more general solution.
>
> As of now, a stream is marked idle by a source/watermark generator, which
> has the effect of temporarily excluding this stream/partition from the
> min watermark calculation in the downstream tasks. However, the stream
> switches back to active as soon as any record is emitted. This is what's
> causing the problems described by Arvid.
>
> The core of our proposal is very simple: keep everything as it is, except
> that a stream is switched back to active only once a watermark is emitted
> again, not a record. In other words, disconnect idleness from the presence
> of records, connect it only to the presence or absence of watermarks, and
> allow records to be emitted while the “stream status” is “idle”.
>
> Piotrek
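To make the proposed rule concrete, here is a minimal sketch in plain Java of an output that tracks the stream status under the proposal. Emitting a record leaves the status untouched, and only emitting a watermark switches an idle stream back to active. This is not Flink's API. The class and its methods (`ProposedStatusOutput`, `emitRecord`, `emitWatermark`, `markIdle`) are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the proposed status rule; not Flink's actual API. */
final class ProposedStatusOutput {
    enum Status { ACTIVE, IDLE }

    private Status status = Status.ACTIVE;
    // Everything sent downstream, in order, so the behavior can be inspected.
    private final List<String> emitted = new ArrayList<>();

    /** Records are forwarded as-is; under the proposal they do NOT change the status. */
    void emitRecord(String value) {
        emitted.add("record:" + value);
    }

    /** A watermark is the only thing that turns an idle stream active again. */
    void emitWatermark(long timestamp) {
        if (status == Status.IDLE) {
            status = Status.ACTIVE;
            emitted.add("status:ACTIVE");
        }
        emitted.add("watermark:" + timestamp);
    }

    /** Called by the source/watermark generator when it stops providing watermarks. */
    void markIdle() {
        if (status == Status.ACTIVE) {
            status = Status.IDLE;
            emitted.add("status:IDLE");
        }
    }

    Status status() { return status; }

    List<String> emitted() { return emitted; }

    public static void main(String[] args) {
        ProposedStatusOutput out = new ProposedStatusOutput();
        out.emitWatermark(10);
        out.markIdle();
        out.emitRecord("timer-output"); // e.g. a timer firing while the input is idle
        System.out.println(out.status()); // IDLE
        out.emitWatermark(20);
        System.out.println(out.emitted());
        // [watermark:10, status:IDLE, record:timer-output, status:ACTIVE, watermark:20]
    }
}
```

In this model, the timer output in the middle no longer forces the status-toggling dance that Arvid describes. The status changes exactly twice, both times in step with watermark production.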
>
>
> On 09.06.2021, at 06:01, Eron Wright <[hidden email]> wrote:
> >
> > It seems to me that idleness was introduced to deal with a very specific
> > issue. In the pipeline, watermarks are aggregated not on a per-split basis
> > but on a per-subtask basis. This works well when each subtask has exactly
> > one split. When a subtask has multiple splits, various complications
> > occur involving the commingling of watermarks. And when a subtask has no
> > splits, the pipeline stalls altogether. To deal with the latter problem,
> > idleness was introduced: the subtask simply declares itself idle so that
> > it is excluded from watermark aggregation.
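The per-subtask aggregation described above can be sketched as follows. A downstream task keeps the last watermark it has seen from each upstream subtask (channel) and emits the minimum over the channels that are not idle. A channel whose subtask has no split never sends a watermark, so the minimum is stuck until that channel is marked idle. This is a simplified, hypothetical model (`MinWatermarkAggregator` and its methods are assumed names), not Flink's actual `StatusWatermarkValve`.

```java
import java.util.Arrays;

/** Hypothetical, simplified per-channel watermark aggregation; not Flink's StatusWatermarkValve. */
final class MinWatermarkAggregator {
    private final long[] channelWatermarks;
    private final boolean[] channelIdle;

    MinWatermarkAggregator(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No watermark seen yet: treat it as "minus infinity", which holds back the minimum.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        channelIdle = new boolean[numChannels];
    }

    void onWatermark(int channel, long watermark) {
        // Watermarks are monotonic per channel.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    void setIdle(int channel, boolean idle) {
        channelIdle[channel] = idle;
    }

    /** Minimum over non-idle channels; Long.MIN_VALUE ("no watermark") if none is active. */
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Subtask 0 reads a split; subtask 1 was assigned no split at all.
        MinWatermarkAggregator agg = new MinWatermarkAggregator(2);
        agg.onWatermark(0, 1_000L);
        System.out.println(agg.combinedWatermark()); // Long.MIN_VALUE: stalled by subtask 1
        agg.setIdle(1, true); // subtask 1 declares itself idle
        System.out.println(agg.combinedWatermark()); // 1000
    }
}
```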
> >
> > If we're looking for a more general solution, I would suggest we discuss
> > how to track watermarks on a per-split basis. Or, as Till mentioned
> > recently, an alternate solution may be to dynamically adjust the
> > parallelism of the task.
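Per-split watermark tracking could look roughly like the hypothetical sketch below. A subtask keeps one watermark per assigned split and derives its own watermark from the minimum over those splits. Splits can then be added and removed without their watermarks commingling, and a subtask with no splits reports "no watermark" explicitly instead of a stale value. The names (`PerSplitWatermarks`, `addSplit`, `onSplitWatermark`) are assumptions, and Flink's actual split-level handling may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical per-split watermark tracking inside one subtask; not Flink code. */
final class PerSplitWatermarks {
    private final Map<String, Long> splitWatermarks = new HashMap<>();

    void addSplit(String splitId) {
        // A newly assigned split has not emitted a watermark yet.
        splitWatermarks.putIfAbsent(splitId, Long.MIN_VALUE);
    }

    void removeSplit(String splitId) {
        splitWatermarks.remove(splitId);
    }

    /** Advances a split's watermark; watermarks for unassigned splits are ignored. */
    void onSplitWatermark(String splitId, long watermark) {
        splitWatermarks.computeIfPresent(splitId, (id, current) -> Math.max(current, watermark));
    }

    /** Empty when the subtask holds no splits, i.e. it has no watermark to contribute. */
    OptionalLong subtaskWatermark() {
        return splitWatermarks.values().stream().mapToLong(Long::longValue).min();
    }

    public static void main(String[] args) {
        PerSplitWatermarks wm = new PerSplitWatermarks();
        System.out.println(wm.subtaskWatermark()); // OptionalLong.empty
        wm.addSplit("a");
        wm.addSplit("b");
        wm.onSplitWatermark("a", 300L);
        wm.onSplitWatermark("b", 100L);
        System.out.println(wm.subtaskWatermark()); // OptionalLong[100]
        wm.removeSplit("b");
        System.out.println(wm.subtaskWatermark()); // OptionalLong[300]
    }
}
```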
> >
> > I don't agree with the notion that idleness involves a correctness
> > tradeoff. The facility I described above has no impact on correctness.
> > Meanwhile, various watermark strategies rely on heuristics involving the
> > processing-time domain, and the term idleness seems to have found purchase
> > there too. The connection among the concepts seems tenuous.
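For contrast, the processing-time heuristic mentioned here can be sketched as a timeout: if no record has arrived for a configured processing-time interval, the generator considers the stream idle. Flink exposes a heuristic of this kind through `WatermarkStrategy#withIdleness(Duration)`. The class below is a simplified, hypothetical stand-in with an injectable clock so that it can be tested deterministically. It is not that implementation.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/** Hypothetical processing-time idleness detector; not Flink's implementation. */
final class IdlenessTimeout {
    private final long timeoutMillis;
    private final LongSupplier clock; // processing time in milliseconds
    private long lastActivityMillis;

    IdlenessTimeout(Duration timeout, LongSupplier clock) {
        this.timeoutMillis = timeout.toMillis();
        this.clock = clock;
        this.lastActivityMillis = clock.getAsLong();
    }

    /** Called for every record; restarts the timeout. */
    void onRecord() {
        lastActivityMillis = clock.getAsLong();
    }

    /** Polled periodically, e.g. whenever watermarks would be emitted. */
    boolean isIdle() {
        return clock.getAsLong() - lastActivityMillis >= timeoutMillis;
    }

    public static void main(String[] args) {
        long[] now = {0L}; // fake processing-time clock
        IdlenessTimeout t = new IdlenessTimeout(Duration.ofSeconds(5), () -> now[0]);
        now[0] = 4_000L;
        System.out.println(t.isIdle()); // false
        now[0] = 5_000L;
        System.out.println(t.isIdle()); // true
        t.onRecord();
        System.out.println(t.isIdle()); // false
    }
}
```

The decision depends on wall-clock gaps rather than event time. Replaying the same input at a different speed can therefore classify a stream differently, which is where the correctness concern raised elsewhere in this thread comes from.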
> >
> > -E
> >
> >
> >
> >> On Tue, Jun 8, 2021 at 8:58 AM Piotr Nowojski <[hidden email]> wrote:
> >>
> >> Hi Arvid,
> >>
> >> Thanks for writing down this summary and proposal. I think this was the
> >> root of the disagreement in the FLIP-167 discussion. Dawid was arguing
> >> that idleness is intermittent, strictly a task-local concept, and as such
> >> shouldn't be exposed in, for example, sinks, while Eron and I thought
> >> that it's a concept strictly connected to watermarks.
> >>
> >> 1. I'm a big +1 for changing the StreamStatus definition to a stream
> >> "providing watermarks" vs. "not providing watermarks". In that respect, I
> >> agree with Dawid that record-bound idleness *(if we ever need to
> >> define/expose it)* should be an intermittent concept, like, for example,
> >> the existing Task/runtime input availability
> >> (StreamTaskInput#isAvailable).
> >> 3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But
> >> I also don't have any good ideas.
> >> `WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?
> >>
> >> Best,
> >> Piotrek
> >>
> >> On Tue, 8 Jun 2021 at 16:35, Arvid Heise <[hidden email]> wrote:
> >>
> >>> Hi devs,
> >>>
> >>> While discussing "Watermark propagation with Sink API" and during
> >>> "[FLINK-18934] Idle stream does not advance watermark in connected
> >>> stream", we noticed some drawbacks in how Flink currently defines idle
> >>> partitions.
> >>>
> >>> To recap, idleness was always considered a means to achieve progress in
> >>> window operators with idle partitions in the source, at the risk of
> >>> losing a bit of correctness. In particular, records could be considered
> >>> late simply because of the idleness timeout and not because they arrived
> >>> out of order. A potential reprocessing would not cause these records to
> >>> be considered late, and we may end up with a different (correct) result.
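A small worked example of how a record can become late purely because of idleness. Two partitions feed a window. Partition 1 is excluded after an idleness timeout, so the combined watermark follows partition 0 alone. When partition 1 later delivers a record with an older timestamp, that record is behind the watermark. This hypothetical Java sketch only does the arithmetic. It does not use Flink classes, and it assumes zero allowed lateness.

```java
/** Hypothetical arithmetic for "late because of idleness"; not Flink code. */
final class IdlenessLateness {
    /** Combined watermark: minimum over non-idle partitions (assumes at least one is active). */
    static long combined(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, watermarks[i]);
            }
        }
        return min;
    }

    /** With zero allowed lateness, a record at or behind the watermark is late. */
    static boolean isLate(long recordTimestamp, long watermark) {
        return recordTimestamp <= watermark;
    }

    public static void main(String[] args) {
        long[] watermarks = {10_000L, 2_000L}; // partition 1 has stalled at t = 2s
        boolean[] idle = {false, false};
        System.out.println(combined(watermarks, idle)); // 2000: held back by partition 1
        idle[1] = true; // the idleness timeout fires for partition 1
        long wm = combined(watermarks, idle);
        System.out.println(wm); // 10000
        // Partition 1 wakes up with an event at t = 3s; it is late only because
        // partition 1 was excluded while idle.
        System.out.println(isLate(3_000L, wm)); // true
        // A replay without the idle gap keeps the watermark at 2000,
        // so the same record is not late.
        System.out.println(isLate(3_000L, 2_000L)); // false
    }
}
```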
> >>>
> >>> The drawbacks that we discovered are as follows:
> >>> - We currently only use idleness to exclude the respective upstream
> >>>   tasks from participating in watermark generation.
> >>> - However, the definition is bound to records [1]. In particular, while
> >>>   a partition is idle, no records should be produced.
> >>> - That leads to quite a few edge cases where operators emit records
> >>>   while they are actually idle: think of timers, async I/O operators,
> >>>   window operators based on timeouts, etc. that trigger on an operator
> >>>   ingesting an idle partition.
> >>> - The proper solution would be to switch the operator to active while
> >>>   emitting and back to idle afterwards (but when?). However, this has
> >>>   some unintended side effects depending on when you switch back:
> >>>   - If you toggle the stream status for each record, you get a huge
> >>>     overhead of stream status records and quite a bit of processing in
> >>>     downstream operators (that code path is not heavily optimized, since
> >>>     switching is considered a rare event).
> >>>   - If you toggle after a certain time, you may get delays longer than
> >>>     the idleness timeout in the downstream window operators.
> >>>   - You could switch back once you have processed all pending mails, but
> >>>     with a self-replicating mail that would never happen. A
> >>>     self-enqueueing timer with a short interval would also produce a
> >>>     flood similar to the first case.
> >>>
> >>> All in all, the situation is quite unsatisfying because idleness implies
> >>> no records. However, there is currently no need for that implication:
> >>> since we only use idleness for watermarks, we can easily allow records
> >>> to be emitted (in fact, that was the old behavior in many cases before
> >>> FLINK-18934) and still get the intended behavior with respect to
> >>> watermarks:
> >>> - A channel that is active provides watermarks.
> >>> - An idle channel does not provide any watermarks but can deliver
> >>>   records.
> >>>
> >>> Ultimately, that would mean that we are no longer really talking about
> >>> idle/active partitions. We are talking more about whether a particular
> >>> subtask should influence the downstream watermark calculation or not.
> >>> This leads to the following questions:
> >>> 1. Do we want to change the definition as outlined?
> >>> 2. Do you see any problem with emitting records on a subtask without
> >>>    explicit watermarks?
> >>> 3. If we want to go this way, we may need to refine the
> >>>    names/definitions. Any ideas?
> >>>
> >>> I think an idle partition should translate into something like
> >>> automatic/implicit/passive watermarks, and an active partition into
> >>> explicit/active watermarks. Then StreamStatus is more about a
> >>> WatermarkMode (not really happy with this one).
> >>>
> >>> Best,
> >>>
> >>> Arvid
> >>>
> >>> [1]
> >>> https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86