http://deprecated-apache-flink-mailing-list-archive.368.s1.nabble.com/DISCUSS-FLIP-150-Introduce-Hybrid-Source-tp46271p51383.html
should not be immutable. I think it would be better to have a setter
checkpoint serializers. If we were to instantiate the source at switch
> change Source(Enumerator)#start to take the start time as a parameter. Can
This actually hasn't been discussed. The original proposal left the
use a timestamp. But there may be use cases where the start position
task. It can be implemented as the default of option 3.
>
>
> On Fri, Jun 11, 2021 at 7:02 PM Thomas Weise <[hidden email]> wrote:
>
> > Thanks for the suggestions and feedback on the PR.
> >
> > A variation of hybrid source that can switch back and forth was
> > brought up before and it is something that will be eventually
> > required. It was also suggested by Stephan that in the future there
> > may be more than one implementation of hybrid source for different
> > requirements.
> >
> > I want to bring back the topic of how enumerator end state can be
> > converted into start position from the PR [1]. We started in the FLIP
> > page with "switchable" interfaces, the prototype had checkpoint
> > conversion and now the PR has a function that allows augmenting the
> > source. Each of these has pros and cons but we will need to converge.
> >
> > 1. Switchable interfaces
> > * unified solution
> > * requires sources to implement a special interface to participate in
> > HybridSource, even when no dynamic conversion is needed
> >
> > 2. Checkpoint state
> > * unified solution
> > * no interface changes
> > * requires implementation change to existing enumerators to include
> > end state (like a timestamp) into their checkpoint state
> > * existing sources work as is for fixed start position
> >
> > 3. Source modification at switch time to set start position
> > * can be solved per source, least restrictive
> > * no interface changes
> > * requires enumerator to expose end state (as a getter) and source to
> > be either mutable or source to be copied and augmented with the start
> > position.
> > * existing sources work as is for fixed start position
> >
> > I think more eyes might help to finalize the approach.
> >
> > [1] https://github.com/apache/flink/pull/15924#discussion_r649929865
> >
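Option 3 above mentions a source that is "copied and augmented with the start position". A minimal sketch of that copy-and-augment idea follows. The class `TimestampedSourceSketch` and its fields are hypothetical stand-ins, not Flink classes, and the sketch only assumes that the start position is a timestamp in epoch milliseconds.

```java
// Hypothetical stand-in for a source with a configurable start position.
// This is not a Flink class; names and fields are assumptions for illustration.
final class TimestampedSourceSketch {
    private final String name;
    private final long startTimestampMillis;

    TimestampedSourceSketch(String name, long startTimestampMillis) {
        this.name = name;
        this.startTimestampMillis = startTimestampMillis;
    }

    String getName() {
        return name;
    }

    long getStartTimestampMillis() {
        return startTimestampMillis;
    }

    // Copy-and-augment: returns a new instance with the start position set,
    // leaving the originally configured source untouched.
    TimestampedSourceSketch withStartTimestampMillis(long newStartMillis) {
        return new TimestampedSourceSketch(name, newStartMillis);
    }
}
```

With this shape the source can stay immutable; the alternative named in option 3 (a mutable source with a setter) would trade that property for not having to create a copy at switch time.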
> > On Mon, Jun 7, 2021 at 11:18 PM Steven Wu <[hidden email]> wrote:
> > >
> > > > hybrid sounds to me more like the source would constantly switch back and forth
> > >
> > > Initially, the focus of hybrid source is more like a sequenced chain.
> > >
> > > But in the future it would be cool if hybrid sources could intelligently
> > > switch back and forth between a historical data source (like Iceberg) and
> > > a live data source (like Kafka). E.g.,
> > > - if the Flink job is lagging behind Kafka retention, automatically
> > > switch to the Iceberg source
> > > - once the job has caught up, switch back to the Kafka source
> > >
> > > That can simplify operational aspects of manually switching.
> > >
> > >
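The back-and-forth switching Steven describes above could be driven by a small decision rule. The following is only a sketch of such a rule. The enum, the class, and the `caughtUpMillis` threshold are assumptions made here for illustration and are not part of FLIP-150 or the PR.

```java
// Hypothetical names; nothing here exists in Flink.
enum ActiveSourceSketch { HISTORICAL, LIVE }

final class SwitchPolicySketch {
    // Decides which source should be active next.
    // lagMillis:       how far the job is behind the head of the live source
    // retentionMillis: how long the live source (e.g. Kafka) retains data
    // caughtUpMillis:  assumed threshold below which the job counts as caught up
    static ActiveSourceSketch next(
            ActiveSourceSketch current, long lagMillis, long retentionMillis, long caughtUpMillis) {
        if (current == ActiveSourceSketch.LIVE && lagMillis > retentionMillis) {
            // Lagging behind retention: the live source no longer has the data.
            return ActiveSourceSketch.HISTORICAL;
        }
        if (current == ActiveSourceSketch.HISTORICAL && lagMillis <= caughtUpMillis) {
            // Caught up: switch back to the live source.
            return ActiveSourceSketch.LIVE;
        }
        return current;
    }
}
```

Using two separate thresholds is a design choice of this sketch: it keeps the job from flapping between sources when the lag hovers around a single cutoff.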
> > > On Mon, Jun 7, 2021 at 8:07 AM Arvid Heise <[hidden email]> wrote:
> > >>
> > >> Sorry for joining the party so late, but it's such an interesting FLIP with
> > >> a huge impact that I wanted to add my 2 cents. [1]
> > >> I'm mirroring a basic question from the PR review to this thread because
> > >> it's about the name:
> > >>
> > >> We could rename the thing to ConcatenatedSource(s), SourceSequence, or
> > >> similar.
> > >> Hybrid has the connotation of 2 for me (maybe because I'm a non-native
> > >> speaker) and does not carry the concatenation concept as well (hybrid sounds
> > >> to me more like the source would constantly switch back and forth).
> > >>
> > >> Could we take a few minutes to think about whether this is the most intuitive
> > >> name for new users? I'm especially hoping that native speakers might give
> > >> some ideas (or declare that Hybrid is perfect).
> > >>
> > >> [1] https://github.com/apache/flink/pull/15924#pullrequestreview-677376664
> > >>
> > >> On Sun, Jun 6, 2021 at 7:47 PM Steven Wu <[hidden email]> wrote:
> > >>
> > >> > > Converter function relies on the specific enumerator capabilities to set
> > >> > > the new start position (e.g.
> > >> > > fileSourceEnumerator.getEndTimestamp() and
> > >> > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))
> > >> >
> > >> > I guess the premise is that a converter is for a specific tuple of
> > >> > (upstream source, downstream source). We don't have to define generic
> > >> > EndStateT and SwitchableEnumerator interfaces. That should work.
> > >> >
> > >> > The benefit of defining EndStateT and SwitchableEnumerator interfaces is
> > >> > probably promoting uniformity across sources that support hybrid/switchable
> > >> > source.
> > >> >
> > >> > On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise <[hidden email]> wrote:
> > >> >
> > >> > > Hi Steven,
> > >> > >
> > >> > > Thank you for the thorough review of the PR and for bringing this back
> > >> > > to the mailing list.
> > >> > >
> > >> > > All,
> > >> > >
> > >> > > I updated the FLIP-150 page to highlight aspects in which the PR
> > >> > > deviates from the original proposal [1]. The goal would be to update
> > >> > > the FLIP soon and bring it to a vote, as previously suggested offline
> > >> > > by Nicholas.
> > >> > >
> > >> > > A few minor issues in the PR are outstanding and I'm working on test
> > >> > > coverage for the recovery behavior, which should be completed soon.
> > >> > >
> > >> > > The dynamic position transfer needs to be concluded before we can move
> > >> > > forward, however.
> > >> > >
> > >> > > There have been various ideas, including the special
> > >> > > "SwitchableEnumerator" interface, using enumerator checkpoint state, or
> > >> > > an enumerator interface extension to extract the end state.
> > >> > >
> > >> > > One goal in the FLIP is to "Reuse the existing Source connectors built
> > >> > > with FLIP-27 without any change." and I think it is important to honor
> > >> > > that goal given that fixed start positions do not require interface
> > >> > > changes.
> > >> > >
> > >> > > Based on the feedback the following might be a good solution for
> > >> > > runtime position transfer:
> > >> > >
> > >> > > * User supplies the optional converter function (not applicable for
> > >> > > fixed positions).
> > >> > > * Instead of relying on the enumerator checkpoint state [2], the
> > >> > > converter function will be supplied with the current and next
> > >> > > enumerator (source.createEnumerator).
> > >> > > * Converter function relies on the specific enumerator capabilities to
> > >> > > set the new start position (e.g.
> > >> > > fileSourceEnumerator.getEndTimestamp() and
> > >> > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))
> > >> > > * HybridSourceSplitEnumerator starts new underlying enumerator
> > >> > >
> > >> > > With this approach, there is no need to augment FLIP-27 interfaces, and
> > >> > > custom source capabilities are easier to integrate. Removing the
> > >> > > mandate to rely on enumerator checkpoint state also avoids potential
> > >> > > upgrade/compatibility issues.
> > >> > >
> > >> > > Thoughts?
> > >> > >
> > >> > > Thanks,
> > >> > > Thomas
> > >> > >
> > >> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
> > >> > > [2] https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
> > >> > >
> > >> > >
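The runtime position transfer Thomas proposes above (a user-supplied converter function that receives the current and the next enumerator) can be sketched as follows. This is only an illustration under stated assumptions: both enumerator classes are hypothetical stand-ins rather than Flink classes, `setStartTimestamp` is a simplification of the `setTimestampOffsetsInitializer(..)` call named in the thread, and `switchTo` merely mimics the step in which the hybrid enumerator would apply the converter before starting the next enumerator.

```java
import java.util.function.BiConsumer;

// Hypothetical stand-in for an upstream enumerator that exposes its end state.
final class FileEnumeratorSketch {
    private final long endTimestamp;

    FileEnumeratorSketch(long endTimestamp) {
        this.endTimestamp = endTimestamp;
    }

    long getEndTimestamp() {
        return endTimestamp;
    }
}

// Hypothetical stand-in for a downstream enumerator with a settable start position.
final class KafkaEnumeratorSketch {
    private long startTimestamp = -1L; // -1 means "not set" in this sketch

    void setStartTimestamp(long startTimestamp) {
        this.startTimestamp = startTimestamp;
    }

    long getStartTimestamp() {
        return startTimestamp;
    }
}

final class SwitchSketch {
    // Applies the optional converter to the (current, next) pair and returns
    // the next enumerator, which the caller would then start.
    static <A, B> B switchTo(A current, B next, BiConsumer<A, B> converter) {
        if (converter != null) {
            converter.accept(current, next); // not needed for fixed start positions
        }
        return next;
    }
}
```

A converter for this specific (file, Kafka) pair is then a one-line lambda such as `(file, kafka) -> kafka.setStartTimestamp(file.getEndTimestamp())`, which matches Steven's remark that a converter is written for a specific tuple of upstream and downstream source.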
> > >> > > On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <[hidden email]> wrote:
> > >> > > >
> > >> > > > Discussed the PR with Thomas offline. Thomas, please correct me if I
> > >> > > > missed anything.
> > >> > > >
> > >> > > > Right now, the PR differs from the FLIP-150 doc regarding the converter.
> > >> > > > * Current PR uses the enumerator checkpoint state type as the input for
> > >> > > > the converter
> > >> > > > * FLIP-150 defines a new EndStateT interface.
> > >> > > > It seems that the FLIP-150 approach of EndStateT is more flexible, as
> > >> > > > the transition EndStateT doesn't have to be included in the upstream
> > >> > > > source checkpoint state.
> > >> > > >
> > >> > > > Let's look at two use cases:
> > >> > > > 1) Static cutover time at 5 pm. File source reads all data between 9 am
> > >> > > > and 5 pm, then Kafka source starts with an initial position of 5 pm. In
> > >> > > > this case, there is no need for converter or EndStateT since the
> > >> > > > starting time for Kafka source is known and fixed.
> > >> > > > 2) Dynamic cutover time at 1 hour before now. This is useful when the
> > >> > > > bootstrap of historic data takes a long time (like days or weeks) and
> > >> > > > we don't know the exact time of cutover when a job is launched.
> > >> > > > Instead, we are instructing the file source to stop when it gets close
> > >> > > > to live data. In this case, hybrid source construction will specify a
> > >> > > > relative time (now - 1 hour), and the EndStateT (of file source) will
> > >> > > > be resolved to an absolute time for cutover. We probably don't need to
> > >> > > > include EndStateT (end timestamp) in the file source checkpoint state.
> > >> > > > Hence, the separate EndStateT is probably more desirable.
> > >> > > >
> > >> > > > We also discussed the converter for the Kafka source. Kafka source
> > >> > > > supports different OffsetsInitializer impls (including
> > >> > > > TimestampOffsetsInitializer). To support the dynamic cutover time (use
> > >> > > > case #2 above), we can plug in a SupplierTimestampOffsetInitializer,
> > >> > > > where the starting timestamp is not set during source/job
> > >> > > > construction. Rather, it is a supplier model where the starting
> > >> > > > timestamp value is set to the resolved absolute timestamp during
> > >> > > > switch.
> > >> > > >
> > >> > > > Thanks,
> > >> > > > Steven
> > >> > > >
> > >> > > >
> > >> > > >
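The dynamic cutover (use case 2) and the supplier model Steven describes above can be sketched in plain Java. This is an illustration only: `CutoverSketch` and `LateBoundTimestampSketch` are hypothetical names, and the sketch does not implement Kafka's `OffsetsInitializer`. It only shows a relative time ("now - 1 hour") being resolved to an absolute timestamp, and a start timestamp that is supplied at switch time rather than at job construction.

```java
import java.time.Clock;
import java.time.Duration;
import java.util.function.LongSupplier;

final class CutoverSketch {
    // Resolves a relative cutover ("now - lag") to absolute epoch milliseconds.
    static long resolveCutoverMillis(Clock clock, Duration lag) {
        return clock.instant().minus(lag).toEpochMilli();
    }
}

// Start timestamp that is created at job construction but only filled in
// when the switch happens (the "supplier model" from the thread).
final class LateBoundTimestampSketch implements LongSupplier {
    private boolean resolved;
    private long valueMillis;

    void resolve(long millis) {
        this.valueMillis = millis;
        this.resolved = true;
    }

    @Override
    public long getAsLong() {
        if (!resolved) {
            // Reading before the switch is a programming error in this sketch.
            throw new IllegalStateException("start timestamp not resolved yet");
        }
        return valueMillis;
    }
}
```

At switch time, the upstream side would call `resolve(CutoverSketch.resolveCutoverMillis(clock, Duration.ofHours(1)))`, and the downstream source would read the value through `getAsLong()` when it starts. Passing a `Clock` keeps the resolution testable with a fixed time.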
> > >> > > > On Thu, May 20, 2021 at 8:59 PM Thomas Weise <[hidden email]> wrote:
> > >> > > >
> > >> > > > > Hi Nicholas,
> > >> > > > >
> > >> > > > > Thanks for taking a look at the PR!
> > >> > > > >
> > >> > > > > 1. Regarding switching mechanism:
> > >> > > > >
> > >> > > > > There has been previous discussion in this thread regarding the pros
> > >> > > > > and cons of how the switching can be exposed to the user.
> > >> > > > >
> > >> > > > > With fixed start positions, no special switching interface to transfer
> > >> > > > > information between enumerators is required. Sources are configured as
> > >> > > > > they would be when used standalone and just plugged into HybridSource.
> > >> > > > > I expect that to be a common use case. You can find an example for
> > >> > > > > this in the ITCase:
> > >> > > > >
> > >> > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
> > >> > > > >
> > >> > > > > For dynamic start position, the checkpoint state is used to transfer
> > >> > > > > information from old to new enumerator. An example for that can be
> > >> > > > > found here:
> > >> > > > >
> > >> > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
> > >> > > > >
> > >> > > > > That may look verbose, but the code to convert from one state to
> > >> > > > > another can be factored out into a utility and the function becomes a
> > >> > > > > one-liner.
> > >> > > > >
> > >> > > > > For common sources like files and Kafka we can potentially (later)
> > >> > > > > implement the conversion logic as part of the respective connector's
> > >> > > > > checkpoint and split classes.
> > >> > > > >
> > >> > > > > I hope that with the PR up for review, we can soon reach a conclusion
> > >> > > > > on how we want to expose this to the user.
> > >> > > > >
> > >> > > > > Following is an example for Files -> Files -> Kafka that I'm using for
> > >> > > > > e2e testing. It exercises both ways of setting the start position.
> > >> > > > >
> > >> > > > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
> > >> > > > >
> > >> > > > >
> > >> > > > > 2. Regarding the events used to implement the actual switch between
> > >> > > > > enumerator and readers: I updated the PR with javadoc to clarify the
> > >> > > > > intent. Please let me know if that helps, or let's continue to discuss
> > >> > > > > those details on the PR.
> > >> > > > >
> > >> > > > >
> > >> > > > > Thanks,
> > >> > > > > Thomas
> > >> > > > >
> > >> > > > >
> > >> > > > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <[hidden email]> wrote:
> > >> > > > > >
> > >> > > > > > Hi Thomas,
> > >> > > > > >
> > >> > > > > > Sorry for the late reply to your POC. I have reviewed the abstract
> > >> > > > > > implementation in your pull request:
> > >> > > > > > https://github.com/apache/flink/pull/15924. IMO, for the switching
> > >> > > > > > mechanism, this level of abstraction is not concise enough, which
> > >> > > > > > doesn't make connector contribution easier. In theory, it is necessary
> > >> > > > > > to introduce a set of interfaces to support the switching mechanism.
> > >> > > > > > The SwitchableSource and SwitchableSplitEnumerator interfaces are
> > >> > > > > > needed for connector extensibility.
> > >> > > > > > In other words, the whole switching process of the above-mentioned PR
> > >> > > > > > is different from that described in FLIP-150. In the above
> > >> > > > > > implementation, the source reading switch is executed after receiving
> > >> > > > > > the SwitchSourceEvent, which could be before the
> > >> > > > > > SourceReaderFinishEvent is sent. This timeline of source reading
> > >> > > > > > switching could be discussed here.
> > >> > > > > > @Stephan @Becket, if you are available, please help to review the
> > >> > > > > > abstract implementation, and compare it with the interfaces mentioned
> > >> > > > > > in FLIP-150.
> > >> > > > > >
> > >> > > > > > Thanks,
> > >> > > > > > Nicholas Jiang
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
>
>
> --
>
> Arvid Heise | Senior Java Developer