http://deprecated-apache-flink-mailing-list-archive.368.s1.nabble.com/DISCUSS-FLIP-150-Introduce-Hybrid-Source-tp46271p50127.html
Thanks all for this discussion. Looks like there are lots of ideas and
- One may be very simple, with switching points known up-front. Would
production setup that affect some details of the design. So I'd feel most
play out against each other when being used by users. For example switching
> Hi,
>
> As mentioned in my previous email, I had been working on a prototype for
> the hybrid source.
>
> You can find it at https://github.com/tweise/flink/pull/1
> It contains:
> * Switching with configurable chain of sources
> * Fixed or dynamic start positions
> * Test with MockSource and FileSource
>
> The purpose of the above PR is to gather feedback and help drive consensus
> on the FLIP.
>
> * How to support a dynamic start position within the source chain?
>
> Relevant in those (few?) cases where start positions are not known upfront.
> You can find an example of what that might look like in the tests:
>
> https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
> https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132
>
> When switching, the enumerator of the previous source needs to
> supply information about consumed splits that makes it possible to set
> the start position for the next source. That could be something like the
> last processed file, timestamp, etc. (Currently StaticFileSplitEnumerator
> doesn't track finished splits.)
>
> See previous discussion regarding start/end position. The prototype shows
> the use of checkpoint state with converter function.
>
> * Should readers be deployed dynamically?
>
> The prototype assumes a static source chain that is fixed at job submission
> time. Conceivably there could be use cases that require more flexibility,
> such as switching one KafkaSource for another. A step in that direction
> would be to deploy the actual readers dynamically, at the time of switching
> sources.
>
> Looking forward to feedback and suggestions for next steps!
>
> Thomas
>
> On Sun, Mar 14, 2021 at 11:17 AM Thomas Weise wrote:
>
> > Hi Nicholas,
> >
> > Thanks for the reply. I had implemented a small PoC. It switches a
> > configurable sequence of sources with predefined bounds. I'm using the
> > unmodified MockSource for illustration. It does not require a
> > "Switchable" interface. I looked at the code you shared and the
> > delegation and signaling work quite similarly. That's a good validation.
> >
> > Hi Kezhu,
> >
> > Thanks for bringing up the more detailed discussion regarding the
> > start/end position. I think in most cases the start and end positions
> > will be known when the job is submitted. If we take a File -> Kafka
> > source chain as an example, there would most likely be a timestamp at
> > which we want to transition from files to reading from Kafka. So we
> > would either set the start position for Kafka based on that timestamp or
> > provide the offsets directly. (Note that I'm skipping a few related
> > nuances here. In order to achieve an exact switch without duplication or
> > gap, we may also need some overlap and filtering, but that's a separate
> > issue.)
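
A minimal sketch of the "overlap and filtering" idea mentioned in the parenthetical above, assuming every record carries an event timestamp and that the first source covers everything strictly before the switch timestamp. `TimestampedRecord`, `SwitchOverlapSketch` and both method names are hypothetical; none of them come from the prototype or from Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical record type: a payload with an event timestamp in epoch millis. */
final class TimestampedRecord {
    final long timestampMillis;
    final String payload;

    TimestampedRecord(long timestampMillis, String payload) {
        this.timestampMillis = timestampMillis;
        this.payload = payload;
    }
}

final class SwitchOverlapSketch {
    /** The second source starts slightly before the switch point to avoid a gap. */
    static long secondSourceStart(long switchTimestampMillis, long overlapMillis) {
        return switchTimestampMillis - overlapMillis;
    }

    /** Drops records the first source is assumed to have covered (timestamp < switch). */
    static List<TimestampedRecord> dropAlreadyCovered(
            List<TimestampedRecord> fromSecondSource, long switchTimestampMillis) {
        List<TimestampedRecord> kept = new ArrayList<>();
        for (TimestampedRecord r : fromSecondSource) {
            if (r.timestampMillis >= switchTimestampMillis) {
                kept.add(r);
            }
        }
        return kept;
    }
}
```

Reading with an overlap guards against a gap, and the filter removes the resulting duplicates. Whether a timestamp comparison is precise enough depends on how the files were produced.
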
> >
> > The point is that the start positions can be configured by the user,
> > there is no need to transfer any information from one source to another
> > as part of switching.
> >
> > It gets more complicated if we want to achieve a dynamic switch where the
> > transition timestamp isn't known when the job starts. For example,
> > consider a bootstrap scenario where the time taken to process historic
> > data exceeds the Kafka retention. Here, we would need to dynamically
> > resolve the Kafka start position based on where the file readers left
> > off, when the switching occurs. The file source enumerator would
> > determine at runtime when it is done handing splits to its readers,
> > maybe when the max file timestamp reaches (processing time - X). This
> > information needs to be transferred to the Kafka source.
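
The "max file timestamp reaches (processing time - X)" condition could be expressed roughly as below. This is only a sketch: `DeferredSwitchSketch`, `readyToSwitch` and the parameter names are hypothetical, and X is modelled as a lag in milliseconds.

```java
final class DeferredSwitchSketch {
    /**
     * Returns true once the newest file handed out is within lagMillis of the
     * current processing time, i.e. maxFileTimestamp >= processingTime - X.
     */
    static boolean readyToSwitch(long maxFileTimestampMillis, long nowMillis, long lagMillis) {
        return maxFileTimestampMillis >= nowMillis - lagMillis;
    }
}
```
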
> >
> > The timestamp would need to be derived from the file enumerator state,
> > either by looking at the last splits or explicitly. The natural way to do
> > that is to introspect the enumerator state which gets checkpointed. Any
> > other form of "end position" via a special interface would need to be
> > derived in the same manner.
> >
> > The converter that will be provided by the user would look at the file
> > enumerator state, derive the timestamp and then supply the "start
> > position" to the Kafka source. The Kafka source was created when the job
> > started. It needs to be augmented with the new start position. That can
> > be achieved via a special enumerator interface like
> > SwitchableSplitEnumerator#setStartState or by using restoreEnumerator
> > with the checkpoint state constructed by the converter function. I'm
> > leaning towards the latter as long as there is a convenient way to
> > construct the state from a position (like enumStateForTimestamp). The
> > converter would map one enum state to another and can be made very
> > simple by providing a few utility functions instead of mandating a new
> > interface that enumerators need to implement to become switchable.
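
To make the converter idea concrete, here is a minimal sketch under stated assumptions. `FileEnumState` and `KafkaEnumState` are hypothetical stand-ins for the real enumerator checkpoint types, and `enumStateForTimestamp` is only the utility name floated above, not an existing Flink method.

```java
import java.util.function.Function;

/** Hypothetical stand-in for the file enumerator's checkpointed state. */
final class FileEnumState {
    final long maxFileTimestampMillis;

    FileEnumState(long maxFileTimestampMillis) {
        this.maxFileTimestampMillis = maxFileTimestampMillis;
    }
}

/** Hypothetical stand-in for the Kafka enumerator's checkpointed state. */
final class KafkaEnumState {
    final long startTimestampMillis;

    private KafkaEnumState(long startTimestampMillis) {
        this.startTimestampMillis = startTimestampMillis;
    }

    /** Assumed utility: builds an initial enumerator state from a start position. */
    static KafkaEnumState enumStateForTimestamp(long timestampMillis) {
        return new KafkaEnumState(timestampMillis);
    }
}

final class ConverterSketch {
    /** User-supplied converter: maps the previous source's state to the next one's. */
    static final Function<FileEnumState, KafkaEnumState> FILE_TO_KAFKA =
            fileState -> KafkaEnumState.enumStateForTimestamp(fileState.maxFileTimestampMillis);
}
```

In this shape the next source would be started from the converted state, as if restored from a checkpoint, and neither enumerator has to implement an extra interface.
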
> >
> > Again, a converter is only required when sources need to be switched
> > based on positions not known at graph construction time.
> >
> > I'm planning to add such deferred switching to the PoC for illustration
> > and will share the experiment when that's done.
> >
> > Cheers,
> > Thomas
> >
> >
> > On Mon, Mar 8, 2021 at 1:46 AM Nicholas Jiang wrote:
> >
> >> Hi Kezhu,
> >>
> >> Thanks for your detailed points on the Hybrid Source. I have gone
> >> through your comments and respond to each of them below:
> >>
> >> 1. Would it be possible to use the Hybrid Source to switch/chain
> >> multiple homogeneous sources?
> >>
> >> "HybridSource" supports switching/chaining multiple homogeneous
> >> sources, provided each has its own implementation of "SwitchableSource"
> >> and "SwitchableSplitEnumerator". "HybridSource" does not restrict
> >> whether its constituent sources are homogeneous. From the user's
> >> perspective, the user only adds the "SwitchableSource" instances to
> >> "HybridSource" and leaves the smooth migration to "HybridSource".
> >>
> >> 2. Is "setStartState" actually a reposition operation for the next
> >> source to start at job runtime?
> >>
> >> IMO, "setStartState" is used to determine the initial position of the
> >> new source for smooth migration; it is not a reposition operation. More
> >> importantly, the "State" mentioned here refers to the start and end
> >> positions of reading a source.
> >>
> >> 3. This conversion should be an implementation detail of the next
> >> source, not a converter function, in my opinion?
> >>
> >> The state conversion is of course an implementation detail and is part
> >> of the switching mechanism, which should give users a conversion
> >> interface, defined by the converter function. What's more, once users
> >> have implemented a "SwitchableSource" and added it to the Hybrid Source,
> >> they don't need to implement another "SwitchableSource" for a different
> >> conversion. From the user's perspective, users can define different
> >> converter functions and create the "SwitchableSource" to add to
> >> "HybridSource"; there is no need to implement a Source for each
> >> converter function.
> >>
> >> 4. No configurable start-position. In this situation the combination of
> >> the above three points is a no-op, and "HybridSource" is a chain of
> >> sources with pre-configured start positions?
> >>
> >> Indeed there is no configurable start-position, and this configuration
> >> could be included in the feature. Users could use the
> >> "SwitchableSplitEnumerator#setStartState" interface or configuration
> >> parameters to configure the start-position.
> >>
> >> 5. I wonder whether end-position is a must and how it could be useful
> >> for end users in a generic-enough source?
> >>
> >> The "getEndState" interface is used for the smooth migration scenario
> >> and can return null if it is not needed. In the Hybrid Source
> >> mechanism, this interface is required for switching between the
> >> constituent sources; otherwise there is no way to get the end-position
> >> of the upstream source. In summary, Hybrid Source needs to be able to
> >> set the start position and get the end position of each Source;
> >> otherwise there is no point in building a Hybrid Source.
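
For illustration only, the set-start/get-end handoff described in this answer might look roughly like the sketch below. Only the method names `setStartState` and `getEndState` come from the thread. The interface shape, the type parameters, and every class here are hypothetical, not the FLIP's actual definitions.

```java
import java.util.function.Function;

/** Hypothetical shape of the two calls discussed above. */
interface SwitchableEnumeratorSketch<StartT, EndT> {
    void setStartState(StartT startState);

    /** May return null when the source has no meaningful end position. */
    EndT getEndState();
}

/** Bounded upstream source: reports the newest file timestamp it finished. */
final class FileEnumeratorSketch implements SwitchableEnumeratorSketch<Void, Long> {
    private Long lastFileTimestampMillis; // null until a split has finished

    void onSplitFinished(long fileTimestampMillis) {
        if (lastFileTimestampMillis == null || fileTimestampMillis > lastFileTimestampMillis) {
            lastFileTimestampMillis = fileTimestampMillis;
        }
    }

    @Override public void setStartState(Void unused) { /* first source in the chain */ }
    @Override public Long getEndState() { return lastFileTimestampMillis; }
}

/** Unbounded downstream source: accepts a start timestamp, has no end state. */
final class StreamEnumeratorSketch implements SwitchableEnumeratorSketch<Long, Void> {
    private long startTimestampMillis = Long.MIN_VALUE;

    @Override public void setStartState(Long start) { startTimestampMillis = start; }
    @Override public Void getEndState() { return null; }
    long startTimestampMillis() { return startTimestampMillis; }
}

final class HandoffSketch {
    /** Converts the previous end state and applies it as the next start state. */
    static <A, B> void handOff(SwitchableEnumeratorSketch<?, A> previous,
                               Function<A, B> converter,
                               SwitchableEnumeratorSketch<B, ?> next) {
        A end = previous.getEndState();
        if (end != null) {
            next.setStartState(converter.apply(end));
        }
    }
}
```
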
> >>
> >> 6. Is it possible for the converter function to do blocking operations?
> >> How to respond to a checkpoint request when switching split enumerators
> >> across sources? Does end-position or start-position need to be stored in
> >> checkpoint state or not?
> >>
> >> The converter function simply converts the state of the upstream source
> >> to the state of the downstream source; it does not perform blocking
> >> operations. The way to respond to a checkpoint request when switching
> >> split enumerators across sources is to send the corresponding
> >> "SourceEvent" to the coordinator. The end-position and start-position
> >> don't need to be stored in checkpoint state; the enumerator only
> >> implements the "getEndState" interface for the end-position.
> >>
> >> Best,
> >> Nicholas Jiang
> >>
> >>
> >>
> >
>