[DISCUSS]FLIP-150: Introduce Hybrid Source

[DISCUSS]FLIP-150: Introduce Hybrid Source

Nicholas Jiang
Hi devs,

I'd like to start a new FLIP to introduce the Hybrid Source. A hybrid
source is a source that contains a list of concrete sources and reads from
each contained source in the defined order, switching from source A to the
next source B when source A finishes.

In practice, many Flink jobs need to read data from multiple sources in
sequential order. Change Data Capture (CDC) and machine learning feature
backfill are two concrete scenarios of this consumption pattern. Today,
users have to either run two different Flink jobs or resort to hacks in the
SourceFunction to address such use cases.

To support the above scenarios smoothly, a Flink job needs to first read
historical data from HDFS and then switch to Kafka for real-time records.
The hybrid source has several benefits from the user's perspective:

- Switching among multiple sources is easy, based on the switchable source
implementations of the different connectors.
- Switching happens automatically for any user-defined switchable source
that is part of the hybrid source.
- There is a complete and effective mechanism for smooth source migration
between historical and real-time data.

Therefore, in this discussion, we propose to introduce a “Hybrid Source” API
built on top of the new Source API (FLIP-27) to help users switch sources
smoothly. For more details, please refer to the FLIP design doc[1].
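
To give a feeling for the user-facing side, here is a rough sketch of how a
job could compose such a source. The HybridSource builder shown below is
illustrative only and not the final API; lineFormat, switchTimestamp and env
are assumed to exist, and imports from flink-connector-files and
flink-connector-kafka are omitted for brevity:

    // Sketch only: bounded historical data from HDFS first, then the
    // unbounded Kafka source for real-time records.
    FileSource<String> historical =
        FileSource.forRecordStreamFormat(lineFormat, new Path("hdfs:///logs/"))
            .build();

    KafkaSource<String> realtime =
        KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")
            .setTopics("logs")
            .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp))
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

    // Hypothetical builder API; the concrete shape is described in the FLIP doc.
    HybridSource<String> hybridSource =
        HybridSource.builder(historical)
            .addSource(realtime)
            .build();

    env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "hybrid-source");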

I'm looking forward to your feedback.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source

Best,
Nicholas Jiang

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Aljoscha Krettek
Hi Nicholas,

Thanks for starting the discussion!

I think we might be able to simplify this a bit and re-use existing
functionality.

There are already `Source.restoreEnumerator()` and
`SplitEnumerator.snapshotState()`. These seem to be roughly what the
Hybrid Source needs. When the initial source finishes, we can take a
snapshot (which should include the data that the follow-up sources need
for initialization). Then we need a function that maps the enumerator
checkpoint types between the initial source and the new source, and we
are good to go. We wouldn't need to introduce any additional interfaces
for sources to implement, which would fragment the ecosystem into
sources that can be used in a Hybrid Source and sources that cannot.
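
Concretely, the mapping function could be as small as the following sketch
(the interface name and shape are made up purely for illustration):

    // Sketch only: maps the finished source's enumerator checkpoint into the
    // enumerator checkpoint that the next source is restored from.
    @FunctionalInterface
    interface EnumeratorCheckpointConverter<FromEnumChkT, ToEnumChkT>
            extends java.io.Serializable {
        ToEnumChkT convert(FromEnumChkT previousEnumeratorCheckpoint);
    }

    // The hybrid source would then roughly do:
    //   ToEnumChkT initial = converter.convert(snapshotOfFinishedEnumerator);
    //   nextSource.restoreEnumerator(enumeratorContext, initial);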

What do you think?

Best,
Aljoscha


Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Thomas Weise
Thanks for initiating this discussion and creating the proposal!

I would like to contribute to this effort. Has there been related activity
since the FLIP was created?

If not, I would like to start work on a PoC to validate the design.

Questions/comments:

There could be more use cases for a hybrid source beyond a predefined
sequence that is fixed at job submission time. For example, the source
connector could be used to migrate from one external system to another
(like Kafka1 .. KafkaN - based on external trigger/discovery).

I agree with @Aljoscha Krettek <[hidden email]> that it would be
preferable to solve this without special "switchable" interfaces and have
it work with any FLIP-27 source as is. Performing the switch using the
enumerator checkpoint appears viable (not proven though unless coded 😉).
The actual FLIP-27 source reader would need to signal to the
"HybridSourceReader" (HSR) that it is done, and then the HSR would send
the switch event to the coordinator?

To further confirm my understanding:

The actual split type that flows between enumerator and reader would be
"HybridSourceSplit" and it would wrap the specific split (in the example
either HDFS or Kafka)?

Switching relies on the previous source's end position to be communicated
as start position to the next source. The position(s) can be exchanged
through the checkpoint state, but "HybridSplitEnumerator" still needs a way
to extract them from the actual enumerator. That could be done by the
enumerator checkpoint state mapping function looking at the current split
assignments, which would not require modification of existing enumerators?
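
To illustrate the wrapping I have in mind (sketch only, all names made up;
SourceSplit is org.apache.flink.api.connector.source.SourceSplit):

    // Sketch: the split type exchanged between HybridSplitEnumerator and
    // HybridSourceReader, wrapping the split of the currently active source.
    class HybridSourceSplit implements SourceSplit {
        private final int sourceIndex;      // position of the active source in the chain
        private final SourceSplit wrapped;  // e.g. a FileSourceSplit or a KafkaPartitionSplit

        HybridSourceSplit(int sourceIndex, SourceSplit wrapped) {
            this.sourceIndex = sourceIndex;
            this.wrapped = wrapped;
        }

        @Override
        public String splitId() {
            return sourceIndex + "-" + wrapped.splitId();
        }
    }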

Cheers,
Thomas



Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Kezhu Wang
Hi all,

Thanks for starting the discussion!

I think there are differences between start-position and checkpoint-state.
Checkpoint state is an intermediate capture of reading progress. Source
users don't care about it when writing a job; it is purely an
implementation detail. Start-position, on the other hand, is part of the
public API, say, the paths for FileSource/HiveSource or the
OffsetsInitializer for KafkaSource. Given that, `setStartState` is actually
a reposition operation for the next source to start from at job runtime.
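
To make the distinction concrete (illustration only; switchTimestamp is an
assumed variable):

    // Start positions are connector-specific public API, for example:
    Path[] startPaths = new Path[] { new Path("hdfs:///history/2021-01/") };
    OffsetsInitializer kafkaStart = OffsetsInitializer.timestamp(switchTimestamp);

    // Checkpoint state, in contrast, is internal to each enumerator:
    //   PendingSplitsCheckpoint<FileSourceSplit>, KafkaSourceEnumState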

Besides the above, I think this separation of start-position and
checkpoint-state allows a smooth migration from a non-switchable source to
a switchable one and relaxes the burden on source writers. Source writers
would not have to take switchability into account in the design phase; it
becomes a decision they can postpone, and a later enhancement would not
break anything. It would be relatively hard to convert a start-position
into checkpoint-state at restore time without changing the checkpoint-state
structure and/or serializer:

* `Path[]` to `PendingSplitsCheckpoint<FileSourceSplit>`.
* `OffsetsInitializer` to `KafkaSourceEnumState`.

This conversion should be an implementation detail of the next source, not
of the converter function, in my opinion.

With a separate start-position concept, I think the `StartStateT` in the
proposal should be `Path[]` for `FileSource` and `OffsetsInitializer` for
`KafkaSource`. It seems like it should belong to the source more than to
the split enumerator.

If we do not reuse checkpoint-state as start-position, then this feature
requires support from the Flink side (that is, it would be awkward/tangled
to offer it as a third-party library):
* It needs a post-mortem position to deliver possibly useful information
to the next source.
* It needs to reposition the next source, such as `FileSource`,
`KafkaSource`, etc.

Back to the proposal, I think it consists of three joints:
1. A separate start-position for the next source.
2. A separate end-position from the preceding source.
3. A converter from end-position to start-position.

I think some or all of the three should be optional. Let me elaborate:
1. No configurable start-position. In this situation the combination of the
   above three is a no-op, and `HybridSource` is just a chain of sources
   with pre-configured start positions. The current design seems like
   overkill for this use case. This case could also be realized with help
   from `InputSelectable` and `MultipleInputStreamOperator`, but I think it
   fits this feature quite well.
2. I wonder whether an end-position is a must and how it could be useful
   for end users in a generic-enough source, say `FileSource`? I could
   imagine that some sources may have an embedded next-position for the
   next source to start from. Most generic sources, however, have no
   meaningful next-position for other sources. Would it be too risky to
   bake this into a generic source's type? Or are we actually doing this
   solely to fit the type requirement of `HybridSource.addSource`?
3. Is it possible for the converter function to do blocking operations?
   How should a checkpoint request be answered while switching split
   enumerators across sources? Do the end-position or start-position need
   to be stored in checkpoint state or not?

Last, given the name `HybridSource`, would it be possible to use this
feature to switch/chain multiple homogeneous sources? Say:

* Two file sources with different formats.
* Migrating from one Kafka cluster to another, as @Thomas has already
pointed out.




Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Nicholas Jiang
In reply to this post by Thomas Weise
Hi Thomas,

Thanks for your detailed feedback on the design of the Hybrid Source. Let
me reply to your questions as follows:

1. Has there been related activity since the FLIP was created?

Yes, there is an initial version of the Hybrid Source implementation for
the FLIP. Please refer to the repository:
git@github.com:wuchong/flink-hackathon.git

2. The actual FLIP-27 source reader would need to signal to the
"HybridSourceReader" (HSR) that it is done, and then the HSR would send
the switch event to the coordinator?

The communication in the Hybrid Source happens between "HybridSourceReader"
and "HybridSplitEnumerator". After the current "SourceReader" inside the
"HybridSourceReader" finishes reading the source splits assigned by its
split enumerator, the "HybridSourceReader" sends a signal to the
"HybridSplitEnumerator" to notify it that the "SourceReader" has finished
reading. The "HybridSplitEnumerator" receives this
"SourceReaderSwitchedEvent" and switches to the next "SplitEnumerator",
while the "HybridSourceReader" switches to the next "SourceReader".

3. The actual split type that flows between enumerator and reader would be
"HybridSourceSplit" and it would wrap the specific split (in the example
either HDFS or Kafka)?

Yes, "HybridSourceSplit" wraps the "SourceSplit" created by the
"SplitEnumerator" that is currently active inside the
"HybridSplitEnumerator".

4."HybridSplitEnumerator" still needs a way to extract them from the actual
enumerator. That could be done by the enumerator checkpoint state mapping
function looking at the current split assignments, which would not require
modification of existing enumerators?

IMO, the above way to extract them from the actual enumerator could be
defined in the state conversion function which is invoked in
"HybridSplitEnumerator" when switch one "SwitchableSplitEnumerator" to
another "SwitchableSplitEnumerator". Because the "SwitchableSplitEnumerator"
interface could return the end state of the actual enumerator.
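
A rough sketch of that interface as currently drafted in the FLIP (the
generic parameter names are shorthand here and may differ in the final
proposal):

    // Sketch of the proposed SwitchableSplitEnumerator: a split enumerator
    // that can be repositioned when the hybrid source switches.
    public interface SwitchableSplitEnumerator<SplitT extends SourceSplit, CheckpointT,
            StartStateT, EndStateT> extends SplitEnumerator<SplitT, CheckpointT> {

        // called by HybridSplitEnumerator to position the next source before it starts
        void setStartState(StartStateT startState);

        // called on the finished source; the result is passed to the converter
        // function that produces the next source's start state
        EndStateT getEndState();
    }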

I hope this answers your questions about the Hybrid Source. If anything is
still unclear, let's keep discussing. Thanks for your attention.

Best,
Nicholas Jiang




Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Nicholas Jiang
In reply to this post by Kezhu Wang
Hi Kezhu,

Thanks for your detailed points about the Hybrid Source. Let me address
them one by one:

1. Would it be possible to use this feature to switch/chain multiple
homogeneous sources?

"HybridSource" supports switching/chaining multiple homogeneous sources, as
long as they provide the respective implementations of "SwitchableSource"
and "SwitchableSplitEnumerator". "HybridSource" does not restrict whether
the contained sources are homogeneous. From the user's perspective, the
user only adds the "SwitchableSource" instances to the "HybridSource" and
leaves the smooth migration to the "HybridSource".

2."setStartState" is actually a reposition operation for next source to
start in job runtime?

IMO, "setStartState" is used to determine the initial position of the new
source for smooth migration, not reposition operation. More importantly, the
"State" mentioned here refers to the start and end positions of reading
source.

3. This conversion should be an implementation detail of the next source,
not of the converter function?

The state conversion is of course an implementation detail and part of the
switching mechanism, but the mechanism should expose a conversion interface
to users, which is what the converter function defines. What's more, once
users have implemented a "SwitchableSource" and added it to the Hybrid
Source, they don't need to implement another "SwitchableSource" for each
different conversion. From the user's perspective, users can define
different converter functions and reuse the same "SwitchableSource" when
adding it to the "HybridSource"; there is no need to implement a new Source
per converter function (see the sketch below).
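
For illustration, the wiring could look like the following sketch. All
names here, including the addSource variant that takes a converter and
FileEndState/getMaxTimestamp, are made-up placeholders rather than the
final API:

    // Sketch only: the user supplies a converter lambda when adding the source,
    // instead of implementing a new Source for every conversion.
    hybridSourceBuilder.addSource(
        kafkaSource,
        (FileEndState endState) ->
            OffsetsInitializer.timestamp(endState.getMaxTimestamp()));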

4. No configurable start-position. In this situation the combination of the
above three joints is a no-op, and "HybridSource" is a chain of sources
with pre-configured start positions?

Indeed, in that case there is no configurable start-position, but such a
configuration could still be part of the feature. Users could use the
"SwitchableSplitEnumerator#setStartState" interface or configuration
parameters to configure the start-position.

5. I wonder whether an end-position is a must and how it could be useful
for end users in a generic-enough source?

The "getEndState" interface is used for the smooth migration scenario and
may return null if no end state is needed. In the Hybrid Source mechanism
this interface is required for switching between the contained sources;
otherwise there is no way to get the end-position of the upstream source.
In summary, the Hybrid Source needs to be able to set the start position
and get the end position of each source; otherwise there is no point in
building a Hybrid Source.

6. Is it possible for the converter function to do blocking operations?
How should a checkpoint request be answered while switching split
enumerators across sources? Do the end-position or start-position need to
be stored in checkpoint state?

The converter function only converts the state of the upstream source into
the state of the downstream source; it should not perform blocking
operations. A checkpoint request during the switch of split enumerators
across sources is handled by sending the corresponding "SourceEvent" to the
coordinator. The end-position and start-position don't need to be stored in
checkpoint state; only the "getEndState" interface needs to be implemented
for the end-position.

Best,
Nicholas Jiang




Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Thomas Weise
Hi Nicholas,

Thanks for the reply. I had implemented a small PoC. It switches a
configurable sequence of sources with predefined bounds. I'm using the
unmodified MockSource for illustration. It does not require a "Switchable"
interface. I looked at the code you shared, and the delegation and
signaling work quite similarly. That's a good validation.

Hi Kezhu,

Thanks for bringing the more detailed discussion regarding the start/end
position. I think in most cases the start and end positions will be known
when the job is submitted. If we take a File -> Kafka source chain as
example, there would most likely be a timestamp at which we want to
transition from files to reading from Kafka. So we would either set the
start position for Kafka based on that timestamp or provide the offsets
directly. (Note that I'm skipping a few related nuances here. In order to
achieve an exact switch without duplication or gap, we may also need some
overlap and filtering, but that's a separate issue.)

The point is that the start positions can be configured by the user, there
is no need to transfer any information from one source to another as part
of switching.

It gets more complicated if we want to achieve a dynamic switch where the
transition timestamp isn't known when the job starts. For example, consider
a bootstrap scenario where the time taken to process historic data exceeds
the Kafka retention. Here, we would need to dynamically resolve the Kafka
start position based on where the file readers left off, when the switching
occurs. The file source enumerator would determine at runtime when it is
done handing splits to its readers, maybe when the max file timestamp
reaches (processing time - X). This information needs to be transferred to
the Kafka source.

The timestamp would need to be derived from the file enumerator state,
either by looking at the last splits or explicitly. The natural way to do
that is to introspect the enumerator state which gets checkpointed. Any
other form of "end position" via a special interface would need to be
derived in the same manner.

The converter that will be provided by the user would look at the file
enumerator state, derive the timestamp and then supply the "start position"
to the Kafka source. The Kafka source was created when the job started. It
needs to be augmented with the new start position. That can be achieved via
a special enumerator interface like SwitchableSplitEnumerator#setStartState
or by using restoreEnumerator with the checkpoint state constructed by the
converter function. I'm leaning towards the latter as long as there is a
convenient way to construct the state from a position (like
enumStateForTimestamp). The converter would map one enum state to another
and can be made very simple by providing a few utility functions instead of
mandating a new interface that enumerators need to implement to become
switchable.
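
In code, such a converter could look roughly like this; enumStateForTimestamp
and maxTimestampOf are placeholders for the utilities I have in mind, not
existing API:

    // Sketch of a user-provided converter for the deferred switch: derive the
    // transition timestamp from the file enumerator's checkpoint and build the
    // Kafka enumerator checkpoint to restore from.
    KafkaSourceEnumState convert(PendingSplitsCheckpoint<FileSourceSplit> fileState) {
        long maxFileTimestamp = maxTimestampOf(fileState);   // placeholder helper
        return enumStateForTimestamp(maxFileTimestamp);      // placeholder utility
    }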

Again, a converter is only required when sources need to be switched based
on positions not known at graph construction time.

I'm planning to add such deferred switching to the PoC for illustration and
will share the experiment when that's done.

Cheers,
Thomas



Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Thomas Weise
Hi,

As mentioned in my previous email, I had been working on a prototype for
the hybrid source.

You can find it at https://github.com/tweise/flink/pull/1

It contains:
* Switching with configurable chain of sources
* Fixed or dynamic start positions
* Test with MockSource and FileSource

The purpose of the above PR is to gather feedback and help drive consensus
on the FLIP.

* How to support a dynamic start position within the source chain?

Relevant in those (few?) cases where start positions are not known upfront.
You can find an example of what that might look like in the tests:

https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132

When switching, the enumerator of the previous source needs to supply
information about the consumed splits that allows setting the start
position for the next source. That could be something like the last
processed file, a timestamp, etc. (Currently StaticFileSplitEnumerator
doesn't track finished splits.)

See previous discussion regarding start/end position. The prototype shows
the use of checkpoint state with converter function.

* Should readers be deployed dynamically?

The prototype assumes a static source chain that is fixed at job submission
time. Conceivably there could be use cases that require more flexibility,
such as swapping one KafkaSource for another. A step in that direction
would be to deploy the actual readers dynamically, at the time of switching
sources.

Looking forward to feedback and suggestions for next steps!

Thomas


Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Stephan Ewen
Thanks all for this discussion. Looks like there are lots of ideas and
folks that are eager to do things, so let's see how we can get this moving.

My take on this is the following:

There will probably not be one Hybrid source, but possibly multiple ones,
because of different strategies/requirements.
    - One may be very simple, with switching points known up-front. Would
be good to have this in a very simple implementation.
    - There may be one where the switch is dynamic and the readers need to
report back where they left off.
    - There may be one that switches back and forth multiple times during a
job, for example Kafka going to DFS whenever it falls behind retention, in
order to catch up again.

This also seems hard to "design on paper"; I expect there are nuances in a
production setup that affect some details of the design. So I'd feel most
comfortable in adding a variant of the hybrid source to Flink that has been
used already in a real use case (not necessarily in production, but maybe
in a testing/staging environment, so it seems to meet all requirements).


What do you think about the following approach?
  - If there is a tested PoC, let's try to get it contributed to Flink
without trying to make it much more general.
  - When we see similar but a bit different requirements for another hybrid
source, then let's try to evolve the contributed one.
  - If we see new requirements that are so different that they don't fit
well with the existing hybrid source, then let us look at building a second
hybrid source for those requirements.

We need to make connector contributions easier in general, and I think it
is not a bad thing to end up with different approaches and see how these
play out against each other when used by users. For example, switching
with known boundaries, dynamic switching, back-and-forth switching, etc.
(I know some committers are planning to do some work on making
connector contributions easier, with standardized testing frameworks,
decoupled CI, etc.)

Best,
Stephan



Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Thomas Weise
Hi Stephan,

Thanks for the feedback!

I agree with the approach of starting with a simple implementation
that can address a well understood, significant portion of use cases.

I'm planning to continue working on the prototype that I shared. There is
production-level usage waiting for it fairly soon. I expect to open a PR in
the coming weeks.

Thomas





On Tue, Apr 13, 2021 at 12:15 PM Stephan Ewen <[hidden email]> wrote:

>
> Thanks all for this discussion. Looks like there are lots of ideas and
> folks that are eager to do things, so let's see how we can get this moving.
>
> My take on this is the following:
>
> There will probably not be one Hybrid source, but possibly multiple ones,
> because of different strategies/requirements.
>     - One may be very simple, with switching points known up-front. Would
> be good to have this in a very simple implementation.
>     - There may be one where the switch is dynamic and the readers need to
> report back where they left off.
>     - There may be one that switches back and forth multiple times during a
> job, for example Kakfa going to DFS whenever it falls behind retention, in
> order to catch up again.
>
> This also seems hard to "design on paper"; I expect there are nuances in a
> production setup that affect some details of the design. So I'd feel most
> comfortable in adding a variant of the hybrid source to Flink that has been
> used already in a real use case (not necessarily in production, but maybe
> in a testing/staging environment, so it seems to meet all requirements).
>
>
> What do you think about the following approach?
>   - If there is a tested PoC, let's try to get it contributed to Flink
> without trying to make it much more general.
>   - When we see similar but a bit different requirements for another hybrid
> source, then let's try to evolve the contributed one.
>   - If we see new requirements that are so different that they don't fit
> well with the existing hybrid source, then let us look at building a second
> hybrid source for those requirements.
>
> We need to make connector contributions in general more easy, and I think
> it is not a bad thing to end up with different approaches and see how these
> play out against each other when being used by users. For example switching
> with known boundaries, dynamic switching, back-and-forth-switching, etc.
> (I know some committers are planning to do some work on making
> connector contributions easier, with standardized testing frameworks,
> decoupled CI, etc.)
>
> Best,
> Stephan
>
>
> On Thu, Mar 25, 2021 at 4:41 AM Thomas Weise <[hidden email]> wrote:
>
> > Hi,
> >
> > As mentioned in my previous email, I had been working on a prototype for
> > the hybrid source.
> >
> > You can find it at https://github.com/tweise/flink/pull/1
> >
> > It contains:
> > * Switching with configurable chain of sources
> > * Fixed or dynamic start positions
> > * Test with MockSource and FileSource
> >
> > The purpose of the above PR is to gather feedback and help drive consensus
> > on the FLIP.
> >
> > * How to support a dynamic start position within the source chain?
> >
> > Relevant in those (few?) cases where start positions are not known upfront.
> > You can find an example of what that might look like in the tests:
> >
> >
> > https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
> >
> > https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132
> >
> > When switching, the enumerator of the previous source needs to
> > supply information about consumed splits that allows to set the start
> > position for the next source. That could be something like the last
> > processed file, timestamp, etc. (Currently StaticFileSplitEnumerator
> > doesn't track finished splits.)
> >
> > See previous discussion regarding start/end position. The prototype shows
> > the use of checkpoint state with converter function.
> >
> > * Should readers be deployed dynamically?
> >
> > The prototype assumes a static source chain that is fixed at job submission
> > time. Conceivably there could be use cases that require more flexibility.
> > Such as switching one KafkaSource for another. A step in that direction
> > would be to deploy the actual readers dynamically, at the time of switching
> > source.
> >
> > Looking forward to feedback and suggestions for next steps!
> >
> > Thomas
> >
> > On Sun, Mar 14, 2021 at 11:17 AM Thomas Weise <[hidden email]> wrote:
> >
> > > Hi Nicholas,
> > >
> > > Thanks for the reply. I had implemented a small PoC. It switches a
> > > configurable sequence of sources with predefined bounds. I'm using the
> > > unmodified MockSource for illustration. It does not require a
> > "Switchable"
> > > interface. I looked at the code you shared and the delegation and
> > signaling
> > > works quite similar. That's a good validation.
> > >
> > > Hi Kezhu,
> > >
> > > Thanks for bringing the more detailed discussion regarding the start/end
> > > position. I think in most cases the start and end positions will be known
> > > when the job is submitted. If we take a File -> Kafka source chain as
> > > example, there would most likely be a timestamp at which we want to
> > > transition from files to reading from Kafka. So we would either set the
> > > start position for Kafka based on that timestamp or provide the offsets
> > > directly. (Note that I'm skipping a few related nuances here. In order to
> > > achieve an exact switch without duplication or gap, we may also need some
> > > overlap and filtering, but that's a separate issue.)
> > >
> > > The point is that the start positions can be configured by the user,
> > there
> > > is no need to transfer any information from one source to another as part
> > > of switching.
> > >
> > > It gets more complicated if we want to achieve a dynamic switch where the
> > > transition timestamp isn't known when the job starts. For example, consider
> > > a bootstrap scenario where the time taken to process the historic data
> > > exceeds the Kafka retention. Here, we would need to dynamically resolve the
> > > Kafka start position based on where the file readers left off at the time
> > > the switch occurs. The file source enumerator would determine at runtime
> > > when it is done handing splits to its readers, maybe when the max file
> > > timestamp reaches (processing time - X). This information needs to be
> > > transferred to the Kafka source.
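The hand-over information involved could be as small as a single timestamp recorded by the file enumerator. A sketch only; StaticFileSplitEnumerator does not track anything like this today:

    import java.io.Serializable;

    // Sketch only: the information the file enumerator would need to record so
    // that the Kafka start position can later be derived from it.
    public class FileEnumeratorEndState implements Serializable {
        private final long maxProcessedFileTimestamp;  // epoch millis of the newest consumed file

        public FileEnumeratorEndState(long maxProcessedFileTimestamp) {
            this.maxProcessedFileTimestamp = maxProcessedFileTimestamp;
        }

        public long maxProcessedFileTimestamp() {
            return maxProcessedFileTimestamp;
        }
    }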
> > >
> > > The timestamp would need to be derived from the file enumerator state,
> > > either by looking at the last splits or by tracking it explicitly. The
> > > natural way to do that is to introspect the enumerator state, which gets
> > > checkpointed. Any other form of "end position" via a special interface
> > > would need to be derived in the same manner.
> > >
> > > The converter that will be provided by the user would look at the file
> > > enumerator state, derive the timestamp and then supply the "start position"
> > > to the Kafka source. The Kafka source was created when the job started. It
> > > needs to be augmented with the new start position. That can be achieved via
> > > a special enumerator interface like SwitchableSplitEnumerator#setStartState
> > > or by using restoreEnumerator with the checkpoint state constructed by the
> > > converter function. I'm leaning towards the latter as long as there is a
> > > convenient way to construct the state from a position (like
> > > enumStateForTimestamp). The converter would map one enum state to another
> > > and can be made very simple by providing a few utility functions instead of
> > > mandating a new interface that enumerators need to implement to become
> > > switchable.
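A sketch of the restoreEnumerator variant: Source#restoreEnumerator is the existing FLIP-27 hook, KafkaPartitionSplit/KafkaSourceEnumState are the existing Kafka source types, and enumStateForTimestamp is the hypothetical utility mentioned above:

    // Sketch of "restoreEnumerator with a checkpoint built by the converter".
    // enumStateForTimestamp(...) is hypothetical; the rest is existing API.
    static SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> switchToKafka(
            Source<String, KafkaPartitionSplit, KafkaSourceEnumState> kafkaSource,
            SplitEnumeratorContext<KafkaPartitionSplit> enumContext,
            long timestampDerivedFromFileEnumState) throws Exception {

        // Hypothetical utility: an enumerator checkpoint meaning
        // "start all assigned partitions from this timestamp".
        KafkaSourceEnumState startState = enumStateForTimestamp(timestampDerivedFromFileEnumState);

        // Existing FLIP-27 method: bring the enumerator up from the converted state.
        return kafkaSource.restoreEnumerator(enumContext, startState);
    }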
> > >
> > > Again, a converter is only required when sources need to be switched based
> > > on positions not known at graph construction time.
> > >
> > > I'm planning to add such deferred switching to the PoC for illustration
> > > and will share the experiment when that's done.
> > >
> > > Cheers,
> > > Thomas
> > >
> > >
> > > On Mon, Mar 8, 2021 at 1:46 AM Nicholas Jiang <[hidden email]>
> > wrote:
> > >
> > >> Hi Kezhu,
> > >>
> > >> Thanks for your detailed points about the Hybrid Source. I will address
> > >> them one by one below:
> > >>
> > >> 1. Would it be possible to use this feature to switch/chain multiple
> > >> homogeneous sources?
> > >>
> > >> "HybridSource" supports switching/chaining multiple homogeneous sources,
> > >> as long as each of them provides an implementation of "SwitchableSource"
> > >> and "SwitchableSplitEnumerator". "HybridSource" doesn't restrict whether
> > >> the contained sources are homogeneous or not. From the user's perspective,
> > >> the user only adds the "SwitchableSource" instances to the "HybridSource"
> > >> and leaves the smooth migration to "HybridSource".
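For example, chaining two Kafka clusters (a fully homogeneous case) would conceptually look like this; everything shown is the proposed API, with createSwitchableKafkaSource as an assumed helper rather than existing Flink code:

    // Illustrative only: two homogeneous sources chained, e.g. an old and a
    // new Kafka cluster. SwitchableSource, HybridSource and the helper are
    // proposed/assumed names, not existing Flink API.
    SwitchableSource<String> oldCluster = createSwitchableKafkaSource("old-kafka:9092", "events");
    SwitchableSource<String> newCluster = createSwitchableKafkaSource("new-kafka:9092", "events");

    HybridSource<String> hybrid = HybridSource.<String>builder()
        .addSource(oldCluster)
        .addSource(newCluster)
        .build();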
> > >>
> > >> 2."setStartState" is actually a reposition operation for next source to
> > >> start in job runtime?
> > >>
> > >> IMO, "setStartState" is used to determine the initial position of the
> > new
> > >> source for smooth migration, not reposition operation. More importantly,
> > >> the
> > >> "State" mentioned here refers to the start and end positions of reading
> > >> source.
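For reference, the interface shape under discussion looks roughly like this. This is a sketch of the FLIP proposal; only SplitEnumerator and SourceSplit are existing Flink interfaces:

    // Sketch of the proposed interface: the next source receives a start
    // position derived from the previous one, and exposes its own end position
    // when it finishes. Everything except SplitEnumerator/SourceSplit is the
    // FLIP proposal, not existing API.
    public interface SwitchableSplitEnumerator<SplitT extends SourceSplit, CheckpointT, StartStateT, EndStateT>
            extends SplitEnumerator<SplitT, CheckpointT> {

        // Position from which this source should start reading, e.g. a
        // timestamp or offsets handed over from the previous source.
        void setStartState(StartStateT startState);

        // Where this source stopped; may return null if the next source does
        // not need any hand-over information.
        EndStateT getEndState();
    }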
> > >>
> > >> 3. Shouldn't this conversion be an implementation detail of the next
> > >> source rather than a converter function?
> > >>
> > >> The state conversion is of course an implementation detail and part of
> > >> the switching mechanism, but the mechanism should still expose a
> > >> conversion hook to users, and that hook is the converter function.
> > >> What's more, once users have implemented a "SwitchableSource" and added
> > >> it to the Hybrid Source, they don't need to implement another
> > >> "SwitchableSource" for each different conversion. From the user's
> > >> perspective, users can define different converter functions and create
> > >> the "SwitchableSource" to add to the "HybridSource"; there is no need to
> > >> implement a new Source per converter function.
> > >>
> > >> 4. With no configurable start position, the combination of the above
> > >> three points is a no-op, and "HybridSource" is just a chain of sources
> > >> with pre-configured start positions?
> > >>
> > >> Indeed, there is currently no configurable start position, but such
> > >> configuration could be added to the feature. Users could use the
> > >> "SwitchableSplitEnumerator#setStartState" interface or configuration
> > >> parameters to set the start position.
> > >>
> > >> 5. I wonder whether an end position is a must, and how it could be useful
> > >> for end users in a generic-enough source?
> > >>
> > >> The "getEndState" interface is used for the smooth migration scenario and
> > >> may return null if no end state is needed. In the Hybrid Source mechanism,
> > >> this interface is required for switching between the contained sources;
> > >> otherwise there is no way to obtain the end position of the upstream
> > >> source. In summary, Hybrid Source needs to be able to set the start
> > >> position and get the end position of each source; otherwise there is
> > >> little point in building a Hybrid Source.
> > >>
> > >> 6. Is it possible for the converter function to do blocking operations?
> > >> How are checkpoint requests handled while switching split enumerators
> > >> across sources? Does the end position or start position need to be stored
> > >> in checkpoint state or not?
> > >>
> > >> The converter function only converts the state of the upstream source
> > >> into the state of the downstream source; it should not perform blocking
> > >> operations. Checkpoint requests that arrive while the split enumerators
> > >> are being switched across sources are handled by sending the
> > >> corresponding "SourceEvent" to the coordinator. The end position or
> > >> start position does not need to be stored in checkpoint state; only the
> > >> "getEndState" interface needs to be implemented for the end position.
> > >>
> > >> Best,
> > >> Nicholas Jiang
> > >>
> > >>
> > >>
> > >> --
> > >> Sent from:
> > >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> > >>
> > >
> >
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Stephan Ewen
Thanks, Thomas!

@Becket and @Nicholas - would you be ok with that approach?


Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Becket Qin
Sorry for the late reply. Starting from a specific connector sounds
reasonable to me.

That said, I would suggest keeping the possibility of future generalization
open as much as possible. We have already seen several variations of source
combinations from different users: HDFS + Kafka, S3 + Kafka, S3 + SQL
Binlog, etc. So it would be good if we could reuse some base abstraction in
the future instead of having to write each combination from scratch.

Thanks,

Jiangjie (Becket) Qin

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Thomas Weise
Hi Becket,

I agree and am not planning to hard wire a specific combination of
sources (like S3 + Kafka). That also wouldn't help for the use case I
want to address, because there are customized connectors that we need
to be able to plug in.

Rather, the suggested simplification concerns the flexibility of the
switching mechanism.

The prototype already supports fixed start positions and checkpoint
conversion for any combination of sources; no need to undo that.

But for testing/example purposes, we will need to settle on a specific
combination.

Thomas

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Becket Qin
Thanks for the clarification, Thomas. Yes, that makes sense to me.

Cheers,

Jiangjie (Becket) Qin

On Mon, Apr 26, 2021 at 1:03 AM Thomas Weise <[hidden email]> wrote:

> Hi Becket,
>
> I agree and am not planning to hard wire a specific combination of
> sources (like S3 + Kafka). That also wouldn't help for the use case I
> want to address, because there are customized connectors that we need
> to be able to plug in.
>
> Rather, the suggested simplification would be for the flexibility of
> switching mechanism.
>
> The prototype already supports fixed start positions and checkpoint
> conversion for any combination of sources; no need to undo that.
>
> But for testing/example purposes, we will need to settle on a specific
> combination.
>
> Thomas
>
> On Sat, Apr 24, 2021 at 8:20 PM Becket Qin <[hidden email]> wrote:
> >
> > Sorry for the late reply. Starting from a specific connector sounds
> > reasonable to me.
> >
> > That said, I would suggest to keep the possibility of future
> generalization
> > as much as possible. We have already seen some variation of source
> > combinations from different users, HDFS + Kafka, S3 + Kafka, S3 + SQL
> > Binlog, etc. So it would be good if we can reuse some base abstraction in
> > the future instead of having to write each combination from scratch.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sat, Apr 17, 2021 at 7:34 PM Stephan Ewen <[hidden email]> wrote:
> >
> > > Thanks, Thomas!
> > >
> > > @Becket and @Nicholas - would you be ok with that approach?
> > >
> > >
> > > On Thu, Apr 15, 2021 at 6:30 PM Thomas Weise <[hidden email]> wrote:
> > >
> > > > Hi Stephan,
> > > >
> > > > Thanks for the feedback!
> > > >
> > > > I agree with the approach of starting with a simple implementation
> > > > that can address a well understood, significant portion of use cases.
> > > >
> > > > I'm planning to continue work on the prototype that I had shared.
> > > > There is production level usage waiting for it fairly soon. I expect
> > > > to open a PR in the coming weeks.
> > > >
> > > > Thomas
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Tue, Apr 13, 2021 at 12:15 PM Stephan Ewen <[hidden email]>
> wrote:
> > > > >
> > > > > Thanks all for this discussion. Looks like there are lots of ideas
> and
> > > > > folks that are eager to do things, so let's see how we can get this
> > > > moving.
> > > > >
> > > > > My take on this is the following:
> > > > >
> > > > > There will probably not be one Hybrid source, but possibly multiple
> > > ones,
> > > > > because of different strategies/requirements.
> > > > >     - One may be very simple, with switching points known up-front.
> > > Would
> > > > > be good to have this in a very simple implementation.
> > > > >     - There may be one where the switch is dynamic and the readers
> need
> > > > to
> > > > > report back where they left off.
> > > > >     - There may be one that switches back and forth multiple times
> > > > during a
> > > > > job, for example Kakfa going to DFS whenever it falls behind
> retention,
> > > > in
> > > > > order to catch up again.
> > > > >
> > > > > This also seems hard to "design on paper"; I expect there are
> nuances
> > > in
> > > > a
> > > > > production setup that affect some details of the design. So I'd
> feel
> > > most
> > > > > comfortable in adding a variant of the hybrid source to Flink that
> has
> > > > been
> > > > > used already in a real use case (not necessarily in production, but
> > > maybe
> > > > > in a testing/staging environment, so it seems to meet all
> > > requirements).
> > > > >
> > > > >
> > > > > What do you think about the following approach?
> > > > >   - If there is a tested PoC, let's try to get it contributed to
> Flink
> > > > > without trying to make it much more general.
> > > > >   - When we see similar but a bit different requirements for
> another
> > > > hybrid
> > > > > source, then let's try to evolve the contributed one.
> > > > >   - If we see new requirements that are so different that they
> don't
> > > fit
> > > > > well with the existing hybrid source, then let us look at building
> a
> > > > second
> > > > > hybrid source for those requirements.
> > > > >
> > > > > We need to make connector contributions in general more easy, and I
> > > think
> > > > > it is not a bad thing to end up with different approaches and see
> how
> > > > these
> > > > > play out against each other when being used by users. For example
> > > > switching
> > > > > with known boundaries, dynamic switching, back-and-forth-switching,
> > > etc.
> > > > > (I know some committers are planning to do some work on making
> > > > > connector contributions easier, with standardized testing
> frameworks,
> > > > > decoupled CI, etc.)
> > > > >
> > > > > Best,
> > > > > Stephan
> > > > >
> > > > >
> > > > > On Thu, Mar 25, 2021 at 4:41 AM Thomas Weise <[hidden email]>
> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > As mentioned in my previous email, I had been working on a
> prototype
> > > > for
> > > > > > the hybrid source.
> > > > > >
> > > > > > You can find it at https://github.com/tweise/flink/pull/1
> > > > > >
> > > > > > It contains:
> > > > > > * Switching with configurable chain of sources
> > > > > > * Fixed or dynamic start positions
> > > > > > * Test with MockSource and FileSource
> > > > > >
> > > > > > The purpose of the above PR is to gather feedback and help drive
> > > > consensus
> > > > > > on the FLIP.
> > > > > >
> > > > > > * How to support a dynamic start position within the source
> chain?
> > > > > >
> > > > > > Relevant in those (few?) cases where start positions are not
> known
> > > > upfront.
> > > > > > You can find an example of what that might look like in the
> tests:
> > > > > >
> > > > > >
> > > > > >
> > > >
> > >
> https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
> > > > > >
> > > > > >
> > > >
> > >
> https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132
> > > > > >
> > > > > > When switching, the enumerator of the previous source needs to
> > > > > > supply information about consumed splits that allows to set the
> start
> > > > > > position for the next source. That could be something like the
> last
> > > > > > processed file, timestamp, etc. (Currently
> StaticFileSplitEnumerator
> > > > > > doesn't track finished splits.)
> > > > > >
> > > > > > See previous discussion regarding start/end position. The
> prototype
> > > > shows
> > > > > > the use of checkpoint state with converter function.
> > > > > >
> > > > > > * Should readers be deployed dynamically?
> > > > > >
> > > > > > The prototype assumes a static source chain that is fixed at job
> > > > submission
> > > > > > time. Conceivably there could be use cases that require more
> > > > flexibility.
> > > > > > Such as switching one KafkaSource for another. A step in that
> > > direction
> > > > > > would be to deploy the actual readers dynamically, at the time of
> > > > switching
> > > > > > source.
> > > > > >
> > > > > > Looking forward to feedback and suggestions for next steps!
> > > > > >
> > > > > > Thomas
> > > > > >
> > > > > > On Sun, Mar 14, 2021 at 11:17 AM Thomas Weise <[hidden email]>
> > > wrote:
> > > > > >
> > > > > > > Hi Nicholas,
> > > > > > >
> > > > > > > Thanks for the reply. I had implemented a small PoC. It
> switches a
> > > > > > > configurable sequence of sources with predefined bounds. I'm
> using
> > > > the
> > > > > > > unmodified MockSource for illustration. It does not require a
> > > > > > "Switchable"
> > > > > > > interface. I looked at the code you shared and the delegation
> and
> > > > > > signaling
> > > > > > > works quite similar. That's a good validation.
> > > > > > >
> > > > > > > Hi Kezhu,
> > > > > > >
> > > > > > > Thanks for bringing the more detailed discussion regarding the
> > > > start/end
> > > > > > > position. I think in most cases the start and end positions
> will be
> > > > known
> > > > > > > when the job is submitted. If we take a File -> Kafka source
> chain
> > > as
> > > > > > > example, there would most likely be a timestamp at which we
> want to
> > > > > > > transition from files to reading from Kafka. So we would
> either set
> > > > the
> > > > > > > start position for Kafka based on that timestamp or provide the
> > > > offsets
> > > > > > > directly. (Note that I'm skipping a few related nuances here.
> In
> > > > order to
> > > > > > > achieve an exact switch without duplication or gap, we may also
> > > need
> > > > some
> > > > > > > overlap and filtering, but that's a separate issue.)
> > > > > > >
> > > > > > > The point is that the start positions can be configured by the
> > > user,
> > > > > > there
> > > > > > > is no need to transfer any information from one source to
> another
> > > as
> > > > part
> > > > > > > of switching.
> > > > > > >
> > > > > > > It gets more complicated if we want to achieve a dynamic switch
> > > > where the
> > > > > > > transition timestamp isn't known when the job starts. For
> example,
> > > > > > consider
> > > > > > > a bootstrap scenario where the time taken to process historic
> data
> > > > > > exceeds
> > > > > > > the Kafka retention. Here, we would need to dynamically
> resolve the
> > > > Kafka
> > > > > > > start position based on where the file readers left off, when
> the
> > > > > > switching
> > > > > > > occurs. The file source enumerator would determine at runtime
> when
> > > > it is
> > > > > > > done handing splits to its readers, maybe when the max file
> > > timestamp
> > > > > > > reaches (processing time - X). This information needs to be
> > > > transferred
> > > > > > to
> > > > > > > the Kafka source.
> > > > > > >
> > > > > > > The timestamp would need to be derived from the file enumerator
> > > > state,
> > > > > > > either by looking at the last splits or explicitly. The
> natural way
> > > > to do
> > > > > > > that is to introspect the enumerator state which gets
> checkpointed.
> > > > Any
> > > > > > > other form of "end position" via a special interface would
> need to
> > > be
> > > > > > > derived in the same manner.
> > > > > > >
> > > > > > > The converter that will be provided by the user would look at
> the
> > > > file
> > > > > > > enumerator state, derive the timestamp and then supply the
> "start
> > > > > > position"
> > > > > > > to the Kafka source. The Kafka source was created when the job
> > > > started.
> > > > > > It
> > > > > > > needs to be augmented with the new start position. That can be
> > > > achieved
> > > > > > via
> > > > > > > a special enumerator interface like
> > > > > > SwitchableSplitEnumerator#setStartState
> > > > > > > or by using restoreEnumerator with the checkpoint state
> constructed
> > > > by
> > > > > > the
> > > > > > > converter function. I'm leaning towards the latter as long as
> there
> > > > is a
> > > > > > > convenient way to construct the state from a position (like
> > > > > > > enumStateForTimestamp). The converter would map one enum state
> to
> > > > another
> > > > > > > and can be made very simple by providing a few utility
> functions
> > > > instead
> > > > > > of
> > > > > > > mandating a new interface that enumerators need to implement to
> > > > become
> > > > > > > switchable.
> > > > > > >
> > > > > > > Again, a converter is only required when sources need to be
> > > switched
> > > > > > based
> > > > > > > on positions not known at graph construction time.
> > > > > > >
> > > > > > > I'm planning to add such deferred switching to the PoC for
> > > > illustration
> > > > > > > and will share the experiment when that's done.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Thomas
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Mar 8, 2021 at 1:46 AM Nicholas Jiang <
> [hidden email]
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > >> Hi Kezhu,
> > > > > > >>
> > > > > > >> Thanks for your detailed points for the Hybrid Source. I
> follow
> > > your
> > > > > > >> opinions and make a corresponding explanation as follows:
> > > > > > >>
> > > > > > >> 1.Would the Hybrid Source be possible to use this feature to
> > > > > > switch/chain
> > > > > > >> multiple homogeneous sources?
> > > > > > >>
> > > > > > >> "HybridSource" supports to switch/chain multiple homogeneous
> > > > sources,
> > > > > > >> which
> > > > > > >> have the respective implementation for "SwitchableSource" and
> > > > > > >> "SwitchableSplitEnumerator". "HybridSource" doesn't limit
> whether
> > > > the
> > > > > > >> Sources consisted is homogeneous. From the user's perspective,
> > > User
> > > > only
> > > > > > >> adds the "SwitchableSource" into "HybridSource" and leaves the
> > > > smooth
> > > > > > >> migration operation to "HybridSource".
> > > > > > >>
> > > > > > >> 2."setStartState" is actually a reposition operation for next
> > > > source to
> > > > > > >> start in job runtime?
> > > > > > >>
> > > > > > >> IMO, "setStartState" is used to determine the initial
> position of
> > > > the
> > > > > > new
> > > > > > >> source for smooth migration, not reposition operation. More
> > > > importantly,
> > > > > > >> the
> > > > > > >> "State" mentioned here refers to the start and end positions
> of
> > > > reading
> > > > > > >> source.
> > > > > > >>
> > > > > > >> 3.This conversion should be implementation detail of next
> source,
> > > > not
> > > > > > >> converter function in my opinion?
> > > > > > >>
> > > > > > >> The state conversion is of course an implementation detail and
> > > > included
> > > > > > in
> > > > > > >> the switching mechanism, that should provide users with the
> > > > conversion
> > > > > > >> interface for conversion, which is defined in converter
> function.
> > > > What's
> > > > > > >> more, when users has already implemented "SwitchableSource"
> and
> > > > added to
> > > > > > >> the
> > > > > > >> Hybrid Source, the users don't need to implement the
> > > > "SwitchableSource"
> > > > > > >> for
> > > > > > >> the different conversion. From the user's perspective, users
> could
> > > > > > define
> > > > > > >> the different converter functions and create the
> > > "SwitchableSource"
> > > > for
> > > > > > >> the
> > > > > > >> addition of "HybridSource", no need to implement a Source for
> the
> > > > > > >> converter
> > > > > > >> function.
> > > > > > >>
> > > > > > >> 4.No configurable start-position. In this situation
> combination of
> > > > above
> > > > > > >> three joints is a nop, and
> > > > > > >> "HybridSource" is a chain of start-position pre-configured
> > > sources?
> > > > > > >>
> > > > > > >> Indeed there is no configurable start-position, and this
> > > > configuration
> > > > > > >> could
> > > > > > >> be involved in the feature. Users could use
> > > > > > >> "SwitchableSplitEnumerator#setStartState" interface or the
> > > > configuration
> > > > > > >> parameters to configure start-position.
> > > > > > >>
> > > > > > >> 5.I am wonder whether end-position is a must and how it could
> be
> > > > useful
> > > > > > >> for
> > > > > > >> end users in a generic-enough source?
> > > > > > >>
> > > > > > >> "getEndState" interface is used for the smooth migration
> scenario,
> > > > which
> > > > > > >> could return null value if it is not needed. In the Hybrid
> Source
> > > > > > >> mechanism,
> > > > > > >> this interface is required for the switching between the
> sources
> > > > > > >> consisted,
> > > > > > >> otherwise there is no any way to get end-position of upstream
> > > > source. In
> > > > > > >> summary, Hybrid Source needs to be able to set the start
> position
> > > > and
> > > > > > get
> > > > > > >> the end position of each Source, otherwise there is no use to
> > > build
> > > > > > Hybrid
> > > > > > >> Source.
> > > > > > >>
> > > > > > >> 6.Is it possible for converter function to do blocking
> operations?
> > > > How
> > > > > > to
> > > > > > >> respond to checkpoint request when switching split enumerators
> > > cross
> > > > > > >> sources? Does end-position or start-position need to be
> stored in
> > > > > > >> checkpoint
> > > > > > >> state or not?
> > > > > > >>
> > > > > > >> The converter function only simply converts the state of
> upstream
> > > > source
> > > > > > >> to
> > > > > > >> the state of downstream source, not blocking operations. The
> way
> > > to
> > > > > > >> respond
> > > > > > >> the checkpoint request when switching split enumerators cross
> > > > sources is
> > > > > > >> send the corresponding "SourceEvent" to coordination. The
> > > > end-position
> > > > > > or
> > > > > > >> start-position don't need to be stored in checkpoint state,
> only
> > > > > > >> implements
> > > > > > >> the "getEndState" interface for end-position.
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Nicholas Jiang
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> --
> > > > > > >> Sent from:
> > > > > > >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> > > > > > >>
> > > > > > >
> > > > > >
> > > >
> > >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Nicholas Jiang
In reply to this post by Stephan Ewen
Hi Thomas,

   Sorry for the late reply regarding your POC. I have reviewed the base
abstract implementation in your pull request:
https://github.com/apache/flink/pull/15924. IMO, for the switching
mechanism, this level of abstraction is not concise enough and doesn't
make connector contributions easier. In principle, it is necessary to
introduce a set of interfaces to support the switching mechanism: the
SwitchableSource and SwitchableSplitEnumerator interfaces are needed for
connector extensibility.
   In addition, the overall switching process in the above PR differs from
the one described in FLIP-150. In that implementation, the switch of source
reading is executed after receiving the SwitchSourceEvent, which can happen
before the SourceReaderFinishEvent is sent. This timeline of the source
reading switch could be discussed here.
   @Stephan @Becket, if you are available, please help review the abstract
implementation and compare it with the interfaces mentioned in FLIP-150.
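
For reference, a minimal sketch of what such a SwitchableSplitEnumerator
could look like, based only on the method names discussed in this thread
(setStartState/getEndState); the generic parameters and exact signatures are
assumptions, not a finalized proposal:

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;

// Sketch only: StartStateT/EndStateT and the signatures are assumptions
// derived from the FLIP-150 discussion, not an agreed-upon API.
public interface SwitchableSplitEnumerator<
                SplitT extends SourceSplit, CheckpointT, StartStateT, EndStateT>
        extends SplitEnumerator<SplitT, CheckpointT> {

    // Set the start position derived from the previous source before this
    // enumerator starts handing out splits.
    void setStartState(StartStateT startState);

    // Expose the end position (e.g. the max timestamp of consumed splits) so
    // that the next source in the chain can be positioned accordingly.
    EndStateT getEndState();
}

A SwitchableSource would then simply be a Source whose createEnumerator /
restoreEnumerator methods are guaranteed to return such an enumerator.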

Thanks,
Nicholas Jiang



--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Thomas Weise
Hi Nicholas,

Thanks for taking a look at the PR!

1. Regarding switching mechanism:

There has been previous discussion in this thread regarding the pros
and cons of how the switching can be exposed to the user.

With fixed start positions, no special switching interface to transfer
information between enumerators is required. Sources are configured as
they would be when used standalone and just plugged into HybridSource.
I expect that to be a common use case. You can find an example for
this in the ITCase:

https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
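
For illustration, a minimal sketch of the fixed-start-position construction
(the HybridSource builder follows the style used in the PR; package and
method names should be treated as assumptions until the PR is merged):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource; // from the PR, name/package assumed
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;

public class FixedStartPositionExample {

    public static HybridSource<String> build(long switchTimestampMillis) {
        // Bounded history from files, configured exactly as a standalone FileSource.
        FileSource<String> fileSource =
                FileSource.forRecordStreamFormat(
                                new TextLineFormat(), new Path("hdfs:///history"))
                        .build();

        // Kafka source with a start position that is fixed at job submission time.
        KafkaSource<String> kafkaSource =
                KafkaSource.<String>builder()
                        .setBootstrapServers("broker:9092")
                        .setTopics("events")
                        .setGroupId("hybrid-source-example")
                        .setStartingOffsets(
                                OffsetsInitializer.timestamp(switchTimestampMillis))
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        // No switching interface or converter needed: the sources are chained as-is.
        return HybridSource.builder(fileSource).addSource(kafkaSource).build();
    }
}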

For dynamic start position, the checkpoint state is used to transfer
information from old to new enumerator. An example for that can be
found here:

https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136

That may look verbose, but the code to convert from one state to
another can be factored out into a utility and the function becomes a
one-liner.
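
To make the "one-liner" concrete, here is a sketch of what such a converter
could look like with the checkpoint-state approach of the PR. The functional
shape may differ from the PR, and both helpers (maxFileTimestamp,
enumStateForTimestamp) are hypothetical utilities, not existing Flink APIs:

import java.util.function.Function;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;

public class CheckpointConverterSketch {

    // Map the file enumerator's checkpoint state to a Kafka enumerator state
    // positioned at the timestamp where the file source left off.
    static Function<PendingSplitsCheckpoint<FileSourceSplit>, KafkaSourceEnumState>
            converter() {
        return fileEnumState -> enumStateForTimestamp(maxFileTimestamp(fileEnumState));
    }

    // Hypothetical helper: derive the switch timestamp from the consumed splits.
    static long maxFileTimestamp(PendingSplitsCheckpoint<FileSourceSplit> state) {
        throw new UnsupportedOperationException("illustration only");
    }

    // Hypothetical helper: build a Kafka enumerator state starting at the timestamp.
    static KafkaSourceEnumState enumStateForTimestamp(long timestampMillis) {
        throw new UnsupportedOperationException("illustration only");
    }
}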

For common sources like files and Kafka we can potentially (later)
implement the conversion logic as part of the respective connector's
checkpoint and split classes.

I hope that with the PR up for review, we can soon reach a conclusion
on how we want to expose this to the user.

Following is an example for Files -> Files -> Kafka that I'm using for
e2e testing. It exercises both ways of setting the start position.

https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a


2. Regarding the events used to implement the actual switch between
enumerator and readers: I updated the PR with javadoc to clarify the
intent. Please let me know if that helps, or let's continue to discuss
those details on the PR.


Thanks,
Thomas


On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <[hidden email]> wrote:

>
> Hi Thomas,
>
>    Sorry for later reply for your POC. I have reviewed the based abstract
> implementation of your pull request:
> https://github.com/apache/flink/pull/15924. IMO, for the switching
> mechanism, this level of abstraction is not concise enough, which doesn't
> make connector contribution easier. In theory, it is necessary to introduce
> a set of interfaces to support the switching mechanism. The SwitchableSource
> and SwitchableSplitEnumerator interfaces are needed for connector
> expansibility.
>    In other words, the whole switching process of above mentioned PR is
> different from that mentioned in FLIP-150. In the above implementation, the
> source reading switching is executed after receving the SwitchSourceEvent,
> which could be before the sending SourceReaderFinishEvent. This timeline of
> source reading switching could be discussed here.
>    @Stephan @Becket, if you are available, please help to review the
> abstract implementation, and compare with the interfaces mentioned in
> FLIP-150.
>
> Thanks,
> Nicholas Jiang
>
>
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Steven Wu
Discussed the PR with Thomas offline. Thomas, please correct me if I
missed anything.

Right now, the PR differs from the FLIP-150 doc regarding the converter.
* The current PR uses the enumerator checkpoint state type as the input for
the converter.
* FLIP-150 defines a new EndStateT interface.
It seems that the FLIP-150 approach of EndStateT is more flexible, as the
transition EndStateT doesn't have to be included in the upstream source
checkpoint state.

Let's look at two use cases:
1) Static cutover time at 5 pm. The file source reads all data between 9 am
and 5 pm, then the Kafka source starts with an initial position of 5 pm. In
this case, there is no need for a converter or EndStateT, since the starting
time for the Kafka source is known and fixed.
2) Dynamic cutover time at 1 hour before now. This is useful when the
bootstrap of historic data takes a long time (days or weeks) and we don't
know the exact cutover time when the job is launched. Instead, we instruct
the file source to stop when it gets close to live data. In this case, the
hybrid source construction specifies a relative time (now - 1 hour), and the
EndStateT (of the file source) is resolved to an absolute cutover time. We
probably don't need to include EndStateT (the end timestamp) in the file
source checkpoint state. Hence, the separate EndStateT is probably more
desirable.

We also discussed the converter for the Kafka source. The Kafka source
supports different OffsetsInitializer impls (including
TimestampOffsetsInitializer). To support the dynamic cutover time (use case
#2 above), we can plug in a SupplierTimestampOffsetInitializer, where the
starting timestamp is not set during source/job construction. Rather, it is
a supplier model where the starting timestamp is set to the resolved
absolute timestamp during the switch.
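
A minimal sketch of that idea, assuming the two-method shape of the FLIP-27
Kafka connector's OffsetsInitializer interface; the class itself does not
exist in Flink, and the reset strategy is an arbitrary choice for the
example:

import java.util.Collection;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

// Sketch of a supplier-based timestamp initializer: the cutover timestamp is
// not known at job construction time and is only supplied when the switch
// happens. A real implementation would need a serializable supplier.
public class SupplierTimestampOffsetInitializer implements OffsetsInitializer {

    private final Supplier<Long> cutoverTimestampMillis;

    public SupplierTimestampOffsetInitializer(Supplier<Long> cutoverTimestampMillis) {
        this.cutoverTimestampMillis = cutoverTimestampMillis;
    }

    @Override
    public Map<TopicPartition, Long> getPartitionOffsets(
            Collection<TopicPartition> partitions,
            OffsetsInitializer.PartitionOffsetsRetriever partitionOffsetsRetriever) {
        // Resolve the timestamp lazily and delegate to the built-in initializer.
        return OffsetsInitializer.timestamp(cutoverTimestampMillis.get())
                .getPartitionOffsets(partitions, partitionOffsetsRetriever);
    }

    @Override
    public OffsetResetStrategy getAutoOffsetResetStrategy() {
        // Arbitrary for the sketch: fall back to the latest offset if the
        // timestamp is out of range for a partition.
        return OffsetResetStrategy.LATEST;
    }
}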

Thanks,
Steven



On Thu, May 20, 2021 at 8:59 PM Thomas Weise <[hidden email]> wrote:

> Hi Nicholas,
>
> Thanks for taking a look at the PR!
>
> 1. Regarding switching mechanism:
>
> There has been previous discussion in this thread regarding the pros
> and cons of how the switching can be exposed to the user.
>
> With fixed start positions, no special switching interface to transfer
> information between enumerators is required. Sources are configured as
> they would be when used standalone and just plugged into HybridSource.
> I expect that to be a common use case. You can find an example for
> this in the ITCase:
>
>
> https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
>
> For dynamic start position, the checkpoint state is used to transfer
> information from old to new enumerator. An example for that can be
> found here:
>
>
> https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
>
> That may look verbose, but the code to convert from one state to
> another can be factored out into a utility and the function becomes a
> one-liner.
>
> For common sources like files and Kafka we can potentially (later)
> implement the conversion logic as part of the respective connector's
> checkpoint and split classes.
>
> I hope that with the PR up for review, we can soon reach a conclusion
> on how we want to expose this to the user.
>
> Following is an example for Files -> Files -> Kafka that I'm using for
> e2e testing. It exercises both ways of setting the start position.
>
> https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
>
>
> 2. Regarding the events used to implement the actual switch between
> enumerator and readers: I updated the PR with javadoc to clarify the
> intent. Please let me know if that helps or let's continue to discuss
> those details on the PR?
>
>
> Thanks,
> Thomas
>
>
> On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <[hidden email]>
> wrote:
> >
> > Hi Thomas,
> >
> >    Sorry for later reply for your POC. I have reviewed the based abstract
> > implementation of your pull request:
> > https://github.com/apache/flink/pull/15924. IMO, for the switching
> > mechanism, this level of abstraction is not concise enough, which doesn't
> > make connector contribution easier. In theory, it is necessary to
> introduce
> > a set of interfaces to support the switching mechanism. The
> SwitchableSource
> > and SwitchableSplitEnumerator interfaces are needed for connector
> > expansibility.
> >    In other words, the whole switching process of above mentioned PR is
> > different from that mentioned in FLIP-150. In the above implementation,
> the
> > source reading switching is executed after receving the
> SwitchSourceEvent,
> > which could be before the sending SourceReaderFinishEvent. This timeline
> of
> > source reading switching could be discussed here.
> >    @Stephan @Becket, if you are available, please help to review the
> > abstract implementation, and compare with the interfaces mentioned in
> > FLIP-150.
> >
> > Thanks,
> > Nicholas Jiang
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Thomas Weise
Hi Steven,

Thank you for the thorough review of the PR and for bringing this back
to the mailing list.

All,

I updated the FLIP-150 page to highlight aspects in which the PR
deviates from the original proposal [1]. The goal would be to update
the FLIP soon and bring it to a vote, as previously suggested offline
by Nicholas.

A few minor issues in the PR are outstanding and I'm working on test
coverage for the recovery behavior, which should be completed soon.

The dynamic position transfer needs to be concluded, however, before we can
move forward.

There have been various ideas, including the special
"SwitchableEnumerator" interface, using enumerator checkpoint state or
an enumerator interface extension to extract the end state.

One goal in the FLIP is to "Reuse the existing Source connectors built
with FLIP-27 without any change." and I think it is important to honor
that goal given that fixed start positions do not require interface
changes.

Based on the feedback, the following might be a good solution for
runtime position transfer:

* The user supplies the optional converter function (not applicable for
fixed positions).
* Instead of relying on the enumerator checkpoint state [2], the
converter function is supplied with the current and the next
enumerator (source.createEnumerator).
* The converter function relies on specific enumerator capabilities to
set the new start position (e.g.
fileSourceEnumerator.getEndTimestamp() and
kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)).
* HybridSourceSplitEnumerator starts the new underlying enumerator.

With this approach, there is no need to augment FLIP-27 interfaces and
custom source capabilities are easier to integrate. Removing the
mandate to rely on enumerator checkpoint state also avoids potential
upgrade/compatibility issues.
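
To make this a bit more tangible, a sketch of such a converter. The
BiConsumer shape is an assumption, and the two enumerator methods are the
hypothetical source-specific capabilities named in the bullet points above,
not part of today's connectors (modeled here as small interfaces so the
sketch is self-contained):

import java.util.function.BiConsumer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class EnumeratorConverterSketch {

    // Hypothetical capability of the file source enumerator.
    interface FileEnumeratorWithEndTimestamp {
        long getEndTimestamp();
    }

    // Hypothetical capability of the Kafka source enumerator.
    interface KafkaEnumeratorWithOffsetsInitializer {
        void setTimestampOffsetsInitializer(OffsetsInitializer initializer);
    }

    // The hybrid source would hand the finished enumerator and the next
    // enumerator to the user-supplied converter, which transfers the position
    // using these source-specific capabilities.
    static BiConsumer<FileEnumeratorWithEndTimestamp, KafkaEnumeratorWithOffsetsInitializer>
            converter() {
        return (fileEnumerator, kafkaEnumerator) ->
                kafkaEnumerator.setTimestampOffsetsInitializer(
                        OffsetsInitializer.timestamp(fileEnumerator.getEndTimestamp()));
    }
}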

Thoughts?

Thanks,
Thomas

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
[2] https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281


On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <[hidden email]> wrote:

>
> discussed the PR with Thosmas offline. Thomas, please correct me if I
> missed anything.
>
> Right now, the PR differs from the FLIP-150 doc regarding the converter.
> * Current PR uses the enumerator checkpoint state type as the input for the
> converter
> * FLIP-150 defines a new EndStateT interface.
> It seems that the FLIP-150 approach of EndStateT is more flexible, as
> transition EndStateT doesn't have to be included in the upstream source
> checkpoint state.
>
> Let's look at two use cases:
> 1) static cutover time at 5 pm. File source reads all data btw 9 am - 5 pm,
> then Kafka source starts with initial position of 5 pm. In this case, there
> is no need for converter or EndStateT since the starting time for Kafka
> source is known and fixed.
> 2) dynamic cutover time at 1 hour before now. This is useful when the
> bootstrap of historic data takes a long time (like days or weeks) and we
> don't know the exact time of cutover when a job is launched. Instead, we
> are instructing the file source to stop when it gets close to live data. In
> this case, hybrid source construction will specify a relative time (now - 1
> hour), the EndStateT (of file source) will be resolved to an absolute time
> for cutover. We probably don't need to include EndStateT (end timestamp) as
> the file source checkpoint state. Hence, the separate EndStateT is probably
> more desirable.
>
> We also discussed the converter for the Kafka source. Kafka source supports
> different OffsetsInitializer impls (including TimestampOffsetsInitializer).
> To support the dynamic cutover time (use case #2 above), we can plug in a
> SupplierTimestampOffsetInitializer, where the starting timestamp is not set
> during source/job construction. Rather it is a supplier model where the
> starting timestamp value is set to the resolved absolute timestamp during
> switch.
>
> Thanks,
> Steven
>
>
>
> On Thu, May 20, 2021 at 8:59 PM Thomas Weise <[hidden email]> wrote:
>
> > Hi Nicholas,
> >
> > Thanks for taking a look at the PR!
> >
> > 1. Regarding switching mechanism:
> >
> > There has been previous discussion in this thread regarding the pros
> > and cons of how the switching can be exposed to the user.
> >
> > With fixed start positions, no special switching interface to transfer
> > information between enumerators is required. Sources are configured as
> > they would be when used standalone and just plugged into HybridSource.
> > I expect that to be a common use case. You can find an example for
> > this in the ITCase:
> >
> >
> > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
> >
> > For dynamic start position, the checkpoint state is used to transfer
> > information from old to new enumerator. An example for that can be
> > found here:
> >
> >
> > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
> >
> > That may look verbose, but the code to convert from one state to
> > another can be factored out into a utility and the function becomes a
> > one-liner.
> >
> > For common sources like files and Kafka we can potentially (later)
> > implement the conversion logic as part of the respective connector's
> > checkpoint and split classes.
> >
> > I hope that with the PR up for review, we can soon reach a conclusion
> > on how we want to expose this to the user.
> >
> > Following is an example for Files -> Files -> Kafka that I'm using for
> > e2e testing. It exercises both ways of setting the start position.
> >
> > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
> >
> >
> > 2. Regarding the events used to implement the actual switch between
> > enumerator and readers: I updated the PR with javadoc to clarify the
> > intent. Please let me know if that helps or let's continue to discuss
> > those details on the PR?
> >
> >
> > Thanks,
> > Thomas
> >
> >
> > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <[hidden email]>
> > wrote:
> > >
> > > Hi Thomas,
> > >
> > >    Sorry for later reply for your POC. I have reviewed the based abstract
> > > implementation of your pull request:
> > > https://github.com/apache/flink/pull/15924. IMO, for the switching
> > > mechanism, this level of abstraction is not concise enough, which doesn't
> > > make connector contribution easier. In theory, it is necessary to
> > introduce
> > > a set of interfaces to support the switching mechanism. The
> > SwitchableSource
> > > and SwitchableSplitEnumerator interfaces are needed for connector
> > > expansibility.
> > >    In other words, the whole switching process of above mentioned PR is
> > > different from that mentioned in FLIP-150. In the above implementation,
> > the
> > > source reading switching is executed after receving the
> > SwitchSourceEvent,
> > > which could be before the sending SourceReaderFinishEvent. This timeline
> > of
> > > source reading switching could be discussed here.
> > >    @Stephan @Becket, if you are available, please help to review the
> > > abstract implementation, and compare with the interfaces mentioned in
> > > FLIP-150.
> > >
> > > Thanks,
> > > Nicholas Jiang
> > >
> > >
> > >
> > > --
> > > Sent from:
> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> >
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Steven Wu
> Converter function relies on the specific enumerator capabilities to set
the new start position (e.g.
fileSourceEnumerator.getEndTimestamp() and
kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)

I guess the premise is that a converter is for a specific tuple of
(upstream source, downstream source). We don't have to define generic
EndStateT and SwitchableEnumerator interfaces. That should work.

The benefit of defining EndStateT and SwitchableEnumerator interfaces is
probably promoting uniformity across sources that support a
hybrid/switchable source.

On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise <[hidden email]> wrote:

> Hi Steven,
>
> Thank you for the thorough review of the PR and for bringing this back
> to the mailing list.
>
> All,
>
> I updated the FLIP-150 page to highlight aspects in which the PR
> deviates from the original proposal [1]. The goal would be to update
> the FLIP soon and bring it to a vote, as previously suggested offline
> by Nicholas.
>
> A few minor issues in the PR are outstanding and I'm working on test
> coverage for the recovery behavior, which should be completed soon.
>
> The dynamic position transfer needs to be concluded before we can move
> forward however.
>
> There have been various ideas, including the special
> "SwitchableEnumerator" interface, using enumerator checkpoint state or
> an enumerator interface extension to extract the end state.
>
> One goal in the FLIP is to "Reuse the existing Source connectors built
> with FLIP-27 without any change." and I think it is important to honor
> that goal given that fixed start positions do not require interface
> changes.
>
> Based on the feedback the following might be a good solution for
> runtime position transfer:
>
> * User supplies the optional converter function (not applicable for
> fixed positions).
> * Instead of relying on the enumerator checkpoint state [2], the
> converter function will be supplied with the current and next
> enumerator (source.createEnumerator).
> * Converter function relies on the specific enumerator capabilities to
> set the new start position (e.g.
> fileSourceEnumerator.getEndTimestamp() and
> kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
> * HybridSourceSplitEnumerator starts new underlying enumerator
>
> With this approach, there is no need to augment FLIP-27 interfaces and
> custom source capabilities are easier to integrate. Removing the
> mandate to rely on enumerator checkpoint state also avoids potential
> upgrade/compatibility issues.
>
> Thoughts?
>
> Thanks,
> Thomas
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
> [2]
> https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
>
>
> On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <[hidden email]> wrote:
> >
> > discussed the PR with Thosmas offline. Thomas, please correct me if I
> > missed anything.
> >
> > Right now, the PR differs from the FLIP-150 doc regarding the converter.
> > * Current PR uses the enumerator checkpoint state type as the input for
> the
> > converter
> > * FLIP-150 defines a new EndStateT interface.
> > It seems that the FLIP-150 approach of EndStateT is more flexible, as
> > transition EndStateT doesn't have to be included in the upstream source
> > checkpoint state.
> >
> > Let's look at two use cases:
> > 1) static cutover time at 5 pm. File source reads all data btw 9 am - 5
> pm,
> > then Kafka source starts with initial position of 5 pm. In this case,
> there
> > is no need for converter or EndStateT since the starting time for Kafka
> > source is known and fixed.
> > 2) dynamic cutover time at 1 hour before now. This is useful when the
> > bootstrap of historic data takes a long time (like days or weeks) and we
> > don't know the exact time of cutover when a job is launched. Instead, we
> > are instructing the file source to stop when it gets close to live data.
> In
> > this case, hybrid source construction will specify a relative time (now
> - 1
> > hour), the EndStateT (of file source) will be resolved to an absolute
> time
> > for cutover. We probably don't need to include EndStateT (end timestamp)
> as
> > the file source checkpoint state. Hence, the separate EndStateT is
> probably
> > more desirable.
> >
> > We also discussed the converter for the Kafka source. Kafka source
> supports
> > different OffsetsInitializer impls (including
> TimestampOffsetsInitializer).
> > To support the dynamic cutover time (use case #2 above), we can plug in a
> > SupplierTimestampOffsetInitializer, where the starting timestamp is not
> set
> > during source/job construction. Rather it is a supplier model where the
> > starting timestamp value is set to the resolved absolute timestamp during
> > switch.
> >
> > Thanks,
> > Steven
> >
> >
> >
> > On Thu, May 20, 2021 at 8:59 PM Thomas Weise <[hidden email]> wrote:
> >
> > > Hi Nicholas,
> > >
> > > Thanks for taking a look at the PR!
> > >
> > > 1. Regarding switching mechanism:
> > >
> > > There has been previous discussion in this thread regarding the pros
> > > and cons of how the switching can be exposed to the user.
> > >
> > > With fixed start positions, no special switching interface to transfer
> > > information between enumerators is required. Sources are configured as
> > > they would be when used standalone and just plugged into HybridSource.
> > > I expect that to be a common use case. You can find an example for
> > > this in the ITCase:
> > >
> > >
> > >
> https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
> > >
> > > For dynamic start position, the checkpoint state is used to transfer
> > > information from old to new enumerator. An example for that can be
> > > found here:
> > >
> > >
> > >
> https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
> > >
> > > That may look verbose, but the code to convert from one state to
> > > another can be factored out into a utility and the function becomes a
> > > one-liner.
> > >
> > > For common sources like files and Kafka we can potentially (later)
> > > implement the conversion logic as part of the respective connector's
> > > checkpoint and split classes.
> > >
> > > I hope that with the PR up for review, we can soon reach a conclusion
> > > on how we want to expose this to the user.
> > >
> > > Following is an example for Files -> Files -> Kafka that I'm using for
> > > e2e testing. It exercises both ways of setting the start position.
> > >
> > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
> > >
> > >
> > > 2. Regarding the events used to implement the actual switch between
> > > enumerator and readers: I updated the PR with javadoc to clarify the
> > > intent. Please let me know if that helps or let's continue to discuss
> > > those details on the PR?
> > >
> > >
> > > Thanks,
> > > Thomas
> > >
> > >
> > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <[hidden email]>
> > > wrote:
> > > >
> > > > Hi Thomas,
> > > >
> > > >    Sorry for later reply for your POC. I have reviewed the based
> abstract
> > > > implementation of your pull request:
> > > > https://github.com/apache/flink/pull/15924. IMO, for the switching
> > > > mechanism, this level of abstraction is not concise enough, which
> doesn't
> > > > make connector contribution easier. In theory, it is necessary to
> > > introduce
> > > > a set of interfaces to support the switching mechanism. The
> > > SwitchableSource
> > > > and SwitchableSplitEnumerator interfaces are needed for connector
> > > > expansibility.
> > > >    In other words, the whole switching process of above mentioned PR
> is
> > > > different from that mentioned in FLIP-150. In the above
> implementation,
> > > the
> > > > source reading switching is executed after receving the
> > > SwitchSourceEvent,
> > > > which could be before the sending SourceReaderFinishEvent. This
> timeline
> > > of
> > > > source reading switching could be discussed here.
> > > >    @Stephan @Becket, if you are available, please help to review the
> > > > abstract implementation, and compare with the interfaces mentioned in
> > > > FLIP-150.
> > > >
> > > > Thanks,
> > > > Nicholas Jiang
> > > >
> > > >
> > > >
> > > > --
> > > > Sent from:
> > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> > >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

Arvid Heise-4
Sorry for joining the party so late, but it's such an interesting FLIP with
a huge impact that I wanted to add my 2 cents. [1]
I'm mirroring some basic question from the PR review to this thread because
it's about the name:

We could rename the thing to ConcatenatedSource(s), SourceSequence, or
similar.
Hybrid has the connotation of 2 for me (maybe because I'm a non-native
speaker) and does not convey the concatenation concept as well (hybrid
sounds to me more like the source would constantly switch back and forth).

Could we take a few minutes to think about whether this is the most
intuitive name for new users? I'm especially hoping that native speakers
might give some ideas (or declare that Hybrid is perfect).

[1] https://github.com/apache/flink/pull/15924#pullrequestreview-677376664

On Sun, Jun 6, 2021 at 7:47 PM Steven Wu <[hidden email]> wrote:

> > Converter function relies on the specific enumerator capabilities to set
> the new start position (e.g.
> fileSourceEnumerator.getEndTimestamp() and
> kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
>
> I guess the premise is that a converter is for a specific tuple of
> (upstream source, downstream source) . We don't have to define generic
> EndtStateT and SwitchableEnumerator interfaces. That should work.
>
> The benefit of defining EndtStateT and SwitchableEnumerator interfaces is
> probably promoting uniformity across sources that support hybrid/switchable
> source.
>
> On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise <[hidden email]> wrote:
>
> > Hi Steven,
> >
> > Thank you for the thorough review of the PR and for bringing this back
> > to the mailing list.
> >
> > All,
> >
> > I updated the FLIP-150 page to highlight aspects in which the PR
> > deviates from the original proposal [1]. The goal would be to update
> > the FLIP soon and bring it to a vote, as previously suggested offline
> > by Nicholas.
> >
> > A few minor issues in the PR are outstanding and I'm working on test
> > coverage for the recovery behavior, which should be completed soon.
> >
> > The dynamic position transfer needs to be concluded before we can move
> > forward however.
> >
> > There have been various ideas, including the special
> > "SwitchableEnumerator" interface, using enumerator checkpoint state or
> > an enumerator interface extension to extract the end state.
> >
> > One goal in the FLIP is to "Reuse the existing Source connectors built
> > with FLIP-27 without any change." and I think it is important to honor
> > that goal given that fixed start positions do not require interface
> > changes.
> >
> > Based on the feedback the following might be a good solution for
> > runtime position transfer:
> >
> > * User supplies the optional converter function (not applicable for
> > fixed positions).
> > * Instead of relying on the enumerator checkpoint state [2], the
> > converter function will be supplied with the current and next
> > enumerator (source.createEnumerator).
> > * Converter function relies on the specific enumerator capabilities to
> > set the new start position (e.g.
> > fileSourceEnumerator.getEndTimestamp() and
> > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
> > * HybridSourceSplitEnumerator starts new underlying enumerator
> >
> > With this approach, there is no need to augment FLIP-27 interfaces and
> > custom source capabilities are easier to integrate. Removing the
> > mandate to rely on enumerator checkpoint state also avoids potential
> > upgrade/compatibility issues.
> >
> > Thoughts?
> >
> > Thanks,
> > Thomas
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
> > [2]
> >
> https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
> >
> >
> > On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <[hidden email]> wrote:
> > >
> > > discussed the PR with Thosmas offline. Thomas, please correct me if I
> > > missed anything.
> > >
> > > Right now, the PR differs from the FLIP-150 doc regarding the
> converter.
> > > * Current PR uses the enumerator checkpoint state type as the input for
> > the
> > > converter
> > > * FLIP-150 defines a new EndStateT interface.
> > > It seems that the FLIP-150 approach of EndStateT is more flexible, as
> > > transition EndStateT doesn't have to be included in the upstream source
> > > checkpoint state.
> > >
> > > Let's look at two use cases:
> > > 1) static cutover time at 5 pm. File source reads all data btw 9 am - 5
> > pm,
> > > then Kafka source starts with initial position of 5 pm. In this case,
> > there
> > > is no need for converter or EndStateT since the starting time for Kafka
> > > source is known and fixed.
> > > 2) dynamic cutover time at 1 hour before now. This is useful when the
> > > bootstrap of historic data takes a long time (like days or weeks) and
> we
> > > don't know the exact time of cutover when a job is launched. Instead,
> we
> > > are instructing the file source to stop when it gets close to live
> data.
> > In
> > > this case, hybrid source construction will specify a relative time (now
> > - 1
> > > hour), the EndStateT (of file source) will be resolved to an absolute
> > time
> > > for cutover. We probably don't need to include EndStateT (end
> timestamp)
> > as
> > > the file source checkpoint state. Hence, the separate EndStateT is
> > probably
> > > more desirable.
> > >
> > > We also discussed the converter for the Kafka source. Kafka source
> > supports
> > > different OffsetsInitializer impls (including
> > TimestampOffsetsInitializer).
> > > To support the dynamic cutover time (use case #2 above), we can plug
> in a
> > > SupplierTimestampOffsetInitializer, where the starting timestamp is not
> > set
> > > during source/job construction. Rather it is a supplier model where the
> > > starting timestamp value is set to the resolved absolute timestamp
> during
> > > switch.
> > >
> > > Thanks,
> > > Steven
> > >
> > >
> > >
> > > On Thu, May 20, 2021 at 8:59 PM Thomas Weise <[hidden email]> wrote:
> > >
> > > > Hi Nicholas,
> > > >
> > > > Thanks for taking a look at the PR!
> > > >
> > > > 1. Regarding switching mechanism:
> > > >
> > > > There has been previous discussion in this thread regarding the pros
> > > > and cons of how the switching can be exposed to the user.
> > > >
> > > > With fixed start positions, no special switching interface to
> transfer
> > > > information between enumerators is required. Sources are configured
> as
> > > > they would be when used standalone and just plugged into
> HybridSource.
> > > > I expect that to be a common use case. You can find an example for
> > > > this in the ITCase:
> > > >
> > > >
> > > >
> >
> https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
> > > >
> > > > For dynamic start position, the checkpoint state is used to transfer
> > > > information from old to new enumerator. An example for that can be
> > > > found here:
> > > >
> > > >
> > > >
> >
> https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
> > > >
> > > > That may look verbose, but the code to convert from one state to
> > > > another can be factored out into a utility and the function becomes a
> > > > one-liner.
> > > >
> > > > For common sources like files and Kafka we can potentially (later)
> > > > implement the conversion logic as part of the respective connector's
> > > > checkpoint and split classes.
> > > >
> > > > I hope that with the PR up for review, we can soon reach a conclusion
> > > > on how we want to expose this to the user.
> > > >
> > > > Following is an example for Files -> Files -> Kafka that I'm using
> for
> > > > e2e testing. It exercises both ways of setting the start position.
> > > >
> > > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
> > > >
> > > >
> > > > 2. Regarding the events used to implement the actual switch between
> > > > enumerator and readers: I updated the PR with javadoc to clarify the
> > > > intent. Please let me know if that helps or let's continue to discuss
> > > > those details on the PR?
> > > >
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > >
> > > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <[hidden email]>
> > > > wrote:
> > > > >
> > > > > Hi Thomas,
> > > > >
> > > > >    Sorry for later reply for your POC. I have reviewed the based
> > abstract
> > > > > implementation of your pull request:
> > > > > https://github.com/apache/flink/pull/15924. IMO, for the switching
> > > > > mechanism, this level of abstraction is not concise enough, which
> > doesn't
> > > > > make connector contribution easier. In theory, it is necessary to
> > > > introduce
> > > > > a set of interfaces to support the switching mechanism. The
> > > > SwitchableSource
> > > > > and SwitchableSplitEnumerator interfaces are needed for connector
> > > > > expansibility.
> > > > >    In other words, the whole switching process of above mentioned
> PR
> > is
> > > > > different from that mentioned in FLIP-150. In the above
> > implementation,
> > > > the
> > > > > source reading switching is executed after receving the
> > > > SwitchSourceEvent,
> > > > > which could be before the sending SourceReaderFinishEvent. This
> > timeline
> > > > of
> > > > > source reading switching could be discussed here.
> > > > >    @Stephan @Becket, if you are available, please help to review
> the
> > > > > abstract implementation, and compare with the interfaces mentioned
> in
> > > > > FLIP-150.
> > > > >
> > > > > Thanks,
> > > > > Nicholas Jiang
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Sent from:
> > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> > > >
> >
>