http://deprecated-apache-flink-mailing-list-archive.368.s1.nabble.com/DISCUSS-Watermark-propagation-with-Sink-API-tp50784p50800.html
Thanks Arvid and David for sharing your ideas on this subject. I'm glad to
for brokering watermarks across stream processing pipelines. I think Arvid
and inter-pipeline flow is eliminated. My goal is more limited; I want to
write the watermark that arrives at the sink to Pulsar. Simply imagine
David, I like your invariant. I see lateness as stemming from the problem
domain and from system dynamics (e.g. scheduling, batching, lag). When one
unduly sensitive to dynamics which bear on order-of-observation. My goal
needed on SinkFunction. This will allow us to easily evolve the existing
> Hi Eron,
>
> Thanks for starting this discussion. I've been thinking about this recently
> as we've run into "watermark related" issues when chaining multiple
> pipelines together. My two cents on the discussion:
>
> How I like to think about the problem is that there should be an invariant
> that holds for any stream processing pipeline: "a NON_LATE element entering
> the system should never become LATE".
>
> Unfortunately this is exactly what happens in downstream pipelines, because
> the upstream one can:
> - break ordering (especially with higher parallelism)
> - emit elements that are ahead of output watermark
>
> There is not enough information to reconstruct the upstream watermark in
> later stages (it's always just an estimate based on the previous pipeline's
> output).
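David's invariant can be made concrete with a small sketch (plain Java, not Flink API; the timestamps and the out-of-orderness bound are made-up illustration values): an element that was NON_LATE against the upstream watermark gets flagged LATE by a downstream bounded-out-of-orderness heuristic once the upstream output breaks ordering.

```java
// Sketch (not Flink API): why a downstream heuristic watermark can turn a
// NON_LATE element LATE. Upstream held its watermark at 95, so an element
// with timestamp 100 was on time there. The downstream job only sees the
// (reordered) element stream and must re-derive a watermark heuristically.
public class LatenessDemo {
    // Bounded out-of-orderness heuristic: watermark = maxSeenTimestamp - bound.
    static long heuristicWatermark(long[] seenTimestamps, long bound) {
        long max = Long.MIN_VALUE;
        for (long t : seenTimestamps) max = Math.max(max, t);
        return max - bound;
    }

    public static void main(String[] args) {
        long upstreamWatermark = 95;   // actual watermark in the upstream job
        long elementTimestamp = 100;   // NON_LATE upstream, since 100 > 95

        // Downstream observes output reordered by parallel writes: a
        // timestamp-110 element arrives before the timestamp-100 one.
        long[] observedSoFar = {110};
        long downstreamWatermark = heuristicWatermark(observedSoFar, 5); // 105

        // 100 <= 105: the element is now late, violating the invariant.
        System.out.println("late downstream: "
                + (elementTimestamp <= downstreamWatermark));
    }
}
```

Persisting the real upstream watermark alongside the elements, as discussed in this thread, removes the need for this downstream guess.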
>
> It would be great, if we could have a general abstraction, that is reusable
> for various sources / sinks (not just Kafka / Pulsar, though this would
> probably cover most of the use-cases) and systems.
>
> Is there any other use-case than sharing watermarks between pipelines that
> you're trying to solve?
>
> Arvid:
>
> 1. Watermarks are closely coupled to the used system (=Flink). I have a
> > hard time imagining that it's useful to use a different stream processor
> > downstream. So for now, I'm assuming that both upstream and downstream are
> > Flink applications. In that case, we probably define both parts of the
> > pipeline in the same Flink job similar to KafkaStream's #through.
> >
>
> I'd slightly disagree here. For example, we're "materializing" change-logs
> produced by a Flink pipeline into a serving layer (random access db / in-memory
> view / ...) and we need to know whether the responses we serve meet the
> "freshness" requirements (e.g. you may want to respond differently when the
> watermark is lagging too far behind processing time). Also, not every
> stream processor in the pipeline needs to be Flink. It can as well be a
> simple element-wise transformation that reads from Kafka and writes back
> into a separate topic (that's what we do, for example, with ML models that
> have special hardware requirements).
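The freshness check described above can be sketched minimally as follows. All names here are hypothetical, not a real Flink or serving-layer API: the sink records the latest watermark it persisted, and the read path degrades its response when that watermark lags too far behind processing time.

```java
// Hypothetical sketch of a serving-layer "freshness" guard. The pipeline's
// sink calls onWatermark() whenever it persists a watermark; the serving
// read path calls isFresh() before answering from the materialized view.
public class FreshnessGuard {
    private final long maxLagMillis;
    private volatile long lastWatermarkMillis;

    public FreshnessGuard(long maxLagMillis) {
        this.maxLagMillis = maxLagMillis;
    }

    // Write path: invoked when the sink persists a new watermark.
    public void onWatermark(long watermarkMillis) {
        lastWatermarkMillis = watermarkMillis;
    }

    // Read path: the view is fresh if the persisted watermark is within
    // maxLagMillis of the current processing time.
    public boolean isFresh(long nowMillis) {
        return nowMillis - lastWatermarkMillis <= maxLagMillis;
    }
}
```

Without a watermark persisted by the upstream sink, the serving layer has no principled way to compute this lag, which is the motivation David raises.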
>
> Best,
> D.
>
>
> On Tue, May 18, 2021 at 8:30 AM Arvid Heise <[hidden email]> wrote:
>
> > Hi Eron,
> >
> > I think this is a useful addition for storage systems that act as a
> > pass-through for Flink to reduce recovery time. It is only useful if you
> > combine it with regional fail-over, as only a small part of the pipeline is
> > restarted.
> >
> > A couple of thoughts on the implications:
> > 1. Watermarks are closely coupled to the used system (=Flink). I have a
> > hard time imagining that it's useful to use a different stream processor
> > downstream. So for now, I'm assuming that both upstream and downstream are
> > Flink applications. In that case, we probably define both parts of the
> > pipeline in the same Flink job similar to KafkaStream's #through.
> > 2. The schema of the respective intermediate stream/topic would need to be
> > managed by Flink to encode both records and watermarks. This reduces the
> > usability quite a bit and needs to be carefully crafted.
> > 3. It's not clear to me if constructs like SchemaRegistry can be properly
> > supported (and also if they should be supported) in terms of schema
> > evolution.
> > 4. Potentially, StreamStatus and LatencyMarker would also need to be
> > encoded.
> > 5. It's important to have some way to transport backpressure from the
> > downstream to the upstream. Or else you would have the same issue as
> > KafkaStreams where two separate pipelines can drift so far away that you
> > experience data loss if the data retention period is smaller than the
> > drift.
> > 6. It's clear that you trade a huge chunk of throughput for lower overall
> > latency in case of failure. So it's an interesting feature for use cases
> > with SLAs.
> >
> > Since we are phasing out SinkFunction, I'd prefer to only support
> > SinkWriter. Having a no-op default sounds good to me.
> >
> > We have some experimental feature for Kafka [1], which pretty much reflects
> > your idea. Here we have an ugly workaround to be able to process the
> > watermark by using a custom StreamSink task. We could also try to create a
> > FLIP that abstracts the actual system away and then we could use the
> > approach for both Pulsar and Kafka.
> >
> > [1]
> > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java#L103
> >
> > On Mon, May 17, 2021 at 10:44 PM Eron Wright <[hidden email]> wrote:
> >
> > > I would like to propose an enhancement to the Sink API, the ability to
> > > receive upstream watermarks. I'm aware that the sink context provides the
> > > current watermark for a given record. I'd like to be able to write a sink
> > > function that is invoked whenever the watermark changes. Out of scope
> > > would be event-time timers (since sinks aren't keyed).
> > >
> > > For context, imagine that a stream storage system had the ability to
> > > persist watermarks in addition to ordinary elements, e.g. to serve as
> > > source watermarks in a downstream processor. Ideally one could compose a
> > > multi-stage, event-driven application, with watermarks flowing end-to-end
> > > without the need for a heuristics-based watermark at each stage.
> > >
> > > The specific proposal would be a new method on `SinkFunction` and/or on
> > > `SinkWriter`, called `processWatermark` or `writeWatermark`, with a default
> > > implementation that does nothing.
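The proposal above could be sketched roughly as follows. This is a simplified stand-in for Flink's actual `SinkWriter` (which carries more type parameters and context arguments); the `long` watermark parameter and the demo `CollectingSink` are illustrative assumptions, not the real API.

```java
// Simplified sketch of the proposed SinkWriter change: a writeWatermark
// method with a no-op default, so existing implementations compile unchanged.
interface SinkWriter<InputT> {
    void write(InputT element) throws Exception;

    // Proposed: invoked whenever the watermark arriving at the sink advances.
    // The no-op default keeps the change backward compatible.
    default void writeWatermark(long watermarkMillis) throws Exception {
        // existing sinks ignore watermarks by default
    }
}

// Hypothetical sink that persists watermarks alongside elements, the way a
// stream storage system like Pulsar could to feed downstream source watermarks.
class CollectingSink implements SinkWriter<String> {
    final java.util.List<String> out = new java.util.ArrayList<>();

    public void write(String element) {
        out.add("elem:" + element);
    }

    public void writeWatermark(long watermarkMillis) {
        out.add("wm:" + watermarkMillis);
    }
}
```

Because the default is a no-op, only sinks that care about watermarks (like the hypothetical `CollectingSink`) need to override it.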
> > >
> > > Thoughts?
> > >
> > > Thanks!
> > > Eron Wright
> > > StreamNative
> > >
> >
>