http://deprecated-apache-flink-mailing-list-archive.368.s1.nabble.com/DISCUSS-Definition-of-idle-partitions-tp51236p51302.html
timestamp / watermark generators within the pipeline. Those would continue
from upstream. In this scenario, downstream operators would not be able to
> Hi Eron,
>
> again to recap from the other thread:
> - You are right that idleness is correct with static assignment and fully
> active partitions. In this case, the source defines idleness. (case A)
> - For the more pressing use case of idle, assigned partitions, the user
> defines an idleness threshold, and it becomes potentially incorrect when
> the partition becomes active again. (case B)
> - The same holds for dynamic assignment of splits. If a source without a split
> gets a split assigned dynamically, there is a realistic chance that the
> watermark has already advanced past the first record of the newly assigned
> split. (case C)
> You can certainly insist that only the first case is valid (as it's
> correct) but we know that users use it in other ways and that was also the
> intent of the devs.
>
> Now the question could be if it makes sense to distinguish these cases.
> Would you treat the idleness information differently (especially in the
> sink/source that motivated FLIP-167) if you knew that the idleness is
> guaranteed correct?
> We could have some WatermarkStatus with ACTIVE, IDLE (case A), TIMEOUT
> (case B).
>
> However, that would still leave case C, which probably would need to be
> solved completely differently. I could imagine that a source with dynamic
> assignments should never have IDLE subtasks and rather manage the idleness
> itself. For example, it could emit a watermark per second/minute that is
> directly fetched from the source system. I'm just not sure if the current
> WatermarkAssigner interface suffices in that regard...
>
>
> On Wed, Jun 9, 2021 at 7:31 AM Piotr Nowojski <[hidden email]> wrote:
>
> > Hi Eron,
> >
> > Can you elaborate a bit more on what you mean? I don't understand what
> > you mean by a more general solution.
> >
> > As of now, a stream is marked idle by a source/watermark generator, which
> > has the effect of temporarily excluding this stream/partition from the
> > min watermark calculation in the downstream tasks. However, the stream
> > switches back to active as soon as any record is emitted. This is what is
> > causing the problems described by Arvid.
> >
> > The core of our proposal is very simple: keep everything as it is, except
> > that a stream changes back to active only once a watermark is emitted
> > again, not a record. In other words, we disconnect idleness from the
> > presence of records, connect it only to the presence or lack of
> > watermarks, and allow records to be emitted while the "stream status" is
> > "idle".
> > Piotrek
> >
> >
> > > On 09.06.2021, at 06:01, Eron Wright <[hidden email]> wrote:
> > >
> > > > It seems to me that idleness was introduced to deal with a very specific
> > > > issue. In the pipeline, watermarks are aggregated not on a per-split basis
> > > > but on a per-subtask basis. This works well when each subtask has exactly
> > > > one split. When a subtask has multiple splits, various complications
> > > > occur involving the commingling of watermarks. And when a subtask has no
> > > > splits, the pipeline stalls altogether. To deal with the latter problem,
> > > > idleness was introduced. The subtask simply declares itself to be idle to
> > > > be taken out of consideration for purposes of watermark aggregation.
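
The per-subtask aggregation described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not Flink's actual implementation: `MinWatermarkSketch` and `combinedWatermark` are hypothetical names, and each upstream subtask is reduced to a last-seen watermark plus an idle flag.

```java
import java.util.OptionalLong;

// Hypothetical helper for illustration; not part of Flink's API.
final class MinWatermarkSketch {

    /**
     * Returns the minimum watermark across all inputs that are not idle,
     * or an empty result if every input is idle (no watermark can be derived).
     */
    static OptionalLong combinedWatermark(long[] watermarks, boolean[] idle) {
        if (watermarks.length != idle.length) {
            throw new IllegalArgumentException("one idle flag per input is required");
        }
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs are taken out of consideration
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }
}
```

With three inputs where the second has no split and has declared itself idle, the result is driven only by the other two, so the pipeline no longer stalls on the input that never advances.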
> > >
> > > > If we're looking for a more general solution, I would suggest we discuss
> > > > how to track watermarks on a per-split basis. Or, as Till mentioned
> > > > recently, an alternate solution may be to dynamically adjust the
> > > > parallelism of the task.
> > > >
> > > > I don't agree with the notion that idleness involves a correctness
> > > > tradeoff. The facility I described above has no impact on correctness.
> > > > Meanwhile, various watermark strategies rely on heuristics involving the
> > > > processing-time domain, and the term idleness seems to have found purchase
> > > > there too. The connection among the concepts seems tenuous.
> > >
> > > -E
> > >
> > >
> > >
> > > >> On Tue, Jun 8, 2021 at 8:58 AM Piotr Nowojski <[hidden email]> wrote:
> > >>
> > >> Hi Arvid,
> > >>
> > > >> Thanks for writing down this summary and proposal. I think this was the
> > > >> foundation of the disagreement in the FLIP-167 discussion. Dawid was
> > > >> arguing that idleness is an intermittent, strictly task-local concept and
> > > >> as such shouldn't be exposed in, for example, sinks, while Eron and I
> > > >> thought that it's a concept strictly connected to watermarks.
> > >>
> > > >> 1. I'm a big +1 for changing the StreamStatus definition to a stream
> > > >> "providing watermark" and "not providing watermark". With respect to
> > > >> that, I agree with Dawid that record-bound idleness *(if we would ever
> > > >> need to define/expose it)* should be an intermittent concept, like, for
> > > >> example, the existing input availability in the Task/runtime
> > > >> (StreamTaskInput#isAvailable).
> > > >> 3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But
> > > >> I also don't have any good ideas.
> > > >> `WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?
> > >>
> > >> Best,
> > >> Piotrek
> > >>
> > > >> On Tue, Jun 8, 2021 at 16:35, Arvid Heise <[hidden email]> wrote:
> > >>
> > >>> Hi devs,
> > >>>
> > > >>> While discussing "Watermark propagation with Sink API" and during
> > > >>> "[FLINK-18934] Idle stream does not advance watermark in connected
> > > >>> stream", we noticed some drawbacks in how Flink currently defines idle
> > > >>> partitions.
> > > >>>
> > > >>> To recap, idleness was always considered a means to achieve progress in
> > > >>> window operators with idle partitions in the source, at the risk of
> > > >>> losing a bit of correctness. In particular, records could be considered
> > > >>> late simply because of that idleness timeout and not because they
> > > >>> arrived out of order. A potential reprocessing would not cause these
> > > >>> records to be considered late, and we may end up with a different
> > > >>> (correct) result.
> > >>>
> > > >>> The drawbacks that we discovered are as follows:
> > > >>> - We currently only use idleness to exclude the respective upstream tasks
> > > >>> from participating in watermark generation.
> > > >>> - However, the definition is bound to records. [1] In particular, while a
> > > >>> partition is idle, no records should be produced.
> > > >>> - That brings us to quite a few edge cases where operators emit records
> > > >>> while they are actually idling: think of timers, asyncIO operators,
> > > >>> window operators based on timeouts, etc. that trigger on an operator
> > > >>> ingesting an idle partition.
> > > >>> - The proper solution would be to turn the operator active while emitting
> > > >>> and to return to being idle afterwards (but when?). However, this has
> > > >>> some unintended side effects depending on when you switch back:
> > > >>>   - If you toggle stream status for each record, you get a huge overhead
> > > >>>   on stream status records and quite a bit of processing in downstream
> > > >>>   operators (that code path is not much optimized since switching is
> > > >>>   considered a rare thing).
> > > >>>   - If you toggle after a certain time, you may get delays greater than
> > > >>>   the idleness timeout in the downstream window operators.
> > > >>>   - You could turn back when you have processed all pending mails, but if
> > > >>>   you have a self-replicating mail, that would be never. A
> > > >>>   self-enqueueing, low timer would also produce a flood similar to the
> > > >>>   first case.
> > >>>
> > > >>> All in all, the situation is quite unsatisfying because idleness implies
> > > >>> no records. However, currently there is no need to have that
> > > >>> implication: since we only use it for watermarks, we can easily allow
> > > >>> records to be emitted (in fact that was the old behavior before
> > > >>> FLINK-18934 in many cases) and still get the intended behavior with
> > > >>> respect to watermarks:
> > > >>> - A channel that is active is providing watermarks.
> > > >>> - An idle channel is not providing any watermarks but can deliver
> > > >>> records.
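
The two channel states listed above could be modeled roughly like this. It is a sketch only: `ChannelStatusSketch` and its methods are hypothetical names rather than Flink classes, and it assumes that a channel flips back to active only when a watermark arrives, never when a record does.

```java
// Illustrative sketch only; ChannelStatusSketch is a hypothetical class, not Flink's API.
final class ChannelStatusSketch {

    enum Status { ACTIVE, IDLE }

    private Status status = Status.ACTIVE;
    private long lastWatermark = Long.MIN_VALUE;
    private long recordsForwarded = 0;

    /** The source or watermark generator declares that no watermarks will follow for now. */
    void markIdle() {
        status = Status.IDLE;
    }

    /** Records are delivered in either state and never change the status. */
    void onRecord() {
        recordsForwarded++;
    }

    /** Assumption: only a watermark switches the channel back to ACTIVE. */
    void onWatermark(long timestamp) {
        status = Status.ACTIVE;
        lastWatermark = Math.max(lastWatermark, timestamp);
    }

    /** Downstream watermark calculation would only consult ACTIVE channels. */
    boolean contributesToWatermark() {
        return status == Status.ACTIVE;
    }

    Status status() {
        return status;
    }

    long lastWatermark() {
        return lastWatermark;
    }

    long recordsForwarded() {
        return recordsForwarded;
    }
}
```

Under this model, a timer or asyncIO operator that emits a record while its input is idle does not have to toggle the status at all, which avoids the per-record status overhead mentioned in the drawbacks.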
> > >>>
> > > >>> Ultimately, that would mean that we are actually not talking about
> > > >>> idle/active partitions anymore. We are talking more about whether a
> > > >>> particular subtask should influence downstream watermark calculation or
> > > >>> not. This leads to the following questions:
> > > >>> 1. Do we want to change the definition as outlined?
> > > >>> 2. Do you see any problem with emitting records on a subtask without
> > > >>> explicit watermarks?
> > > >>> 3. If we want to go this way, we may need to refine the
> > > >>> names/definitions. Any ideas?
> > >>>
> > > >>> I think idle partition should translate into something like
> > > >>> automatic/implicit/passive watermarks; active partition into
> > > >>> explicit/active watermarks. Then StreamStatus is more about
> > > >>> WatermarkMode (not really happy with this one).
> > >>>
> > >>> Best,
> > >>>
> > >>> Arvid
> > >>>
> > >>> [1]
> > >>>
> > >>>
> > >>
> >
>
https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86> > >>>
> > >>
> >
>