[DISCUSS]FLIP-150: Introduce Hybrid Source


Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Steven Wu
> hybrid sounds to me more like the source would constantly switch back and forth

Initially, the focus of hybrid source is more like a sequenced chain.

But in the future it would be cool if hybrid sources could intelligently
switch back and forth between a historical data source (like Iceberg) and a
live data source (like Kafka). E.g.,
- if the Flink job is lagging behind Kafka retention, automatically switch
to the Iceberg source
- once the job has caught up, switch back to the Kafka source

That would simplify the operational burden of switching manually.
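The back-and-forth behavior described above could be driven by a simple lag-based policy. The sketch below is a hypothetical illustration only: ActiveSource, BackAndForthPolicy, and the retention/margin parameters are invented names, not part of FLIP-150 or Flink. It falls back to the historical source once processing lag reaches the live source's retention, and returns to the live source only after lag has dropped a safety margin below retention, so the job does not flap between the two.

```java
import java.time.Duration;

// Hypothetical sketch, not a Flink API: decides which source a
// "switch back and forth" hybrid source should read from.
enum ActiveSource { HISTORICAL, LIVE }

final class BackAndForthPolicy {
    private final Duration liveRetention; // e.g. the Kafka topic's retention
    private final Duration catchUpMargin; // how far inside retention counts as caught up
    private ActiveSource current = ActiveSource.LIVE;

    BackAndForthPolicy(Duration liveRetention, Duration catchUpMargin) {
        if (catchUpMargin.isNegative() || catchUpMargin.compareTo(liveRetention) >= 0) {
            throw new IllegalArgumentException("margin must be in [0, retention)");
        }
        this.liveRetention = liveRetention;
        this.catchUpMargin = catchUpMargin;
    }

    /** lag = wall-clock time minus the event time the job has processed so far. */
    ActiveSource next(Duration lag) {
        if (current == ActiveSource.LIVE && lag.compareTo(liveRetention) >= 0) {
            // Data the job still needs may already have expired from the live source.
            current = ActiveSource.HISTORICAL;
        } else if (current == ActiveSource.HISTORICAL
                && lag.compareTo(liveRetention.minus(catchUpMargin)) <= 0) {
            // Back inside retention with some headroom: hand over to the live source.
            current = ActiveSource.LIVE;
        }
        return current;
    }
}
```

Using a margin (hysteresis) instead of a single threshold is a design choice; a real implementation would also have to transfer positions in both directions, which this sketch ignores.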


On Mon, Jun 7, 2021 at 8:07 AM Arvid Heise <[hidden email]> wrote:

> Sorry for joining the party so late, but it's such an interesting FLIP with
> a huge impact that I wanted to add my 2 cents. [1]
> I'm mirroring some basic questions from the PR review to this thread because
> they're about the name:
>
> We could rename the thing to ConcatenatedSource(s), SourceSequence, or
> similar.
> Hybrid has the connotation of 2 for me (maybe because I'm a non-native
> speaker) and does not carry the concatenation concept as well (hybrid sounds
> to me more like the source would constantly switch back and forth).
>
> Could we take a few minutes to think about whether this is the most intuitive
> name for new users? I'm especially hoping that native speakers might give
> some ideas (or declare that Hybrid is perfect).
>
> [1] https://github.com/apache/flink/pull/15924#pullrequestreview-677376664
>
> On Sun, Jun 6, 2021 at 7:47 PM Steven Wu <[hidden email]> wrote:
>
> > > Converter function relies on the specific enumerator capabilities to set
> > > the new start position (e.g.
> > > fileSourceEnumerator.getEndTimestamp() and
> > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))
> >
> > I guess the premise is that a converter is for a specific tuple of
> > (upstream source, downstream source). We don't have to define generic
> > EndStateT and SwitchableEnumerator interfaces. That should work.
> >
> > The benefit of defining EndStateT and SwitchableEnumerator interfaces is
> > probably promoting uniformity across sources that support the
> > hybrid/switchable source.
> >
> > On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise <[hidden email]> wrote:
> >
> > > Hi Steven,
> > >
> > > Thank you for the thorough review of the PR and for bringing this back
> > > to the mailing list.
> > >
> > > All,
> > >
> > > I updated the FLIP-150 page to highlight aspects in which the PR
> > > deviates from the original proposal [1]. The goal would be to update
> > > the FLIP soon and bring it to a vote, as previously suggested offline
> > > by Nicholas.
> > >
> > > A few minor issues in the PR are outstanding and I'm working on test
> > > coverage for the recovery behavior, which should be completed soon.
> > >
> > > However, the dynamic position transfer needs to be settled before we
> > > can move forward.
> > >
> > > There have been various ideas, including the special
> > > "SwitchableEnumerator" interface, using enumerator checkpoint state or
> > > an enumerator interface extension to extract the end state.
> > >
> > > One goal in the FLIP is to "Reuse the existing Source connectors built
> > > with FLIP-27 without any change." and I think it is important to honor
> > > that goal given that fixed start positions do not require interface
> > > changes.
> > >
> > > Based on the feedback, the following might be a good solution for
> > > runtime position transfer:
> > >
> > > * The user supplies the optional converter function (not applicable for
> > > fixed positions).
> > > * Instead of relying on the enumerator checkpoint state [2], the
> > > converter function will be supplied with the current and next
> > > enumerator (source.createEnumerator).
> > > * The converter function relies on the specific enumerator capabilities
> > > to set the new start position (e.g.
> > > fileSourceEnumerator.getEndTimestamp() and
> > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)).
> > > * HybridSourceSplitEnumerator starts the new underlying enumerator.
> > >
> > > With this approach, there is no need to augment FLIP-27 interfaces and
> > > custom source capabilities are easier to integrate. Removing the
> > > mandate to rely on enumerator checkpoint state also avoids potential
> > > upgrade/compatibility issues.
> > >
> > > Thoughts?
> > >
> > > Thanks,
> > > Thomas
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
> > > [2] https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
> > >
> > >
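A minimal sketch of the runtime position transfer in Thomas's June 6 proposal, assuming simplified stand-in types: Enumerator, FileEnumerator, KafkaEnumerator, and SwitchStep are hypothetical and do not mirror Flink's SplitEnumerator API, and setStartTimestamp only stands in for something like kafkaSourceEnumerator.setTimestampOffsetsInitializer(..). The optional converter sees the finished enumerator and the newly created one before the new one is started; with fixed start positions it is simply left out. The type parameters also tie the converter to one (upstream, downstream) pair, which is the trade-off Steven raises against generic EndStateT/SwitchableEnumerator interfaces.

```java
import java.util.function.BiConsumer;

// Minimal stand-in for an enumerator; NOT Flink's SplitEnumerator interface.
interface Enumerator {
    void start();
    boolean isStarted();
}

// Hypothetical file enumerator that knows the last timestamp it covered.
final class FileEnumerator implements Enumerator {
    private final long endTimestamp;
    private boolean started;

    FileEnumerator(long endTimestamp) { this.endTimestamp = endTimestamp; }

    long getEndTimestamp() { return endTimestamp; }
    public void start() { started = true; }
    public boolean isStarted() { return started; }
}

// Hypothetical Kafka enumerator whose start timestamp can be set before start().
final class KafkaEnumerator implements Enumerator {
    private Long startTimestamp; // null = use the statically configured position
    private boolean started;

    void setStartTimestamp(long ts) {
        if (started) throw new IllegalStateException("already started");
        this.startTimestamp = ts;
    }

    Long getStartTimestamp() { return startTimestamp; }
    public void start() { started = true; }
    public boolean isStarted() { return started; }
}

// The switch step: run the optional converter on (current, next), then start next.
final class SwitchStep<C extends Enumerator, N extends Enumerator> {
    private final BiConsumer<C, N> converter; // null for fixed start positions

    SwitchStep(BiConsumer<C, N> converter) { this.converter = converter; }

    N switchTo(C current, N next) {
        if (converter != null) {
            converter.accept(current, next);
        }
        next.start();
        return next;
    }
}
```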
> > > On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <[hidden email]> wrote:
> > > >
> > > > Discussed the PR with Thomas offline. Thomas, please correct me if I
> > > > missed anything.
> > > >
> > > > Right now, the PR differs from the FLIP-150 doc regarding the converter:
> > > > * The current PR uses the enumerator checkpoint state type as the input
> > > > for the converter.
> > > > * FLIP-150 defines a new EndStateT interface.
> > > > The FLIP-150 approach of EndStateT seems more flexible, as the
> > > > transition EndStateT doesn't have to be included in the upstream source
> > > > checkpoint state.
> > > >
> > > > Let's look at two use cases:
> > > > 1) Static cutover time at 5 pm. The file source reads all data between
> > > > 9 am and 5 pm, then the Kafka source starts with an initial position of
> > > > 5 pm. In this case, there is no need for a converter or EndStateT since
> > > > the starting time for the Kafka source is known and fixed.
> > > > 2) Dynamic cutover time at 1 hour before now. This is useful when the
> > > > bootstrap of historic data takes a long time (like days or weeks) and
> > > > we don't know the exact cutover time when a job is launched. Instead,
> > > > we instruct the file source to stop when it gets close to live data.
> > > > In this case, the hybrid source construction will specify a relative
> > > > time (now - 1 hour), and the EndStateT (of the file source) will be
> > > > resolved to an absolute cutover time. We probably don't need to include
> > > > EndStateT (end timestamp) in the file source checkpoint state. Hence,
> > > > the separate EndStateT is probably more desirable.
> > > >
> > > > We also discussed the converter for the Kafka source. The Kafka source
> > > > supports different OffsetsInitializer impls (including
> > > > TimestampOffsetsInitializer). To support the dynamic cutover time (use
> > > > case #2 above), we can plug in a SupplierTimestampOffsetInitializer,
> > > > where the starting timestamp is not set during source/job construction.
> > > > Rather, it is a supplier model where the starting timestamp value is
> > > > set to the resolved absolute timestamp during the switch.
> > > >
> > > > Thanks,
> > > > Steven
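Steven's use case 2 can be sketched as follows, assuming hypothetical DynamicCutover and SupplierTimestampStart classes. SupplierTimestampStart only mimics the idea of the proposed SupplierTimestampOffsetInitializer; it is not a Kafka connector API. The relative cutover (now - 1 hour) is resolved once on the file side, and the Kafka side reads the resolved absolute value through a supplier only at switch time. A Clock is injected so the example is deterministic. Use case 1 needs none of this, since the 5 pm start can be configured directly.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.function.LongSupplier;

// Hypothetical, not a Kafka connector API: the start timestamp is only
// read when the switch happens, not when the job is constructed.
final class SupplierTimestampStart {
    private final LongSupplier startTimestampMs;

    SupplierTimestampStart(LongSupplier startTimestampMs) {
        this.startTimestampMs = startTimestampMs;
    }

    long resolve() {
        return startTimestampMs.getAsLong();
    }
}

// Hypothetical holder for a relative cutover ("now - 1 hour") that the file
// side turns into an absolute timestamp once, when it decides where to stop.
final class DynamicCutover {
    private final Duration beforeNow;
    private final Clock clock; // injected so the example is deterministic
    private Long resolvedCutoverMs; // null until resolved

    DynamicCutover(Duration beforeNow, Clock clock) {
        this.beforeNow = beforeNow;
        this.clock = clock;
    }

    /** Resolves the cutover on the first call; later calls return the same value. */
    long resolve() {
        if (resolvedCutoverMs == null) {
            Instant cutover = clock.instant().minus(beforeNow);
            resolvedCutoverMs = cutover.toEpochMilli();
        }
        return resolvedCutoverMs;
    }

    /** Read by the downstream (Kafka) side at switch time. */
    long cutoverMs() {
        if (resolvedCutoverMs == null) {
            throw new IllegalStateException("cutover has not been resolved yet");
        }
        return resolvedCutoverMs;
    }
}
```

Usage: `new SupplierTimestampStart(cutover::cutoverMs)` can be built together with the job, while the value it returns is fixed only after the file side calls `cutover.resolve()`.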
> > > >
> > > >
> > > >
> > > > On Thu, May 20, 2021 at 8:59 PM Thomas Weise <[hidden email]> wrote:
> > > >
> > > > > Hi Nicholas,
> > > > >
> > > > > Thanks for taking a look at the PR!
> > > > >
> > > > > 1. Regarding the switching mechanism:
> > > > >
> > > > > There has been previous discussion in this thread regarding the pros
> > > > > and cons of how the switching can be exposed to the user.
> > > > >
> > > > > With fixed start positions, no special switching interface to transfer
> > > > > information between enumerators is required. Sources are configured as
> > > > > they would be when used standalone and just plugged into HybridSource.
> > > > > I expect that to be a common use case. You can find an example for
> > > > > this in the ITCase:
> > > > >
> > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
> > > > >
> > > > > For dynamic start positions, the checkpoint state is used to transfer
> > > > > information from the old to the new enumerator. An example for that
> > > > > can be found here:
> > > > >
> > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
> > > > >
> > > > > That may look verbose, but the code to convert from one state to
> > > > > another can be factored out into a utility, and the function becomes
> > > > > a one-liner.
> > > > >
> > > > > For common sources like files and Kafka, we can potentially (later)
> > > > > implement the conversion logic as part of the respective connector's
> > > > > checkpoint and split classes.
> > > > >
> > > > > I hope that with the PR up for review, we can soon reach a conclusion
> > > > > on how we want to expose this to the user.
> > > > >
> > > > > Following is an example for Files -> Files -> Kafka that I'm using for
> > > > > e2e testing. It exercises both ways of setting the start position.
> > > > >
> > > > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
> > > > >
> > > > >
> > > > > 2. Regarding the events used to implement the actual switch between
> > > > > enumerator and readers: I updated the PR with javadoc to clarify the
> > > > > intent. Please let me know if that helps. Otherwise, shall we continue
> > > > > to discuss those details on the PR?
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Thomas
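For the fixed-start-position case Thomas describes, the semantics reduce to reading each independently configured source to completion before moving on to the next one, with nothing passed between them. The sketch below models only that ordering with plain iterators; BoundedSource and SequencedSource are hypothetical names, and real FLIP-27 sources work through split enumerators and readers rather than iterators.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for a bounded source with a fixed, pre-configured range.
interface BoundedSource<T> {
    Iterator<T> open();
}

// Reads each source to completion, then moves to the next one. Because every
// source carries its own fixed start position, no state is handed over.
final class SequencedSource<T> implements BoundedSource<T> {
    private final List<BoundedSource<T>> sources;

    SequencedSource(List<BoundedSource<T>> sources) {
        this.sources = new ArrayList<>(sources);
    }

    public Iterator<T> open() {
        return new Iterator<T>() {
            private int index = 0;
            private Iterator<T> current = sources.isEmpty() ? null : sources.get(0).open();

            public boolean hasNext() {
                // Skip over exhausted (or empty) sources until one has data.
                while (current != null && !current.hasNext()) {
                    index++;
                    current = index < sources.size() ? sources.get(index).open() : null;
                }
                return current != null;
            }

            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                return current.next();
            }
        };
    }
}
```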
> > > > >
> > > > >
> > > > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <[hidden email]>
> > > > > wrote:
> > > > > >
> > > > > > Hi Thomas,
> > > > > >
> > > > > >    Sorry for the late reply on your POC. I have reviewed the base
> > > > > > abstract implementation in your pull request:
> > > > > > https://github.com/apache/flink/pull/15924. IMO, for the switching
> > > > > > mechanism, this level of abstraction is not concise enough and
> > > > > > doesn't make connector contribution easier. In theory, it is
> > > > > > necessary to introduce a set of interfaces to support the switching
> > > > > > mechanism. The SwitchableSource and SwitchableSplitEnumerator
> > > > > > interfaces are needed for connector extensibility.
> > > > > >    In other words, the whole switching process of the above-mentioned
> > > > > > PR is different from the one described in FLIP-150. In the above
> > > > > > implementation, the source reading switch is executed after
> > > > > > receiving the SwitchSourceEvent, which could happen before the
> > > > > > SourceReaderFinishEvent is sent. This timeline of the source reading
> > > > > > switch could be discussed here.
> > > > > >    @Stephan @Becket, if you are available, please help to review the
> > > > > > abstract implementation and compare it with the interfaces mentioned
> > > > > > in FLIP-150.
> > > > > >
> > > > > > Thanks,
> > > > > > Nicholas Jiang
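One way to pin down the timeline Nicholas asks about is to have the enumerator send SwitchSourceEvent only after every reader has reported SourceReaderFinishEvent for the current source. The sketch assumes that ordering and invents the SwitchCoordinator class and its methods; the event names come from the thread, but the PR may sequence them differently (for example, per reader rather than as a global barrier).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical coordinator enforcing: all readers finish the current source
// first, and only then does the enumerator tell them to switch.
final class SwitchCoordinator {
    private final Set<Integer> registeredReaders;
    private final Set<Integer> finishedReaders = new HashSet<>();
    private final List<String> sentEvents = new ArrayList<>();
    private int currentSourceIndex = 0;

    SwitchCoordinator(Set<Integer> registeredReaders) {
        this.registeredReaders = new HashSet<>(registeredReaders);
    }

    /** Handles a SourceReaderFinishEvent from one reader subtask. */
    void onReaderFinished(int subtaskId, int sourceIndex) {
        if (sourceIndex != currentSourceIndex) {
            return; // stale event for a source we have already left
        }
        finishedReaders.add(subtaskId);
        if (finishedReaders.containsAll(registeredReaders)) {
            currentSourceIndex++;
            finishedReaders.clear();
            // Record one SwitchSourceEvent per reader (stands in for sending it).
            for (int reader : registeredReaders) {
                sentEvents.add("SwitchSourceEvent(reader=" + reader
                        + ", source=" + currentSourceIndex + ")");
            }
        }
    }

    int currentSourceIndex() { return currentSourceIndex; }
    List<String> sentEvents() { return sentEvents; }
}
```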

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Thomas Weise
Thanks for the suggestions and feedback on the PR.

A variation of hybrid source that can switch back and forth was
brought up before, and it is something that will eventually be
required. Stephan also suggested that in the future there
may be more than one implementation of hybrid source for different
requirements.

I want to bring back the topic from the PR [1] of how the enumerator end
state can be converted into a start position. We started in the FLIP
page with "switchable" interfaces, the prototype had checkpoint
conversion, and now the PR has a function that allows augmenting the
source. Each of these has pros and cons, but we will need to converge.

1. Switchable interfaces
* unified solution
* requires sources to implement a special interface to participate in
HybridSource, even when no dynamic conversion is needed

2. Checkpoint state
* unified solution
* no interface changes
* requires implementation changes to existing enumerators to include
the end state (like a timestamp) in their checkpoint state
* existing sources work as is for fixed start positions

3. Source modification at switch time to set the start position
* can be solved per source, least restrictive
* no interface changes
* requires the enumerator to expose its end state (as a getter) and the
source to be either mutable or copied and augmented with the start
position
* existing sources work as is for fixed start positions

I think more eyes might help to finalize the approach.

[1] https://github.com/apache/flink/pull/15924#discussion_r649929865
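Option 3 could look roughly like this, assuming an immutable source description so that "modifying" it at switch time means producing a copy. KafkaLikeSource, FileLikeEnumerator, and Option3Switch are hypothetical names; the only requirement placed on the upstream side is a getter for its end state.

```java
import java.util.OptionalLong;

// Hypothetical immutable description of a Kafka-like source. Option 3
// "augments" it at switch time by returning a copy with the start position set.
final class KafkaLikeSource {
    private final String topic;
    private final OptionalLong startTimestampMs; // empty = statically configured start

    KafkaLikeSource(String topic) {
        this(topic, OptionalLong.empty());
    }

    private KafkaLikeSource(String topic, OptionalLong startTimestampMs) {
        this.topic = topic;
        this.startTimestampMs = startTimestampMs;
    }

    KafkaLikeSource withStartTimestamp(long startTimestampMs) {
        return new KafkaLikeSource(topic, OptionalLong.of(startTimestampMs));
    }

    String topic() { return topic; }
    OptionalLong startTimestampMs() { return startTimestampMs; }
}

// Hypothetical upstream enumerator exposing its end state through a getter.
final class FileLikeEnumerator {
    private final long endTimestampMs;

    FileLikeEnumerator(long endTimestampMs) { this.endTimestampMs = endTimestampMs; }

    long getEndTimestamp() { return endTimestampMs; }
}

final class Option3Switch {
    private Option3Switch() {}

    /** Derives the next source from the finished enumerator without mutating the configured one. */
    static KafkaLikeSource nextSource(FileLikeEnumerator finished, KafkaLikeSource configured) {
        return configured.withStartTimestamp(finished.getEndTimestamp());
    }
}
```

Copying rather than mutating leaves the originally configured source untouched, at the cost of a with...() method on each source that wants to participate.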

On Mon, Jun 7, 2021 at 11:18 PM Steven Wu <[hidden email]> wrote:


Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Arvid Heise-3
Hi Thomas,

Thanks for bringing this up. I think this is a tough nut to crack :/.
IMHO, 1 and 3 or 1+3 can work, but it is of course a pity if the source
implementor is not aware of HybridSource. I'm also worried that we may not
have a universal interface to specify the start offset/time.
I guess it would also be much easier if we had an abstract base source
class where we could implement some basic support.

When I initially looked at the issue, I was thinking that sources should
always be immutable (we have had some bad experiences with mutable life-cycles
in operator implementations) and that the only modifiable thing should be the
builder. That would mean that a HybridSource actually just gets a list of
source builders and creates the sources when needed with the correct
start/end offset. However, we neither have base builders (something that
I'd like to change) nor are any of the builders serializable. We could
convert sources back to builders, update the start offset, and convert them
back to sources, but this also seems overly complicated. So I'm assuming that
we should go with modifiable sources, as also expressed in the FLIP draft.
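The builder-based variant described above could be approximated with serializable factories that create each source lazily, once the previous source's end position is known. SourceFactory and LazySourceChain are hypothetical names; as the paragraph notes, today's builders are not serializable, so this only shows the shape of the idea rather than something that works with existing connectors.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical serializable factory: given the previous source's end position
// (null for the first source in the chain), build the next source.
interface SourceFactory<S> extends Serializable {
    S create(Long previousEndTimestampMs);
}

// Keeps only factories; each source is created when the chain reaches it.
final class LazySourceChain<S> implements Serializable {
    private final List<SourceFactory<S>> factories;

    LazySourceChain(List<SourceFactory<S>> factories) {
        this.factories = new ArrayList<>(factories);
    }

    int size() {
        return factories.size();
    }

    /** Creates the source at position {@code index} with the correct start position. */
    S createSource(int index, Long previousEndTimestampMs) {
        return factories.get(index).create(previousEndTimestampMs);
    }
}
```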

If we could assume that we are always switching by time, we could also
change Source(Enumerator)#start to take the start time as a parameter. Can
we deduce the end time from the record timestamps? But I guess that has all
been discussed already, so sorry if I derail the discussion.

I'm also leaning towards extending the Source interface to include these
methods (with defaults) to make it harder for implementers to miss.
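Extending the source interface with default methods, as suggested above, could keep existing implementations compiling while letting position-aware sources opt in. PositionAwareSource, LegacySource, TimedSource, and Handover are hypothetical names, not Flink's Source interface, and the sketch assumes switching by time (epoch milliseconds).

```java
import java.util.Optional;
import java.util.OptionalLong;

// Hypothetical, not Flink's Source interface: position hooks with defaults, so
// existing implementations compile unchanged and simply report "unsupported".
interface PositionAwareSource {
    /** End position once this source is finished, if it can report one. */
    default OptionalLong endTimestampMs() {
        return OptionalLong.empty();
    }

    /** A copy of this source starting at the given time, if supported. */
    default Optional<PositionAwareSource> withStartTimestampMs(long startMs) {
        return Optional.empty();
    }
}

// An existing source that knows nothing about hybrid switching.
final class LegacySource implements PositionAwareSource {}

// A source that can start from an arbitrary timestamp.
final class TimedSource implements PositionAwareSource {
    private final long startMs;

    TimedSource(long startMs) {
        this.startMs = startMs;
    }

    long startMs() {
        return startMs;
    }

    @Override
    public Optional<PositionAwareSource> withStartTimestampMs(long newStartMs) {
        return Optional.of(new TimedSource(newStartMs));
    }
}

final class Handover {
    private Handover() {}

    /** Uses the finished source's end time if both sides support it; otherwise keeps the configured next source. */
    static PositionAwareSource next(PositionAwareSource finished, PositionAwareSource configuredNext) {
        OptionalLong end = finished.endTimestampMs();
        if (!end.isPresent()) {
            return configuredNext; // fixed start position
        }
        return configuredNext.withStartTimestampMs(end.getAsLong()).orElse(configuredNext);
    }
}
```

Because both hooks default to "unsupported", a source that ignores them still works with fixed start positions, which is what makes the defaults harder to miss than a separate optional interface.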


On Fri, Jun 11, 2021 at 7:02 PM Thomas Weise <[hidden email]> wrote:

> Thanks for the suggestions and feedback on the PR.
>
> A variation of hybrid source that can switch back and forth was
> brought up before and it is something that will be eventually
> required. It was also suggested by Stephan that in the future there
> may be more than one implementation of hybrid source for different
> requirements.
>
> I want to bring back the topic of how enumerator end state can be
> converted into start position from the PR [1]. We started in the FLIP
> page with "switchable" interfaces, the prototype had checkpoint
> conversion and now the PR has a function that allows to augment the
> source. Each of these has pros and cons but we will need to converge.
>
> 1. Switchable interfaces
> * unified solution
> * requires sources to implement a special interface to participate in
> HybridSource, even when no dynamic conversion is needed
>
> 2. Checkpoint state
> * unified solution
> * no interface changes
> * requires implementation change to existing enumerators to include
> end state (like a timestamp) into their checkpoint state
> * existing sources work as is for fixed start position
>
> 3. Source modification at switch time to set start position
> * can be solved per source, least restrictive
> * no interface changes
> * requires enumerator to expose end state (as a getter) and source to
> be either mutable or source to be copied and augmented with the start
> position.
> * existing sources work as is for fixed start position
>
> I think more eyes might help to finalize the approach.
>
> [1] https://github.com/apache/flink/pull/15924#discussion_r649929865
>
> On Mon, Jun 7, 2021 at 11:18 PM Steven Wu <[hidden email]> wrote:
> >
> > > hybrid sounds to me more like the source would constantly switch back
> and forth
> >
> > Initially, the focus of hybrid source is more like a sequenced chain.
> >
> > But in the future it would be cool that hybrid sources can intelligently
> switch back and forth between historical data source (like Iceberg) and
> live data source (like Kafka). E.g.,
> > - if the Flink job is lagging behind Kafka retention, automatically
> switch to Iceberg source
> > - once job caught up, switch back to Kafka source
> >
> > That can simplify operational aspects of manually switching.
> >
> >
> > On Mon, Jun 7, 2021 at 8:07 AM Arvid Heise <[hidden email]> wrote:
> >>
> >> Sorry for joining the party so late, but it's such an interesting FLIP
> with
> >> a huge impact that I wanted to add my 2 cents. [1]
> >> I'm mirroring some basic question from the PR review to this thread
> because
> >> it's about the name:
> >>
> >> We could rename the thing to ConcatenatedSource(s), SourceSequence, or
> >> similar.
> >> Hybrid has the connotation of 2 for me (maybe because I'm a non-native)
> and
> >> does not carry the concatentation concept as well (hybrid sounds to me
> more
> >> like the source would constantly switch back and forth).
> >>
> >> Could we take a few minutes to think if this is the most intuitive name
> for
> >> new users? I'm especially hoping that natives might give some ideas (or
> >> declare that Hybrid is perfect).
> >>
> >> [1]
> https://github.com/apache/flink/pull/15924#pullrequestreview-677376664
> >>
> >> On Sun, Jun 6, 2021 at 7:47 PM Steven Wu <[hidden email]> wrote:
> >>
> >> > > Converter function relies on the specific enumerator capabilities
> to set
> >> > the new start position (e.g.
> >> > fileSourceEnumerator.getEndTimestamp() and
> >> > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
> >> >
> >> > I guess the premise is that a converter is for a specific tuple of
> >> > (upstream source, downstream source) . We don't have to define generic
> >> > EndtStateT and SwitchableEnumerator interfaces. That should work.
> >> >
> >> > The benefit of defining EndtStateT and SwitchableEnumerator
> interfaces is
> >> > probably promoting uniformity across sources that support
> hybrid/switchable
> >> > source.
> >> >
> >> > On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise <[hidden email]> wrote:
> >> >
> >> > > Hi Steven,
> >> > >
> >> > > Thank you for the thorough review of the PR and for bringing this
> back
> >> > > to the mailing list.
> >> > >
> >> > > All,
> >> > >
> >> > > I updated the FLIP-150 page to highlight aspects in which the PR
> >> > > deviates from the original proposal [1]. The goal would be to update
> >> > > the FLIP soon and bring it to a vote, as previously suggested
> offline
> >> > > by Nicholas.
> >> > >
> >> > > A few minor issues in the PR are outstanding and I'm working on test
> >> > > coverage for the recovery behavior, which should be completed soon.
> >> > >
> >> > > The dynamic position transfer needs to be concluded before we can
> move
> >> > > forward however.
> >> > >
> >> > > There have been various ideas, including the special
> >> > > "SwitchableEnumerator" interface, using enumerator checkpoint state
> or
> >> > > an enumerator interface extension to extract the end state.
> >> > >
> >> > > One goal in the FLIP is to "Reuse the existing Source connectors
> built
> >> > > with FLIP-27 without any change." and I think it is important to
> honor
> >> > > that goal given that fixed start positions do not require interface
> >> > > changes.
> >> > >
> >> > > Based on the feedback the following might be a good solution for
> >> > > runtime position transfer:
> >> > >
> >> > > * User supplies the optional converter function (not applicable for
> >> > > fixed positions).
> >> > > * Instead of relying on the enumerator checkpoint state [2], the
> >> > > converter function will be supplied with the current and next
> >> > > enumerator (source.createEnumerator).
> >> > > * Converter function relies on the specific enumerator capabilities
> to
> >> > > set the new start position (e.g.
> >> > > fileSourceEnumerator.getEndTimestamp() and
> >> > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
> >> > > * HybridSourceSplitEnumerator starts new underlying enumerator
> >> > >
> >> > > With this approach, there is no need to augment FLIP-27 interfaces,
> >> > > and custom source capabilities are easier to integrate. Removing the
> >> > > mandate to rely on enumerator checkpoint state also avoids potential
> >> > > upgrade/compatibility issues.
> >> > >
> >> > > Thoughts?
> >> > >
> >> > > Thanks,
> >> > > Thomas
> >> > >
> >> > > [1]
> >> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
> >> > > [2]
> >> > > https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
> >> > >
> >> > >
> >> > > On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <[hidden email]> wrote:
> >> > > >
> >> > > > I discussed the PR with Thomas offline. Thomas, please correct me if
> >> > > > I missed anything.
> >> > > >
> >> > > > Right now, the PR differs from the FLIP-150 doc regarding the
> >> > > > converter:
> >> > > > * The current PR uses the enumerator checkpoint state type as the
> >> > > > input for the converter.
> >> > > > * FLIP-150 defines a new EndStateT interface.
> >> > > > It seems that the FLIP-150 approach of EndStateT is more flexible, as
> >> > > > the transition EndStateT doesn't have to be included in the upstream
> >> > > > source checkpoint state.
> >> > > >
> >> > > > Let's look at two use cases:
> >> > > > 1) Static cutover time at 5 pm. The file source reads all data
> >> > > > between 9 am and 5 pm, then the Kafka source starts with an initial
> >> > > > position of 5 pm. In this case, there is no need for a converter or
> >> > > > EndStateT, since the starting time for the Kafka source is known and
> >> > > > fixed.
> >> > > > 2) Dynamic cutover time at 1 hour before now. This is useful when the
> >> > > > bootstrap of historic data takes a long time (like days or weeks) and
> >> > > > we don't know the exact cutover time when the job is launched.
> >> > > > Instead, we instruct the file source to stop when it gets close to
> >> > > > live data. In this case, the hybrid source construction will specify
> >> > > > a relative time (now - 1 hour), and the EndStateT (of the file source)
> >> > > > will be resolved to an absolute time for cutover. We probably don't
> >> > > > need to include EndStateT (end timestamp) in the file source
> >> > > > checkpoint state. Hence, the separate EndStateT is probably more
> >> > > > desirable.
> >> > > >
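The difference between the two use cases is mainly *when* the cutover timestamp is known. A minimal sketch, assuming a hypothetical `CutoverSketch` helper (not part of Flink): the fixed case captures an absolute `Instant` at job construction, while the relative case is a `Supplier` that is only evaluated at switch time. The clock is injected so that the resolution moment is explicit.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

// Hypothetical helper modeling the two cutover use cases. A Supplier is used
// so that the relative case is only resolved when the switch happens.
final class CutoverSketch {
    private CutoverSketch() {}

    // Use case 1: cutover known when the job is built (e.g. 5 pm).
    static Supplier<Instant> fixed(Instant cutover) {
        return () -> cutover;
    }

    // Use case 2: cutover relative to the moment of the switch (now - lag).
    // Each get() re-reads the clock, so nothing is fixed at construction.
    static Supplier<Instant> relative(Supplier<Instant> clock, Duration lag) {
        return () -> clock.get().minus(lag);
    }
}
```

In a job, use case 2 would correspond to something like `CutoverSketch.relative(Instant::now, Duration.ofHours(1))`, resolved once when the file source finishes; the resolved value is the end state handed to the Kafka side.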
> >> > > > We also discussed the converter for the Kafka source. The Kafka
> >> > > > source supports different OffsetsInitializer implementations
> >> > > > (including TimestampOffsetsInitializer). To support the dynamic
> >> > > > cutover time (use case #2 above), we can plug in a
> >> > > > SupplierTimestampOffsetInitializer, where the starting timestamp is
> >> > > > not set during source/job construction. Rather, it is a supplier
> >> > > > model where the starting timestamp value is set to the resolved
> >> > > > absolute timestamp during the switch.
> >> > > >
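The supplier model for the Kafka start position can be sketched as an initializer that holds a `LongSupplier` instead of a value. This is a hypothetical, in-process model: `SupplierTimestampInitializerSketch`, `fromHolder` and `resolveStartTimestamp` are illustrative names, and the sketch does not implement Flink's `OffsetsInitializer` interface or perform any per-partition offset lookup.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical model of a "supplier" timestamp initializer: it is created
// during job construction, but only reads the timestamp when the Kafka
// enumerator actually initializes its offsets, i.e. after the switch.
class SupplierTimestampInitializerSketch {
    static final long UNRESOLVED = -1L;

    private final LongSupplier startTimestamp;

    SupplierTimestampInitializerSketch(LongSupplier startTimestamp) {
        this.startTimestamp = startTimestamp;
    }

    // Convenience factory: the hybrid source writes the resolved cutover
    // into the shared holder at switch time.
    static SupplierTimestampInitializerSketch fromHolder(AtomicLong holder) {
        return new SupplierTimestampInitializerSketch(holder::get);
    }

    // Stand-in for the offset lookup a real initializer would perform; here
    // it only returns the timestamp it would seek to.
    long resolveStartTimestamp() {
        long ts = startTimestamp.getAsLong();
        if (ts == UNRESOLVED) {
            throw new IllegalStateException("cutover timestamp not resolved yet");
        }
        return ts;
    }
}
```

This only shows the deferred read within one JVM; how the resolved value reaches the enumerator after the source has been serialized and shipped is a separate question the sketch does not address.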
> >> > > > Thanks,
> >> > > > Steven
> >> > > >
> >> > > >
> >> > > >
> >> > > > On Thu, May 20, 2021 at 8:59 PM Thomas Weise <[hidden email]> wrote:
> >> > > >
> >> > > > > Hi Nicholas,
> >> > > > >
> >> > > > > Thanks for taking a look at the PR!
> >> > > > >
> >> > > > > 1. Regarding switching mechanism:
> >> > > > >
> >> > > > > There has been previous discussion in this thread regarding the pros
> >> > > > > and cons of how the switching can be exposed to the user.
> >> > > > >
> >> > > > > With fixed start positions, no special switching interface to
> >> > > > > transfer information between enumerators is required. Sources are
> >> > > > > configured as they would be when used standalone and just plugged
> >> > > > > into HybridSource. I expect that to be a common use case. You can
> >> > > > > find an example for this in the ITCase:
> >> > > > >
> >> > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
> >> > > > >
> >> > > > > For dynamic start positions, the checkpoint state is used to
> >> > > > > transfer information from the old to the new enumerator. An example
> >> > > > > for that can be found here:
> >> > > > >
> >> > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
> >> > > > >
> >> > > > > That may look verbose, but the code to convert from one state to
> >> > > > > another can be factored out into a utility, and the function
> >> > > > > becomes a one-liner.
> >> > > > >
> >> > > > > For common sources like files and Kafka, we can potentially (later)
> >> > > > > implement the conversion logic as part of the respective connector's
> >> > > > > checkpoint and split classes.
> >> > > > >
> >> > > > > I hope that with the PR up for review, we can soon reach a conclusion
> >> > > > > on how we want to expose this to the user.
> >> > > > >
> >> > > > > Following is an example for Files -> Files -> Kafka that I'm using
> >> > > > > for e2e testing. It exercises both ways of setting the start
> >> > > > > position.
> >> > > > >
> >> > > > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
> >> > > > >
> >> > > > >
> >> > > > > 2. Regarding the events used to implement the actual switch between
> >> > > > > enumerator and readers: I updated the PR with javadoc to clarify the
> >> > > > > intent. Please let me know if that helps; otherwise, let's continue
> >> > > > > to discuss those details on the PR.
> >> > > > >
> >> > > > >
> >> > > > > Thanks,
> >> > > > > Thomas
> >> > > > >
> >> > > > >
> >> > > > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <[hidden email]> wrote:
> >> > > > > >
> >> > > > > > Hi Thomas,
> >> > > > > >
> >> > > > > >    Sorry for the late reply regarding your POC. I have reviewed the
> >> > > > > > base abstract implementation in your pull request:
> >> > > > > > https://github.com/apache/flink/pull/15924. IMO, for the switching
> >> > > > > > mechanism, this level of abstraction is not concise enough and
> >> > > > > > doesn't make connector contribution easier. In theory, it is
> >> > > > > > necessary to introduce a set of interfaces to support the switching
> >> > > > > > mechanism. The SwitchableSource and SwitchableSplitEnumerator
> >> > > > > > interfaces are needed for connector extensibility.
> >> > > > > >    In other words, the whole switching process of the
> >> > > > > > above-mentioned PR is different from the one described in FLIP-150.
> >> > > > > > In the above implementation, the source reading switch is executed
> >> > > > > > after receiving the SwitchSourceEvent, which could be before the
> >> > > > > > SourceReaderFinishEvent is sent. This timeline of the source
> >> > > > > > reading switch could be discussed here.
> >> > > > > >    @Stephan @Becket, if you are available, please help to review the
> >> > > > > > abstract implementation and compare it with the interfaces
> >> > > > > > mentioned in FLIP-150.
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Nicholas Jiang
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > --
> >> > > > > > Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> >> > > > >
> >> > >
> >> >
>


--

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Thomas Weise
Hi Arvid,

Thanks for your reply. Please see my comments inline below.

On Mon, Jun 14, 2021 at 2:55 PM Arvid Heise <[hidden email]> wrote:

>
> Hi Thomas,
>
> Thanks for bringing this up. I think this is a tough nut to crack :/.
> IMHO, 1 and 3 (or 1+3) can work, but it is of course a pity if the source
> implementor is not aware of HybridSource. I'm also worried that we may not
> have a universal interface to specify the start offset/time.
> I guess it would also be much easier if we had an abstract base source
> class where we could implement some basic support.
>
> When I initially looked at the issue, I was thinking that sources should
> always be immutable (we have had some bad experiences with mutable
> life-cycles in operator implementations) and the only modifiable thing
> should be the builder. That would mean that a HybridSource actually just
> gets a list of source builders and creates the sources when needed with the
> correct start/end offset. However, we neither have base builders (something
> that I'd like to change) nor are any of the builders serializable. We could
> convert sources back to builders, update the start offset, and convert them
> to sources again, but this also seems overly complicated. So I'm assuming
> that we should go with modifiable sources, as also expressed in the FLIP
> draft.

The need to set a start position at runtime indicates that sources
should not be immutable. I think it would be better to have a setter
on the source that clearly describes the mutation.
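The explicit-setter idea can be illustrated with a small hypothetical class. `MutableStartSource`, `EnumeratorConfigStub` and the single-assignment rule are assumptions made for this sketch, not part of any existing Flink source: the source stays immutable except for one clearly named mutation, and the enumerator snapshots the start position when it is created.

```java
// Hypothetical description of what an enumerator is created with.
record EnumeratorConfigStub(String topic, Long startTimestamp) {}

// Hypothetical source that stays immutable except for one explicit setter for
// the start position, which the hybrid source would call at switch time.
class MutableStartSource {
    private final String topic;
    private Long startTimestamp; // null means "use this source's own default"

    MutableStartSource(String topic) {
        this.topic = topic;
    }

    // The single, clearly named mutation; a second call is rejected.
    void setStartTimestamp(long timestamp) {
        if (startTimestamp != null) {
            throw new IllegalStateException("start position already set");
        }
        startTimestamp = timestamp;
    }

    // The enumerator snapshots the start position when it is created.
    EnumeratorConfigStub createEnumerator() {
        return new EnumeratorConfigStub(topic, startTimestamp);
    }
}
```

Rejecting a second call is one way to keep the mutation auditable; whether a real source should allow re-setting the position is left open in the thread.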

Regarding deferred construction of the sources (supplier pattern):
This is actually a very interesting idea that would also help in
situations where the exact sequence of sources isn't known upfront.
However, Source is also the factory for split and enumerator
checkpoint serializers. If we were to instantiate the source at switch
time, we would also need to distribute the serializers at switch time.
This would lead to even more complexity and move us further away from
the original goal of having a relatively simple implementation for the
basic scenarios.

> If we could assume that we are always switching by time, we could also
> change Source(Enumerator)#start to take the start time as a parameter. Can
> we deduce the end time from the record timestamp? But I guess that has all
> been discussed already, so sorry if I derail the discussion.

This actually hasn't been discussed. The original proposal left the
type of the start position open, which also makes it less attractive
(the user still has to supply a converter).

For initial internal usage of the hybrid source, we are planning to
use a timestamp. But there may be use cases where the start position
could be encoded in other ways, such as based on Kafka offsets.
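If the start position were passed to the enumerator at start time, its type would have to cover both encodings mentioned above. A minimal sketch, assuming a hypothetical sealed `StartPosition` type (Java 17) and a hypothetical `PositionedEnumeratorSketch`; neither exists in Flink, and the returned strings only describe what a real enumerator would do.

```java
import java.util.Map;

// Hypothetical start-position type covering the two encodings discussed:
// an event timestamp, or explicit per-partition Kafka offsets.
sealed interface StartPosition {
    record AtTimestamp(long epochMillis) implements StartPosition {}

    record AtOffsets(Map<Integer, Long> offsetPerPartition) implements StartPosition {}
}

// Hypothetical enumerator that receives its start position in start()
// instead of having it fixed when the source is constructed.
class PositionedEnumeratorSketch {
    String start(StartPosition position) {
        if (position instanceof StartPosition.AtTimestamp ts) {
            return "seek to timestamp " + ts.epochMillis();
        }
        if (position instanceof StartPosition.AtOffsets offsets) {
            return "seek to offsets " + offsets.offsetPerPartition();
        }
        throw new IllegalArgumentException("unsupported start position: " + position);
    }
}
```

A closed set of position types avoids a user-supplied converter for the common cases, at the cost of every participating enumerator having to understand those types.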

> I'm also leaning towards extending the Source interface to include these
> methods (with defaults) to make it harder for implementers to miss.

It would be possible to introduce an optional interface as a follow-up
task. It could then be used as the default behavior of option 3.

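One way to picture an optional interface serving as the default for option 3 is a pair of opt-in mix-ins checked at switch time. Everything here is hypothetical (`ReportsEndTimestamp`, `SupportsStartTimestamp`, `DefaultSwitchSketch`): when both sides opt in, the timestamp is transferred without a user converter; otherwise the next source keeps its own fixed start position, so existing sources keep working unchanged.

```java
// Hypothetical mix-in for enumerators that can report where they ended.
interface ReportsEndTimestamp {
    long endTimestamp();
}

// Hypothetical mix-in for sources that accept a start timestamp.
interface SupportsStartTimestamp {
    void applyStartTimestamp(long epochMillis);
}

final class DefaultSwitchSketch {
    private DefaultSwitchSketch() {}

    // Default behavior a hybrid source could fall back to when the user
    // supplies no converter: transfer the timestamp only if both sides opt in.
    static boolean transfer(Object finishedEnumerator, Object nextSource) {
        if (finishedEnumerator instanceof ReportsEndTimestamp end
                && nextSource instanceof SupportsStartTimestamp start) {
            start.applyStartTimestamp(end.endTimestamp());
            return true;
        }
        return false; // next source keeps its own (fixed) start position
    }
}
```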
>
>
> On Fri, Jun 11, 2021 at 7:02 PM Thomas Weise <[hidden email]> wrote:
>
> > Thanks for the suggestions and feedback on the PR.
> >
> > A variation of hybrid source that can switch back and forth was
> > brought up before and it is something that will be eventually
> > required. It was also suggested by Stephan that in the future there
> > may be more than one implementation of hybrid source for different
> > requirements.
> >
> > I want to bring back the topic of how the enumerator end state can be
> > converted into a start position from the PR [1]. We started in the FLIP
> > page with "switchable" interfaces, the prototype had checkpoint
> > conversion, and now the PR has a function that allows augmenting the
> > source. Each of these has pros and cons, but we will need to converge.
> >
> > 1. Switchable interfaces
> > * unified solution
> > * requires sources to implement a special interface to participate in
> > HybridSource, even when no dynamic conversion is needed
> >
> > 2. Checkpoint state
> > * unified solution
> > * no interface changes
> > * requires an implementation change to existing enumerators to include
> > the end state (like a timestamp) in their checkpoint state
> > * existing sources work as is for fixed start position
> >
> > 3. Source modification at switch time to set start position
> > * can be solved per source, least restrictive
> > * no interface changes
> > * requires the enumerator to expose its end state (as a getter) and the
> > source to be either mutable or copied and augmented with the start
> > position.
> > * existing sources work as is for fixed start position
> >
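For comparison with options 2 and 3, option 1 can be sketched as a pair of generic contracts. The type names echo the SwitchableSource/SwitchableEnumerator and EndStateT names used in this thread, but the method names and signatures are hypothetical and do not reproduce the FLIP's actual definitions.

```java
import java.util.function.Function;

// Hypothetical "switchable" contracts in the spirit of option 1: every
// participating enumerator exposes a typed end state, and every participating
// source accepts a typed start state.
interface SwitchableEnumerator<EndStateT> {
    EndStateT getEndState();
}

interface SwitchableSource<StartStateT> {
    void setStartState(StartStateT startState);
}

final class Switcher {
    private Switcher() {}

    // The converter maps the upstream end state to the downstream start
    // state; with option 1 this call looks the same for every source pair.
    static <E, S> void switchOver(
            SwitchableEnumerator<E> finished,
            SwitchableSource<S> next,
            Function<E, S> converter) {
        next.setStartState(converter.apply(finished.getEndState()));
    }
}
```

The uniformity is the benefit; the cost, as listed above, is that a source must implement these interfaces to participate at all, even with a fixed start position.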
> > I think more eyes might help to finalize the approach.
> >
> > [1] https://github.com/apache/flink/pull/15924#discussion_r649929865
> >
> > On Mon, Jun 7, 2021 at 11:18 PM Steven Wu <[hidden email]> wrote:
> > >
> > > > hybrid sounds to me more like the source would constantly switch back
> > and forth
> > >
> > > Initially, the focus of hybrid source is more like a sequenced chain.
> > >
> > > But in the future it would be cool that hybrid sources can intelligently
> > switch back and forth between historical data source (like Iceberg) and
> > live data source (like Kafka). E.g.,
> > > - if the Flink job is lagging behind Kafka retention, automatically
> > switch to Iceberg source
> > > - once job caught up, switch back to Kafka source
> > >
> > > That can simplify operational aspects of manually switching.
> > >
> > >
> > > On Mon, Jun 7, 2021 at 8:07 AM Arvid Heise <[hidden email]> wrote:
> > >>
> > >> Sorry for joining the party so late, but it's such an interesting FLIP
> > with
> > >> a huge impact that I wanted to add my 2 cents. [1]
> > >> I'm mirroring some basic question from the PR review to this thread
> > because
> > >> it's about the name:
> > >>
> > >> We could rename the thing to ConcatenatedSource(s), SourceSequence, or
> > >> similar.
> > >> Hybrid has the connotation of 2 for me (maybe because I'm a non-native)
> > and
> > >> does not carry the concatentation concept as well (hybrid sounds to me
> > more
> > >> like the source would constantly switch back and forth).
> > >>
> > >> Could we take a few minutes to think if this is the most intuitive name
> > for
> > >> new users? I'm especially hoping that natives might give some ideas (or
> > >> declare that Hybrid is perfect).
> > >>
> > >> [1]
> > https://github.com/apache/flink/pull/15924#pullrequestreview-677376664
> > >>
> > >> On Sun, Jun 6, 2021 at 7:47 PM Steven Wu <[hidden email]> wrote:
> > >>
> > >> > > Converter function relies on the specific enumerator capabilities
> > to set
> > >> > the new start position (e.g.
> > >> > fileSourceEnumerator.getEndTimestamp() and
> > >> > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
> > >> >
> > >> > I guess the premise is that a converter is for a specific tuple of
> > >> > (upstream source, downstream source) . We don't have to define generic
> > >> > EndtStateT and SwitchableEnumerator interfaces. That should work.
> > >> >
> > >> > The benefit of defining EndtStateT and SwitchableEnumerator
> > interfaces is
> > >> > probably promoting uniformity across sources that support
> > hybrid/switchable
> > >> > source.
> > >> >
> > >> > On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise <[hidden email]> wrote:
> > >> >
> > >> > > Hi Steven,
> > >> > >
> > >> > > Thank you for the thorough review of the PR and for bringing this
> > back
> > >> > > to the mailing list.
> > >> > >
> > >> > > All,
> > >> > >
> > >> > > I updated the FLIP-150 page to highlight aspects in which the PR
> > >> > > deviates from the original proposal [1]. The goal would be to update
> > >> > > the FLIP soon and bring it to a vote, as previously suggested
> > offline
> > >> > > by Nicholas.
> > >> > >
> > >> > > A few minor issues in the PR are outstanding and I'm working on test
> > >> > > coverage for the recovery behavior, which should be completed soon.
> > >> > >
> > >> > > The dynamic position transfer needs to be concluded before we can
> > move
> > >> > > forward however.
> > >> > >
> > >> > > There have been various ideas, including the special
> > >> > > "SwitchableEnumerator" interface, using enumerator checkpoint state
> > or
> > >> > > an enumerator interface extension to extract the end state.
> > >> > >
> > >> > > One goal in the FLIP is to "Reuse the existing Source connectors
> > built
> > >> > > with FLIP-27 without any change." and I think it is important to
> > honor
> > >> > > that goal given that fixed start positions do not require interface
> > >> > > changes.
> > >> > >
> > >> > > Based on the feedback the following might be a good solution for
> > >> > > runtime position transfer:
> > >> > >
> > >> > > * User supplies the optional converter function (not applicable for
> > >> > > fixed positions).
> > >> > > * Instead of relying on the enumerator checkpoint state [2], the
> > >> > > converter function will be supplied with the current and next
> > >> > > enumerator (source.createEnumerator).
> > >> > > * Converter function relies on the specific enumerator capabilities
> > to
> > >> > > set the new start position (e.g.
> > >> > > fileSourceEnumerator.getEndTimestamp() and
> > >> > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
> > >> > > * HybridSourceSplitEnumerator starts new underlying enumerator
> > >> > >
> > >> > > With this approach, there is no need to augment FLIP-27 interfaces
> > and
> > >> > > custom source capabilities are easier to integrate. Removing the
> > >> > > mandate to rely on enumerator checkpoint state also avoids potential
> > >> > > upgrade/compatibility issues.
> > >> > >
> > >> > > Thoughts?
> > >> > >
> > >> > > Thanks,
> > >> > > Thomas
> > >> > >
> > >> > > [1]
> > >> > >
> > >> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
> > >> > > [2]
> > >> > >
> > >> >
> > https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
> > >> > >
> > >> > >
> > >> > > On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <[hidden email]>
> > wrote:
> > >> > > >
> > >> > > > discussed the PR with Thosmas offline. Thomas, please correct me
> > if I
> > >> > > > missed anything.
> > >> > > >
> > >> > > > Right now, the PR differs from the FLIP-150 doc regarding the
> > >> > converter.
> > >> > > > * Current PR uses the enumerator checkpoint state type as the
> > input for
> > >> > > the
> > >> > > > converter
> > >> > > > * FLIP-150 defines a new EndStateT interface.
> > >> > > > It seems that the FLIP-150 approach of EndStateT is more
> > flexible, as
> > >> > > > transition EndStateT doesn't have to be included in the upstream
> > source
> > >> > > > checkpoint state.
> > >> > > >
> > >> > > > Let's look at two use cases:
> > >> > > > 1) static cutover time at 5 pm. File source reads all data btw 9
> > am - 5
> > >> > > pm,
> > >> > > > then Kafka source starts with initial position of 5 pm. In this
> > case,
> > >> > > there
> > >> > > > is no need for converter or EndStateT since the starting time for
> > Kafka
> > >> > > > source is known and fixed.
> > >> > > > 2) dynamic cutover time at 1 hour before now. This is useful when
> > the
> > >> > > > bootstrap of historic data takes a long time (like days or weeks)
> > and
> > >> > we
> > >> > > > don't know the exact time of cutover when a job is launched.
> > Instead,
> > >> > we
> > >> > > > are instructing the file source to stop when it gets close to live
> > >> > data.
> > >> > > In
> > >> > > > this case, hybrid source construction will specify a relative
> > time (now
> > >> > > - 1
> > >> > > > hour), the EndStateT (of file source) will be resolved to an
> > absolute
> > >> > > time
> > >> > > > for cutover. We probably don't need to include EndStateT (end
> > >> > timestamp)
> > >> > > as
> > >> > > > the file source checkpoint state. Hence, the separate EndStateT is
> > >> > > probably
> > >> > > > more desirable.
> > >> > > >
> > >> > > > We also discussed the converter for the Kafka source. Kafka source
> > >> > > supports
> > >> > > > different OffsetsInitializer impls (including
> > >> > > TimestampOffsetsInitializer).
> > >> > > > To support the dynamic cutover time (use case #2 above), we can
> > plug
> > >> > in a
> > >> > > > SupplierTimestampOffsetInitializer, where the starting timestamp
> > is not
> > >> > > set
> > >> > > > during source/job construction. Rather it is a supplier model
> > where the
> > >> > > > starting timestamp value is set to the resolved absolute timestamp
> > >> > during
> > >> > > > switch.
> > >> > > >
> > >> > > > Thanks,
> > >> > > > Steven
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > > On Thu, May 20, 2021 at 8:59 PM Thomas Weise <[hidden email]>
> > wrote:
> > >> > > >
> > >> > > > > Hi Nicholas,
> > >> > > > >
> > >> > > > > Thanks for taking a look at the PR!
> > >> > > > >
> > >> > > > > 1. Regarding switching mechanism:
> > >> > > > >
> > >> > > > > There has been previous discussion in this thread regarding the
> > pros
> > >> > > > > and cons of how the switching can be exposed to the user.
> > >> > > > >
> > >> > > > > With fixed start positions, no special switching interface to
> > >> > transfer
> > >> > > > > information between enumerators is required. Sources are
> > configured
> > >> > as
> > >> > > > > they would be when used standalone and just plugged into
> > >> > HybridSource.
> > >> > > > > I expect that to be a common use case. You can find an example
> > for
> > >> > > > > this in the ITCase:
> > >> > > > >
> > >> > > > >
> > >> > > > >
> > >> > >
> > >> >
> > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
> > >> > > > >
> > >> > > > > For dynamic start position, the checkpoint state is used to
> > transfer
> > >> > > > > information from old to new enumerator. An example for that can
> > be
> > >> > > > > found here:
> > >> > > > >
> > >> > > > >
> > >> > > > >
> > >> > >
> > >> >
> > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
> > >> > > > >
> > >> > > > > That may look verbose, but the code to convert from one state to
> > >> > > > > another can be factored out into a utility and the function
> > becomes a
> > >> > > > > one-liner.
> > >> > > > >
> > >> > > > > For common sources like files and Kafka we can potentially
> > (later)
> > >> > > > > implement the conversion logic as part of the respective
> > connector's
> > >> > > > > checkpoint and split classes.
> > >> > > > >
> > >> > > > > I hope that with the PR up for review, we can soon reach a
> > conclusion
> > >> > > > > on how we want to expose this to the user.
> > >> > > > >
> > >> > > > > Following is an example for Files -> Files -> Kafka that I'm
> > using
> > >> > for
> > >> > > > > e2e testing. It exercises both ways of setting the start
> > position.
> > >> > > > >
> > >> > > > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
> > >> > > > >
> > >> > > > >
> > >> > > > > 2. Regarding the events used to implement the actual switch
> > between
> > >> > > > > enumerator and readers: I updated the PR with javadoc to
> > clarify the
> > >> > > > > intent. Please let me know if that helps or let's continue to
> > discuss
> > >> > > > > those details on the PR?
> > >> > > > >
> > >> > > > >
> > >> > > > > Thanks,
> > >> > > > > Thomas
> > >> > > > >
> > >> > > > >
> > >> > > > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <
> > [hidden email]>
> > >> > > > > wrote:
> > >> > > > > >
> > >> > > > > > Hi Thomas,
> > >> > > > > >
> > >> > > > > >    Sorry for later reply for your POC. I have reviewed the
> > based
> > >> > > abstract
> > >> > > > > > implementation of your pull request:
> > >> > > > > > https://github.com/apache/flink/pull/15924. IMO, for the
> > switching
> > >> > > > > > mechanism, this level of abstraction is not concise enough,
> > which
> > >> > > doesn't
> > >> > > > > > make connector contribution easier. In theory, it is
> > necessary to
> > >> > > > > introduce
> > >> > > > > > a set of interfaces to support the switching mechanism. The
> > >> > > > > SwitchableSource
> > >> > > > > > and SwitchableSplitEnumerator interfaces are needed for
> > connector
> > >> > > > > > expansibility.
> > >> > > > > >    In other words, the whole switching process of above
> > mentioned
> > >> > PR
> > >> > > is
> > >> > > > > > different from that mentioned in FLIP-150. In the above
> > >> > > implementation,
> > >> > > > > the
> > >> > > > > > source reading switching is executed after receving the
> > >> > > > > SwitchSourceEvent,
> > >> > > > > > which could be before the sending SourceReaderFinishEvent.
> > This
> > >> > > timeline
> > >> > > > > of
> > >> > > > > > source reading switching could be discussed here.
> > >> > > > > >    @Stephan @Becket, if you are available, please help to
> > review
> > >> > the
> > >> > > > > > abstract implementation, and compare with the interfaces
> > mentioned
> > >> > in
> > >> > > > > > FLIP-150.
> > >> > > > > >
> > >> > > > > > Thanks,
> > >> > > > > > Nicholas Jiang
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > --
> > >> > > > > > Sent from:
> > >> > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> > >> > > > >
> > >> > >
> > >> >
> >
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
12