> I think the rationale for end-to-end idleness (i.e. between pipelines) is
> the same as the rationale for idleness between operators within a
> pipeline. On the 'main issue' you mentioned, we entrust the source with
> adapting to Flink's notion of idleness (e.g. in the Pulsar source, it means
> that no topics/partitions are assigned to a given sub-task); a similar
> adaptation would occur in the sink. In other words, I think it is reasonable
> that a sink for a watermark-aware storage system needs the idleness
> signal.
>
> Let me explain how I would use it in Pulsar's sink. Each sub-task is a
> Pulsar producer, and is writing watermarks to a configured topic via the
> Producer API. The Pulsar broker aggregates the watermarks that are written
> by each producer into a global minimum (similar to StatusWatermarkValve).
> The broker keeps track of which producers are actively producing
> watermarks, and a producer may mark itself as idle to tell the broker not
> to wait for watermarks from it, e.g. when a producer is going offline. I
> had intended to mark the producer as idle when the sub-task is closing, but
> now I see that it would be insufficient; the producer should also be idled
> if the sub-task is idled. Otherwise, the broker would wait indefinitely
> for the idled sub-task to produce a watermark.
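To make the stall concrete, here is a minimal in-memory sketch of the broker-side bookkeeping described above: a global minimum over producers, where a producer that has marked itself idle is left out of the minimum. It is illustrative only; `WatermarkAggregator` and its methods are hypothetical names, not Pulsar's broker API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical broker-side watermark bookkeeping for one topic (not Pulsar's real API). */
class WatermarkAggregator {
    private final Map<String, Long> latest = new HashMap<>();
    private final Map<String, Boolean> idle = new HashMap<>();

    /** A producer joins; until it writes a watermark it holds the minimum back. */
    void register(String producer) {
        latest.putIfAbsent(producer, Long.MIN_VALUE);
        idle.put(producer, false);
    }

    /** A producer writes a watermark; this also makes it active again. */
    void update(String producer, long watermark) {
        latest.merge(producer, watermark, Long::max); // ignore regressions
        idle.put(producer, false);
    }

    /** A producer asks the broker not to wait for it (sub-task idle or closing). */
    void markIdle(String producer) {
        idle.put(producer, true);
    }

    /** Global minimum over active producers; empty if no producer is active. */
    OptionalLong currentWatermark() {
        return latest.entrySet().stream()
                .filter(e -> !idle.getOrDefault(e.getKey(), false))
                .mapToLong(Map.Entry::getValue)
                .min();
    }
}
```

A registered producer that never writes a watermark pins the minimum at `Long.MIN_VALUE` until it is marked idle, which is why marking the producer idle only on close is not enough.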
>
> Arvid, I think your original instincts were correct about idleness
> propagation, and I hope I've demonstrated a practical use case.
>
>
>
> On Thu, Jun 3, 2021 at 12:49 PM Arvid Heise <[hidden email]> wrote:
>
>> When I was rethinking the idleness issue, I came to the conclusion that it
>> should be inferred at the source of the respective downstream pipeline
>> again.
>>
>> The main issue on propagating idleness is that you would force the same
>> definition across all downstream pipelines, which may not be what the user
>> intended.
>> On the other hand, I don't immediately see a technical reason why the
>> downstream source wouldn't be able to infer that.
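For comparison, inferring idleness again at the downstream source typically means a timeout heuristic, the same idea that Flink's `WatermarkStrategy#withIdleness(Duration)` applies inside a single pipeline. A minimal sketch for one split, with a hypothetical `IdlenessTimer` class and an injected clock so it can be tested:

```java
import java.time.Duration;

/** Hypothetical timeout-based idleness detector for a single source split. */
class IdlenessTimer {
    private final long timeoutMillis;
    private long lastActivityMillis;

    /** nowMillis comes from an injected clock, so the logic stays testable. */
    IdlenessTimer(Duration timeout, long nowMillis) {
        this.timeoutMillis = timeout.toMillis();
        this.lastActivityMillis = nowMillis;
    }

    /** Called whenever the split emits a record. */
    void onRecord(long nowMillis) {
        lastActivityMillis = nowMillis;
    }

    /** The split counts as idle once nothing has arrived for at least the timeout. */
    boolean isIdle(long nowMillis) {
        return nowMillis - lastActivityMillis >= timeoutMillis;
    }
}
```

The timeout is chosen per downstream pipeline, which is the flexibility argued for here; the price is that it only approximates the upstream's actual status, whereas an explicitly propagated idleness signal is exact.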
>>
>>
>> On Thu, Jun 3, 2021 at 9:14 PM Eron Wright <[hidden email].invalid>
>> wrote:
>>
>>> Thanks Piotr for bringing this up. I reflected on this and I agree we
>>> should expose idleness; otherwise a multi-stage flow could stall.
>>>
>>> Regarding the latency markers, I don't see an immediate need to
>>> propagate them, because they serve to estimate latency within a
>>> pipeline, not across pipelines. One would probably also need to enhance
>>> the source interface to do e2e latency. It seems we agree this aspect is
>>> out of scope.
>>>
>>> I took a look at the code to get a sense of how to accomplish this. The
>>> gist is a new `markIdle` method on the `StreamOperator` interface that is
>>> called when the stream status maintainer (the `OperatorChain`)
>>> transitions to the idle state. Then, the respective operators call a new
>>> `markIdle` method on `SinkFunction` and `SinkWriter`. Note that
>>> StreamStatus is an internal class.
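The call chain described above (the operator chain switches to idle, calls `markIdle` on the sink operator, which calls `markIdle` on the user's writer) could look roughly like the following. All types are hypothetical, simplified stand-ins; Flink's actual `StreamOperator`, `SinkWriter` and `SinkFunction` have richer signatures, and the exact method shape is whatever the draft PR defines.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simplified writer; the new callbacks default to no-ops. */
interface IdleAwareSinkWriter<T> {
    void write(T element);

    default void writeWatermark(long watermark) {}

    default void markIdle() {}
}

/** Hypothetical sink operator that forwards stream-status transitions to its writer. */
class IdleForwardingSinkOperator<T> {
    private final IdleAwareSinkWriter<T> writer;
    private boolean idle = false;

    IdleForwardingSinkOperator(IdleAwareSinkWriter<T> writer) {
        this.writer = writer;
    }

    void processElement(T element) {
        writer.write(element);
    }

    void processWatermark(long watermark) {
        writer.writeWatermark(watermark);
    }

    /** Called by the status maintainer when the operator chain becomes idle. */
    void markIdle() {
        if (!idle) { // forward the transition once, not every repeated signal
            idle = true;
            writer.markIdle();
        }
    }

    /** Called when the chain becomes active again; the writer learns this from the next watermark. */
    void markActive() {
        idle = false;
    }
}

/** Example writer that records the calls it receives. */
class RecordingWriter implements IdleAwareSinkWriter<String> {
    final List<String> calls = new ArrayList<>();

    @Override
    public void write(String element) { calls.add("write:" + element); }

    @Override
    public void writeWatermark(long watermark) { calls.add("watermark:" + watermark); }

    @Override
    public void markIdle() { calls.add("idle"); }
}
```

Forwarding only the transition keeps the writer from being flooded with repeated idle signals; a Pulsar-backed writer could then translate `markIdle` into marking its producer idle at the broker.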
>>>
>>> Here's a draft PR (based on the existing PR of FLINK-22700) to highlight
>>> this new aspect:
>>> https://github.com/streamnative/flink/pull/2/files
>>>
>>> Please let me know if you'd like me to proceed to update the FLIP with
>>> these details.
>>>
>>> Thanks again,
>>> Eron
>>>
>>> On Thu, Jun 3, 2021 at 7:56 AM Piotr Nowojski <[hidden email]>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Sorry for chipping in late in the discussion, but I would second this
>>>> point from Arvid:
>>>>
>>>>> 4. Potentially, StreamStatus and LatencyMarker would also need to be
>>>>> encoded.
>>>>
>>>> It seems like this point was raised but not followed up on? Or did I
>>>> miss it? Especially the StreamStatus part. To me, exposing watermarks
>>>> without letting the sink know that the stream can be idle sounds like
>>>> an incomplete feature that can be very problematic/confusing for
>>>> potential users.
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> On Mon, May 31, 2021 at 08:34 Arvid Heise <[hidden email]> wrote:
>>>>
>>>>> Afaik everyone can start a [VOTE] thread [1]. For example, here a
>>>>> non-committer started a successful thread [2].
>>>>> If you start it, I can already cast a binding vote and we just need 2
>>>>> more for the FLIP to be accepted.
>>>>>
>>>>> [1]
>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120731026#FlinkBylaws-Voting
>>>>> [2]
>>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Deprecating-Mesos-support-td50142.html
>>>>>
>>>>> On Fri, May 28, 2021 at 8:17 PM Eron Wright <[hidden email].invalid>
>>>>> wrote:
>>>>>
>>>>>> Arvid,
>>>>>> Thanks for the feedback. I investigated the japicmp configuration, and
>>>>>> I see that SinkWriter is marked Experimental (not Public or
>>>>>> PublicEvolving). I think this means that SinkWriter need not be
>>>>>> excluded. As you mentioned, SinkFunction is already excluded. I've
>>>>>> updated the FLIP with an explanation.
>>>>>>
>>>>>> I believe all issues are resolved. May we proceed to a vote now? And
>>>>>> are you able to drive the vote process?
>>>>>>
>>>>>> Thanks,
>>>>>> Eron
>>>>>>
>>>>>>
>>>>>> On Fri, May 28, 2021 at 4:40 AM Arvid Heise <[hidden email]>
>>>>>> wrote:
>>>>>>> Hi Eron,
>>>>>>>
>>>>>>> 1. Fair point. It still feels odd to have writeWatermark in the
>>>>>>> SinkFunction (it's supposed to be functional, as you mentioned), but
>>>>>>> I agree that invokeWatermark is not better. So unless someone has a
>>>>>>> better idea, I'm fine with it.
>>>>>>> 2.+3. I tried to come up with scenarios for quite a while. In
>>>>>>> general, it seems as if the new SinkWriter interface encourages more
>>>>>>> injection (see the processing time service in InitContext), such
>>>>>>> that the context really just carries information about that
>>>>>>> particular record, and I don't see any use beyond timestamp and
>>>>>>> watermark. For SinkFunction, I'd not over-engineer, as it's going to
>>>>>>> be deprecated soonish. So +1 to leave it out.
>>>>>>> 4. Okay, so I double-checked: from an execution perspective, it
>>>>>>> works. However, japicmp would definitely complain. I propose to add
>>>>>>> it to the compatibility section like this. We need to add an
>>>>>>> exception to SinkWriter then. (SinkFunction is already on the
>>>>>>> exception list.)
>>>>>>> 5.+6. Awesome, I was also sure but wanted to double-check.
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Arvid
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 26, 2021 at 7:29 PM Eron Wright <[hidden email].invalid>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Arvid,
>>>>>>>>
>>>>>>>> 1. I assume that the method name `invoke` stems from considering the
>>>>>>>> SinkFunction to be a functional interface, but is otherwise
>>>>>>>> meaningless. Keeping it as `writeWatermark` keeps it symmetric with
>>>>>>>> SinkWriter. My vote is to leave it. You decide.
>>>>>>>>
>>>>>>>> 2+3. I too considered adding a `WatermarkContext`, but it would merely
>>>>>>>> be a placeholder. I don't anticipate any context info in the future.
>>>>>>>> As we see with invoke, it is possible to add a context later in a
>>>>>>>> backwards-compatible way. My vote is to not introduce a context. You
>>>>>>>> decide.
>>>>>>>>
>>>>>>>> 4. No anticipated compatibility issues.
>>>>>>>>
>>>>>>>> 5. Short answer: it works as expected. The new methods are invoked
>>>>>>>> whenever the underlying operator receives a watermark. I do believe
>>>>>>>> that batch and ingestion-time applications receive watermarks. It
>>>>>>>> seems the programming model has been more unified in that respect
>>>>>>>> since 1.12 (FLIP-134).
>>>>>>>>
>>>>>>>> 6. The failure behavior is the same as for elements.
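The point in 2+3 that a context can be added later without breaking anyone follows the pattern Flink already uses for `SinkFunction#invoke(value)` and `invoke(value, context)`: the new overload gets a default that delegates to the old one. A sketch with hypothetical names (`WatermarkSink`, `WatermarkContext`, `LegacyWatermarkSink`):

```java
/** Hypothetical sink interface after a context-taking overload has been added. */
interface WatermarkSink {
    /** Hypothetical context type that might be introduced in a later release. */
    interface WatermarkContext {
        long currentProcessingTime();
    }

    /** Original method: existing implementations override only this one. */
    default void writeWatermark(long watermark) {}

    /** Added later: the framework calls this; by default it delegates to the old method. */
    default void writeWatermark(long watermark, WatermarkContext context) {
        writeWatermark(watermark);
    }
}

/** An implementation written before the context existed keeps compiling and keeps working. */
class LegacyWatermarkSink implements WatermarkSink {
    long lastWatermark = Long.MIN_VALUE;

    @Override
    public void writeWatermark(long watermark) {
        lastWatermark = watermark;
    }
}
```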
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Eron
>>>>>>>>
>>>>>>>> On Tue, May 25, 2021 at 12:42 PM Arvid Heise <[hidden email]>
>>>>>>>> wrote:
>>>>>>>>> Hi Eron,
>>>>>>>>>
>>>>>>>>> I think the FLIP is crisp and mostly good to go. Some smaller
>>>>>>>>> things/questions:
>>>>>>>>>
>>>>>>>>> 1. SinkFunction#writeWatermark could be named
>>>>>>>>>    SinkFunction#invokeWatermark or invokeOnWatermark to keep it
>>>>>>>>>    symmetric.
>>>>>>>>> 2. We could add the context parameter to both. For SinkWriter#Context,
>>>>>>>>>    we currently do not gain much. SinkFunction#Context also exposes
>>>>>>>>>    processing time, which may or may not be handy and is currently
>>>>>>>>>    mostly used for StreamingFileSink bucket policies. We may add that
>>>>>>>>>    processing time flag also to SinkWriter#Context in the future.
>>>>>>>>> 3. Alternatively, we could also add a different context parameter
>>>>>>>>>    just to keep the API stable while allowing additional information
>>>>>>>>>    to be passed in the future.
>>>>>>>>> 4. Would we run into any compatibility issue if we use a Flink 1.13
>>>>>>>>>    source in Flink 1.14 (with this FLIP) or vice versa?
>>>>>>>>> 5. What happens with sinks that use the new methods in applications
>>>>>>>>>    that do not have watermarks (batch mode, processing time)? Does
>>>>>>>>>    this also work sufficiently with ingestion time?
>>>>>>>>> 6. How do exactly-once sinks deal with written watermarks in case of
>>>>>>>>>    failure? I guess it's the same as normal records (either rollback
>>>>>>>>>    of the transaction or deduplication on resumption).
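The guess in question 6 can be pictured with a two-phase-commit style writer in which watermarks simply join the same pending transaction as records, so a failure rolls both back together. This is an in-memory sketch with hypothetical names (`TransactionalWatermarkWriter`, `commit`, `abort`), not Flink's `TwoPhaseCommitSinkFunction` API:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical exactly-once writer in which records and watermarks share one transaction. */
class TransactionalWatermarkWriter {
    /** Entries written since the last commit; not yet visible downstream. */
    private List<String> pending = new ArrayList<>();
    /** Entries visible to downstream readers. */
    final List<String> committed = new ArrayList<>();

    void write(String record) {
        pending.add("record:" + record);
    }

    void writeWatermark(long watermark) {
        // Same transaction as the records, so it is rolled back together with them.
        pending.add("watermark:" + watermark);
    }

    /** E.g. on checkpoint completion: publish the whole transaction at once. */
    void commit() {
        committed.addAll(pending);
        pending = new ArrayList<>();
    }

    /** On failure: drop everything written since the last commit. */
    void abort() {
        pending = new ArrayList<>();
    }
}
```

If the writes after the last commit are aborted and then replayed after recovery, each record and watermark ends up in the committed log exactly once.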
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Arvid
>>>>>>>>>
>>>>>>>>> On Tue, May 25, 2021 at 6:44 PM Eron Wright <[hidden email].invalid>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Does anyone have further comment on FLIP-167?
>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Eron
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, May 20, 2021 at 5:02 PM Eron Wright <[hidden email]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Filed FLIP-167: Watermarks for Sink API:
>>>>>>>>>>>
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
>>>>>>>>>>>
>>>>>>>>>>> I'd like to call a vote next week. Is that reasonable?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 19, 2021 at 6:28 PM Zhou, Brian <[hidden email]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>> Hi Arvid and Eron,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the discussion. I read through Eron's pull request, and
>>>>>>>>>>>> I think this can benefit the Pravega Flink connector as well.
>>>>>>>>>>>>
>>>>>>>>>>>> Here is some background. Pravega has had the watermark concept in
>>>>>>>>>>>> its event streams for about two years; here is a blog introduction
>>>>>>>>>>>> [1] to Pravega watermarks.
>>>>>>>>>>>> The Pravega Flink connector also gained watermark integration last
>>>>>>>>>>>> year, because we wanted to propagate the Flink watermark to Pravega
>>>>>>>>>>>> in the SinkFunction. At that time we just used the existing Flink
>>>>>>>>>>>> API: we kept the last watermark in memory and checked on each event
>>>>>>>>>>>> whether the watermark had changed [2], which is not efficient. With
>>>>>>>>>>>> the new interface, we can manage watermark propagation much more
>>>>>>>>>>>> easily.
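The difference described above can be seen side by side. `RecordContext` is a hypothetical stand-in for the per-record sink context (Flink's `SinkFunction.Context` does expose `currentWatermark()`); the two sink classes are illustrative, not the Pravega connector's code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the per-record sink context. */
interface RecordContext {
    long currentWatermark();
}

/** Workaround style: detect watermark changes by comparing on every record. */
class PollingWatermarkSink {
    final List<Long> forwarded = new ArrayList<>();
    private long lastWatermark = Long.MIN_VALUE;

    void invoke(String value, RecordContext context) {
        long watermark = context.currentWatermark(); // checked for every single event
        if (watermark > lastWatermark) {
            lastWatermark = watermark;
            forwarded.add(watermark); // stands in for writing the watermark to the storage system
        }
        // ... write value ...
    }
}

/** Callback style: the sink is told about each watermark directly. */
class CallbackWatermarkSink {
    final List<Long> forwarded = new ArrayList<>();

    void invoke(String value) {
        // ... write value; no watermark bookkeeping on the record path ...
    }

    void writeWatermark(long watermark) {
        forwarded.add(watermark); // stands in for writing the watermark to the storage system
    }
}
```

Besides the per-record comparison, the polling variant only notices a new watermark when the next record arrives, so a watermark that advances during a quiet period is not forwarded until data flows again.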
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://pravega.io/blog/2019/11/08/pravega-watermarking-support/
>>>>>>>>>>>> [2]
>>>>>>>>>>>> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L465
>>>>>>>>>>>>
>>>>>>>>>>>> -----Original Message-----
>>>>>>>>>>>> From: Arvid Heise <[hidden email]>
>>>>>>>>>>>> Sent: Wednesday, May 19, 2021 16:06
>>>>>>>>>>>> To: dev
>>>>>>>>>>>> Subject: Re: [DISCUSS] Watermark propagation with Sink API
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Eron,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for pushing that topic. I can now see that the benefit is
>>>>>>>>>>>> even bigger than I initially thought. So it's worthwhile anyway to
>>>>>>>>>>>> include it.
>>>>>>>>>>>> I also briefly thought about exposing watermarks to all UDFs, but
>>>>>>>>>>>> here I really have trouble seeing specific use cases. Could you
>>>>>>>>>>>> maybe take a few minutes to think about it as well? I could only see
>>>>>>>>>>>> someone misusing Async IO as a sink where a real sink would be more
>>>>>>>>>>>> appropriate. In general, if there is no clear use case, we shouldn't
>>>>>>>>>>>> add the functionality, as it's just increased maintenance for no
>>>>>>>>>>>> value.
>>>>>>>>>>>>
>>>>>>>>>>>> If we stick to the plan, I think your PR is already in good shape.
>>>>>>>>>>>> We need to create a FLIP for it though, since it changes Public
>>>>>>>>>>>> interfaces [1]. I was initially not convinced that we should also
>>>>>>>>>>>> change the old SinkFunction interface, but seeing how small the
>>>>>>>>>>>> change is, I wouldn't mind at all to increase consistency. Only once
>>>>>>>>>>>> we have written and approved the FLIP (which should be minimal and
>>>>>>>>>>>> fast) should we actually look at the PR ;).
>>>>>>>>>>>>
>>>>>>>>>>>> The only thing I would improve is the name of the function.
>>>>>>>>>>>> processWatermark sounds as if the sink implementer really needs to
>>>>>>>>>>>> implement it (as you would need to do on a custom operator). I would
>>>>>>>>>>>> make them symmetric to the record writing/invoking method (e.g.
>>>>>>>>>>>> writeWatermark and invokeWatermark).
>>>>>>>>>>>>
>>>>>>>>>>>> As a follow-up PR, we should then migrate KafkaShuffle to the new
>>>>>>>>>>>> API. But that's something I can do.
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, May 19, 2021 at 3:34 AM Eron Wright <[hidden email].invalid>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Update: opened an issue and a PR.
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-22700
>>>>>>>>>>>>> https://github.com/apache/flink/pull/15950
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, May 18, 2021 at 10:03 AM Eron Wright <[hidden email]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Arvid and David for sharing your ideas on this subject. I'm
>>>>>>>>>>>>>> glad to hear that you're seeing use cases for watermark propagation
>>>>>>>>>>>>>> via an enhanced sink interface.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As you've guessed, my interest is in Pulsar, and I am exploring some
>>>>>>>>>>>>>> options for brokering watermarks across stream processing pipelines.
>>>>>>>>>>>>>> I think Arvid is speaking to a high-fidelity solution where the
>>>>>>>>>>>>>> difference between intra- and inter-pipeline flow is eliminated. My
>>>>>>>>>>>>>> goal is more limited; I want to write the watermark that arrives at
>>>>>>>>>>>>>> the sink to Pulsar. Simply imagine that Pulsar has native support
>>>>>>>>>>>>>> for watermarking in its producer/consumer API, and we'll leave the
>>>>>>>>>>>>>> details to another forum.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> David, I like your invariant. I see lateness as stemming from the
>>>>>>>>>>>>>> problem domain and from system dynamics (e.g. scheduling, batching,
>>>>>>>>>>>>>> lag). When one depends on order-of-observation to generate
>>>>>>>>>>>>>> watermarks, the app may become unduly sensitive to dynamics which
>>>>>>>>>>>>>> bear on order-of-observation. My goal is to factor out the system
>>>>>>>>>>>>>> dynamics from lateness determination.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Arvid, to be most valuable (at least for my purposes), the
>>>>>>>>>>>>>> enhancement is needed on SinkFunction. This will allow us to easily
>>>>>>>>>>>>>> evolve the existing Pulsar connector.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Next step, I will open a PR to advance the conversation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Eron
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, May 18, 2021 at 5:06 AM David Morávek <[hidden email]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Eron,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for starting this discussion. I've been thinking about this
>>>>>>>>>>>>>>> recently, as we've run into "watermark related" issues when
>>>>>>>>>>>>>>> chaining multiple pipelines together. My two cents to the
>>>>>>>>>>>>>>> discussion:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The way I like to think about the problem is that there should be
>>>>>>>>>>>>>>> an invariant that holds for any stream processing pipeline: "a
>>>>>>>>>>>>>>> NON_LATE element entering the system should never become LATE".
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Unfortunately, this is exactly what happens in downstream
>>>>>>>>>>>>>>> pipelines, because the upstream one can:
>>>>>>>>>>>>>>> - break ordering (especially with higher parallelism)
>>>>>>>>>>>>>>> - emit elements that are ahead of the output watermark
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> There is not enough information to reconstruct the upstream
>>>>>>>>>>>>>>> watermark in later stages (it's always just an estimate based on
>>>>>>>>>>>>>>> the previous pipeline's output).
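The invariant can be checked with a toy replay. The numbers are made up for illustration: the upstream pipeline's output watermark is 95, its elements arrive downstream out of order as 100, 105, 98 (e.g. interleaved from two parallel sub-tasks), and the downstream stage either regenerates a watermark heuristically as "max timestamp seen minus a bound of 5" (a simplified bounded-out-of-orderness rule) or uses the watermark the upstream sink persisted. `LatenessExample` is a hypothetical helper; "late" follows Flink's convention that a watermark `t` means no more elements with timestamp `<= t`.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy replay showing how a heuristic downstream watermark can turn on-time elements late. */
class LatenessExample {
    /** An element is late if its timestamp is at or behind the current watermark. */
    static boolean isLate(long timestamp, long watermark) {
        return timestamp <= watermark;
    }

    /** Downstream regenerates its watermark as (max timestamp seen - bound); returns late elements. */
    static List<Long> lateWithHeuristic(long[] timestamps, long bound) {
        List<Long> late = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (long ts : timestamps) {
            if (isLate(ts, watermark)) late.add(ts);
            maxSeen = Math.max(maxSeen, ts);
            watermark = maxSeen - bound;
        }
        return late;
    }

    /** Downstream uses the watermark persisted by the upstream sink (held constant for brevity). */
    static List<Long> lateWithPropagated(long[] timestamps, long upstreamWatermark) {
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            if (isLate(ts, upstreamWatermark)) late.add(ts);
        }
        return late;
    }
}
```

With the heuristic, 98 arrives after the regenerated watermark has reached 100 and is classified as late, although it was on time relative to the upstream watermark of 95; with the propagated watermark nothing is late.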
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It would be great if we could have a general abstraction that is
>>>>>>>>>>>>>>> reusable for various sources / sinks (not just Kafka / Pulsar,
>>>>>>>>>>>>>>> though this would probably cover most of the use-cases) and
>>>>>>>>>>>>>>> systems.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Is there any other use-case than sharing watermarks between
>>>>>>>>>>>>>>> pipelines that you're trying to solve?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Arvid:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1. Watermarks are closely coupled to the used system (=Flink). I
>>>>>>>>>>>>>>>> have a hard time imagining that it's useful to use a different
>>>>>>>>>>>>>>>> stream processor downstream. So for now, I'm assuming that both
>>>>>>>>>>>>>>>> upstream and downstream are Flink applications. In that case, we
>>>>>>>>>>>>>>>> probably define both parts of the pipeline in the same Flink job
>>>>>>>>>>>>>>>> similar to KafkaStream's #through.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'd slightly disagree here. For example, we're "materializing"
>>>>>>>>>>>>>>> change-logs produced by a Flink pipeline into a serving layer
>>>>>>>>>>>>>>> (random access db / in-memory view / ..) and we need to know
>>>>>>>>>>>>>>> whether the responses we serve meet the "freshness" requirements
>>>>>>>>>>>>>>> (e.g. you may want to respond differently when the watermark is
>>>>>>>>>>>>>>> lagging too far behind processing time). Also, not every stream
>>>>>>>>>>>>>>> processor in the pipeline needs to be Flink. It can just as well
>>>>>>>>>>>>>>> be a simple element-wise transformation that reads from Kafka and
>>>>>>>>>>>>>>> writes back into a separate topic (that's what we do, for example,
>>>>>>>>>>>>>>> with ML models that have special hardware requirements).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> D.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, May 18, 2021 at 8:30 AM Arvid Heise <[hidden email]>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> Hi Eron,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think this is a useful addition for storage systems that act as
>>>>>>>>>>>>>>>> pass-through for Flink to reduce recovery time. It is only useful
>>>>>>>>>>>>>>>> if you combine it with regional fail-over, as only a small part of
>>>>>>>>>>>>>>>> the pipeline is restarted.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> A couple of thoughts on the implications:
>>>>>>>>>>>>>>>> 1. Watermarks are closely coupled to the used system (=Flink). I
>>>>>>>>>>>>>>>> have a hard time imagining that it's useful to use a different
>>>>>>>>>>>>>>>> stream processor downstream. So for now, I'm assuming that both
>>>>>>>>>>>>>>>> upstream and downstream are Flink applications. In that case, we
>>>>>>>>>>>>>>>> probably define both parts of the pipeline in the same Flink job
>>>>>>>>>>>>>>>> similar to KafkaStream's #through.
>>>>>>>>>>>>>>>> 2. The schema of the respective intermediate stream/topic would
>>>>>>>>>>>>>>>> need to be managed by Flink to encode both records and
>>>>>>>>>>>>>>>> watermarks. This reduces the usability quite a bit and needs to be
>>>>>>>>>>>>>>>> carefully crafted.
>>>>>>>>>>>>>>>> 3. It's not clear to me if constructs like SchemaRegistry can be
>>>>>>>>>>>>>>>> properly supported (and also if they should be supported) in terms
>>>>>>>>>>>>>>>> of schema evolution.
>>>>>>>>>>>>>>>> 4. Potentially, StreamStatus and LatencyMarker would also need to
>>>>>>>>>>>>>>>> be encoded.
>>>>>>>>>>>>>>>> 5. It's important to have some way to transport backpressure from
>>>>>>>>>>>>>>>> the downstream to the upstream. Otherwise you would have the same
>>>>>>>>>>>>>>>> issue as KafkaStreams, where two separate pipelines can drift so
>>>>>>>>>>>>>>>> far apart that you experience data loss if the data retention
>>>>>>>>>>>>>>>> period is smaller than the drift.
>>>>>>>>>>>>>>>> 6. It's clear that you trade a huge chunk of throughput for lower
>>>>>>>>>>>>>>>> overall latency in case of failure. So it's an interesting feature
>>>>>>>>>>>>>>>> for use cases with SLAs.
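Items 2 to 4 are easier to judge with a concrete wire format in mind. Below is one hypothetical encoding (a one-byte tag, then either the record payload or the emitting sub-task index plus the watermark). It is loosely in the spirit of what an approach such as KafkaShuffle has to do, but it is not KafkaShuffle's actual format. The sub-task index is included because a downstream reader has to take the minimum over all upstream writers of a partition.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical envelope for a topic that carries both records and watermarks. */
final class Envelope {
    static final byte RECORD = 0;
    static final byte WATERMARK = 1;

    final byte type;
    final String record;      // only meaningful for RECORD
    final int producerIndex;  // only meaningful for WATERMARK: emitting upstream sub-task
    final long watermark;     // only meaningful for WATERMARK

    private Envelope(byte type, String record, int producerIndex, long watermark) {
        this.type = type;
        this.record = record;
        this.producerIndex = producerIndex;
        this.watermark = watermark;
    }

    static Envelope ofRecord(String value) {
        return new Envelope(RECORD, value, -1, Long.MIN_VALUE);
    }

    static Envelope ofWatermark(int producerIndex, long watermark) {
        return new Envelope(WATERMARK, null, producerIndex, watermark);
    }

    /** Layout: [tag][UTF-8 payload] or [tag][int producerIndex][long watermark]. */
    byte[] serialize() {
        if (type == RECORD) {
            byte[] payload = record.getBytes(StandardCharsets.UTF_8);
            return ByteBuffer.allocate(1 + payload.length).put(RECORD).put(payload).array();
        }
        return ByteBuffer.allocate(1 + Integer.BYTES + Long.BYTES)
                .put(WATERMARK).putInt(producerIndex).putLong(watermark).array();
    }

    static Envelope deserialize(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        byte tag = buffer.get();
        if (tag == RECORD) {
            byte[] payload = new byte[buffer.remaining()];
            buffer.get(payload);
            return ofRecord(new String(payload, StandardCharsets.UTF_8));
        }
        if (tag == WATERMARK) {
            int producer = buffer.getInt();
            long watermark = buffer.getLong();
            return ofWatermark(producer, watermark);
        }
        throw new IllegalArgumentException("unknown envelope tag: " + tag);
    }
}
```

A consumer that is unaware of the tag byte, or a schema registry that expects the plain record schema, could not read such a topic, which is the usability cost raised in item 2; idleness (item 4) would need yet another tag.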
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Since we are phasing out SinkFunction, I'd prefer to only support
>>>>>>>>>>>>>>>> SinkWriter. Having a no-op default sounds good to me.
>>>>>>>>>>>>>>>> We have an experimental feature for Kafka [1] that pretty much
>>>>>>>>>>>>>>>> reflects your idea. There we have an ugly workaround to be able to
>>>>>>>>>>>>>>>> process the watermark by using a custom StreamSink task. We could
>>>>>>>>>>>>>>>> also try to create a FLIP that abstracts the actual system away,
>>>>>>>>>>>>>>>> and then we could use the approach for both Pulsar and Kafka.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java#L103
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, May 17, 2021 at 10:44 PM Eron Wright <[hidden email]>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I would like to propose an enhancement to the Sink API: the
>>>>>>>>>>>>>>>>> ability to receive upstream watermarks. I'm aware that the sink
>>>>>>>>>>>>>>>>> context provides the current watermark for a given record. I'd
>>>>>>>>>>>>>>>>> like to be able to write a sink function that is invoked whenever
>>>>>>>>>>>>>>>>> the watermark changes. Out of scope would be event-time timers
>>>>>>>>>>>>>>>>> (since sinks aren't keyed).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> For context, imagine that a stream storage system had the ability
>>>>>>>>>>>>>>>>> to persist watermarks in addition to ordinary elements, e.g. to
>>>>>>>>>>>>>>>>> serve as source watermarks in a downstream processor. Ideally one
>>>>>>>>>>>>>>>>> could compose a multi-stage, event-driven application, with
>>>>>>>>>>>>>>>>> watermarks flowing end-to-end without the need for a
>>>>>>>>>>>>>>>>> heuristics-based watermark at each stage.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The specific proposal would be a new method on `SinkFunction`
>>>>>>>>>>>>>>>>> and/or on `SinkWriter`, called 'processWatermark' or
>>>>>>>>>>>>>>>>> 'writeWatermark', with a default implementation that does nothing.
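The shape of the proposal (a new method with a no-op default, invoked when the watermark changes) can be sketched with simplified stand-ins. `SimpleSinkWriter`, `SimpleSinkOperator` and `LoggingWriter` are hypothetical; the actual signature is whatever the FLIP settles on, and in Flink the operator, not user code, decides when to call it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simplified writer; the proposed method has a no-op default. */
interface SimpleSinkWriter<T> {
    void write(T element);

    /** Proposed addition: called when the watermark advances; ignored unless overridden. */
    default void writeWatermark(long watermark) {}
}

/** Hypothetical sink operator: forwards every record, and each watermark only when it advances. */
class SimpleSinkOperator<T> {
    private final SimpleSinkWriter<T> writer;
    private long currentWatermark = Long.MIN_VALUE;

    SimpleSinkOperator(SimpleSinkWriter<T> writer) {
        this.writer = writer;
    }

    void processElement(T element) {
        writer.write(element);
    }

    void processWatermark(long watermark) {
        if (watermark > currentWatermark) {
            currentWatermark = watermark;
            writer.writeWatermark(watermark);
        }
    }
}

/** A watermark-aware writer that logs what it receives. */
class LoggingWriter implements SimpleSinkWriter<String> {
    final List<String> log = new ArrayList<>();

    @Override
    public void write(String element) {
        log.add("record:" + element);
    }

    @Override
    public void writeWatermark(long watermark) {
        log.add("watermark:" + watermark);
    }
}
```

Because `write` is the only abstract method, existing writers (even lambdas) keep compiling unchanged, while a writer that overrides `writeWatermark`, like `LoggingWriter`, receives each advancing watermark once.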
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thoughts?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>> Eron Wright
>>>>>>>>>>>>>>>>> StreamNative
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Eron Wright Cloud Engineering Lead
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> p: +1 425 922 8617 <18163542939>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> streamnative.io | Meet with me
>>>>>>>>>>>>>> <https://calendly.com/eronwright/regular-1-hour>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> <https://github.com/streamnative>
>>>>>>>>>>>>>> <https://www.linkedin.com/company/streamnative/>
>>>>>>>>>>>>>> <https://twitter.com/streamnativeio/>
>>>>>>>>>>>>>>
>