http://deprecated-apache-flink-mailing-list-archive.368.s1.nabble.com/DISCUSS-FLIP-150-Introduce-Hybrid-Source-tp46271p49703.html
position for the next source. That could be something like the last
processed file, timestamp, etc. (Currently StaticFileSplitEnumerator
See previous discussion regarding start/end position. The prototype shows
time. Conceivably there could be use cases that require more flexibility.
Such as switching one KafkaSource for another. A step in that direction
> Hi Nicholas,
>
> Thanks for the reply. I had implemented a small PoC. It switches a
> configurable sequence of sources with predefined bounds. I'm using the
> unmodified MockSource for illustration. It does not require a "Switchable"
> interface. I looked at the code you shared, and the delegation and signaling
> work quite similarly. That's a good validation.
>
> Hi Kezhu,
>
> Thanks for bringing up the more detailed discussion regarding the start/end
> position. I think in most cases the start and end positions will be known
> when the job is submitted. If we take a File -> Kafka source chain as
> example, there would most likely be a timestamp at which we want to
> transition from files to reading from Kafka. So we would either set the
> start position for Kafka based on that timestamp or provide the offsets
> directly. (Note that I'm skipping a few related nuances here. In order to
> achieve an exact switch without duplication or gap, we may also need some
> overlap and filtering, but that's a separate issue.)
>
> The point is that the start positions can be configured by the user; there
> is no need to transfer any information from one source to another as part
> of switching.
>
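For the static case just described, a minimal Java sketch may help. A single transition timestamp, chosen by the user at submission time, both bounds the file phase and sets the Kafka start position, so nothing has to be handed over at runtime. `StaticSwitchPlan`, `FileSourceBounds` and `KafkaStartPosition` are hypothetical stand-ins, not Flink APIs, and the sketch ignores the overlap and filtering nuances mentioned above.

```java
import java.time.Instant;

/** Both phase boundaries derive from one timestamp known at job submission (hypothetical types). */
record StaticSwitchPlan(FileSourceBounds fileBounds, KafkaStartPosition kafkaStart) {

    /** Configure both sources from the same transition instant; no runtime hand-over is needed. */
    static StaticSwitchPlan at(Instant transition) {
        return new StaticSwitchPlan(new FileSourceBounds(transition), new KafkaStartPosition(transition));
    }

    /** In this idealized model every event time belongs to exactly one phase. */
    String phaseFor(Instant eventTime) {
        return eventTime.isBefore(fileBounds.endExclusive()) ? "file" : "kafka";
    }

    public static void main(String[] args) {
        StaticSwitchPlan plan = StaticSwitchPlan.at(Instant.parse("2021-03-01T00:00:00Z"));
        System.out.println(plan.phaseFor(Instant.parse("2021-02-28T23:59:59Z"))); // file
        System.out.println(plan.phaseFor(Instant.parse("2021-03-01T00:00:00Z"))); // kafka
    }
}

/** Hypothetical bound for the file phase: only data strictly before this instant is read. */
record FileSourceBounds(Instant endExclusive) {}

/** Hypothetical Kafka start position: read records with timestamps at or after this instant. */
record KafkaStartPosition(Instant startInclusive) {}
```

Because both positions come from the same `Instant`, the two sources stay consistent by construction; the dynamic case is where that stops being possible.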
> It gets more complicated if we want to achieve a dynamic switch where the
> transition timestamp isn't known when the job starts. For example, consider
> a bootstrap scenario where the time taken to process historic data exceeds
> the Kafka retention. Here, we would need to dynamically resolve the Kafka
> start position based on where the file readers left off, when the switching
> occurs. The file source enumerator would determine at runtime when it is
> done handing splits to its readers, maybe when the max file timestamp
> reaches (processing time - X). This information needs to be transferred to
> the Kafka source.
>
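The runtime condition in this bootstrap scenario could look roughly like the following sketch. It treats the file phase as done once the newest assigned file is no older than `now - maxLag`, where `maxLag` plays the role of "X" above. `CatchUpCheck` and `maxLag` are hypothetical names chosen for illustration; a real enumerator would evaluate such a check against its own state.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical check a file enumerator could run before handing out more splits:
 * the file phase is "done" once maxFileTimestamp >= now - maxLag.
 */
final class CatchUpCheck {

    private CatchUpCheck() {}

    static boolean fileSourceCaughtUp(Instant maxFileTimestamp, Instant now, Duration maxLag) {
        // Not before (now - maxLag) means the newest file is recent enough to switch.
        return !maxFileTimestamp.isBefore(now.minus(maxLag));
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2021-03-08T12:00:00Z");
        Duration maxLag = Duration.ofHours(1);
        System.out.println(fileSourceCaughtUp(Instant.parse("2021-03-08T10:30:00Z"), now, maxLag)); // false
        System.out.println(fileSourceCaughtUp(Instant.parse("2021-03-08T11:15:00Z"), now, maxLag)); // true
    }
}
```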
> The timestamp would need to be derived from the file enumerator state,
> either by looking at the last splits or explicitly. The natural way to do
> that is to introspect the enumerator state which gets checkpointed. Any
> other form of "end position" via a special interface would need to be
> derived in the same manner.
>
> The converter that will be provided by the user would look at the file
> enumerator state, derive the timestamp and then supply the "start position"
> to the Kafka source. The Kafka source was created when the job started. It
> needs to be augmented with the new start position. That can be achieved via
> a special enumerator interface like SwitchableSplitEnumerator#setStartState
> or by using restoreEnumerator with the checkpoint state constructed by the
> converter function. I'm leaning towards the latter as long as there is a
> convenient way to construct the state from a position (like
> enumStateForTimestamp). The converter would map one enum state to another
> and can be made very simple by providing a few utility functions instead of
> mandating a new interface that enumerators need to implement to become
> switchable.
>
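A deferred-switch converter of the kind described above might look like this sketch. It derives the transition timestamp from the newest file in a hypothetical file enumerator state, then builds the Kafka enumerator state through a helper modeled on the `enumStateForTimestamp` idea. All types (`FileEnumState`, `KafkaEnumState`, `AssignedFile`) are simplified assumptions rather than Flink classes. In the approach preferred above, the resulting state would then be passed to the Kafka source's `restoreEnumerator`.

```java
import java.util.List;
import java.util.function.Function;

/** Sketch of a user-supplied converter between two (hypothetical) enumerator states. */
final class DeferredSwitchConverter {

    private DeferredSwitchConverter() {}

    /** A split the file enumerator has already handed out, with the file's timestamp. */
    record AssignedFile(String path, long timestampMillis) {}

    /** Hypothetical file enumerator state: the splits assigned so far. */
    record FileEnumState(List<AssignedFile> assigned) {}

    /** Hypothetical Kafka enumerator state: the timestamp every partition starts from. */
    record KafkaEnumState(long startTimestampMillis) {}

    /** Utility in the spirit of "enumStateForTimestamp"; name and shape are assumptions. */
    static KafkaEnumState enumStateForTimestamp(long timestampMillis) {
        return new KafkaEnumState(timestampMillis);
    }

    /** Derive the transition timestamp from the newest assigned file, then build the Kafka state. */
    static final Function<FileEnumState, KafkaEnumState> CONVERTER = fileState -> {
        long transition = fileState.assigned().stream()
                .mapToLong(AssignedFile::timestampMillis)
                .max()
                .orElseThrow(() -> new IllegalStateException("file phase assigned no splits"));
        return enumStateForTimestamp(transition);
    };

    public static void main(String[] args) {
        FileEnumState done = new FileEnumState(List.of(
                new AssignedFile("/data/part-0", 1_000L),
                new AssignedFile("/data/part-1", 5_000L),
                new AssignedFile("/data/part-2", 3_000L)));
        System.out.println(CONVERTER.apply(done)); // KafkaEnumState[startTimestampMillis=5000]
    }
}
```

Keeping the converter a plain function over enumerator states means enumerators need not implement an extra interface, which is the trade-off argued for above.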
> Again, a converter is only required when sources need to be switched based
> on positions not known at graph construction time.
>
> I'm planning to add such deferred switching to the PoC for illustration
> and will share the experiment when that's done.
>
> Cheers,
> Thomas
>
>
> On Mon, Mar 8, 2021 at 1:46 AM Nicholas Jiang <[hidden email]> wrote:
>
>> Hi Kezhu,
>>
>> Thanks for your detailed points on the Hybrid Source. I have gone through
>> your questions and respond to each of them below:
>>
>> 1. Would it be possible to use the Hybrid Source to switch/chain multiple
>> homogeneous sources?
>>
>> "HybridSource" supports switching/chaining multiple homogeneous sources,
>> as long as each of them implements "SwitchableSource" and
>> "SwitchableSplitEnumerator". "HybridSource" does not restrict whether its
>> constituent sources are homogeneous. From the user's perspective, the user
>> only adds "SwitchableSource" instances to "HybridSource" and leaves the
>> smooth migration between them to "HybridSource".
>>
>> 2. Is "setStartState" actually a reposition operation for the next source
>> to start at job runtime?
>>
>> IMO, "setStartState" is used to determine the initial position of the new
>> source for smooth migration; it is not a reposition operation. More
>> importantly, the "State" mentioned here refers to the start and end
>> positions for reading the source.
>>
>> 3. Shouldn't this conversion be an implementation detail of the next
>> source rather than a converter function?
>>
>> The state conversion is of course an implementation detail, and it is part
>> of the switching mechanism, which should expose a conversion interface to
>> users in the form of the converter function. Moreover, once users have
>> implemented a "SwitchableSource" and added it to the Hybrid Source, they
>> don't need to implement another "SwitchableSource" for each different
>> conversion. From the user's perspective, users can define different
>> converter functions and create the "SwitchableSource" to add to
>> "HybridSource"; there is no need to implement a Source for the converter
>> function.
>>
>> 4. There is no configurable start position. In this situation, is the
>> combination of the three joints above a no-op, and is "HybridSource" just
>> a chain of sources with pre-configured start positions?
>>
>> Indeed, there is no configurable start position, and this configuration
>> could be included in the feature. Users could use the
>> "SwitchableSplitEnumerator#setStartState" interface or configuration
>> parameters to configure the start position.
>>
>> 5. I wonder whether an end position is a must, and how it could be useful
>> for end users in a generic-enough source?
>>
>> The "getEndState" interface is used for the smooth migration scenario and
>> may return null if it is not needed. In the Hybrid Source mechanism, this
>> interface is required for switching between the constituent sources;
>> otherwise there is no way to get the end position of the upstream source.
>> In summary, the Hybrid Source needs to be able to set the start position
>> and get the end position of each source; otherwise there is no point in
>> building a Hybrid Source.
>>
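The contract described in points 1, 2 and 5 (set a start position, report an end position, convert between them) can be modeled roughly as below. The names `SwitchableSplitEnumerator`, `setStartState` and `getEndState` come from this thread, but the generic signatures, the `Stage` record, the `RangeEnumerator` toy and the `run` driver are assumptions for illustration, not the FLIP-150 code. The example chains two homogeneous sources.

```java
import java.util.List;
import java.util.function.Function;

/** Simplified, hypothetical model of the start/end-state switching contract. */
final class HybridChainSketch {

    private HybridChainSketch() {}

    /** Hypothetical signatures; the proposal's getEndState may return null when unused. */
    interface SwitchableSplitEnumerator<S> {
        void setStartState(S startState);
        S getEndState();
    }

    /** One stage: an enumerator plus the converter to the next start state (null for the last stage). */
    record Stage(SwitchableSplitEnumerator<Long> enumerator, Function<Long, Long> toNextStart) {}

    /** Toy enumerator that "reads" a fixed number of offsets from its start position. */
    static final class RangeEnumerator implements SwitchableSplitEnumerator<Long> {
        private final long length;
        private long start;

        RangeEnumerator(long length) { this.length = length; }

        @Override public void setStartState(Long startState) { this.start = startState; }
        @Override public Long getEndState() { return start + length; }
    }

    /** Runs stages in order, converting each end state into the next stage's start state. */
    static Long run(List<Stage> stages, Long initialStart) {
        Long start = initialStart;
        Long end = null;
        for (Stage stage : stages) {
            stage.enumerator().setStartState(start);
            end = stage.enumerator().getEndState();
            if (stage.toNextStart() != null) {
                start = stage.toNextStart().apply(end);
            }
        }
        return end;
    }

    public static void main(String[] args) {
        // Two homogeneous sources chained back to back; the converter is the identity here.
        List<Stage> chain = List.of(
                new Stage(new RangeEnumerator(100), end -> end),
                new Stage(new RangeEnumerator(50), null));
        System.out.println(run(chain, 0L)); // 150
    }
}
```

Plain `Long` offsets keep the toy small; a real enumerator state would be a richer, source-specific type, which is where a converter function becomes necessary.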
>> 6. Is it possible for the converter function to do blocking operations?
>> How should a checkpoint request be answered while switching split
>> enumerators across sources? Does the end position or start position need
>> to be stored in the checkpoint state?
>>
>> The converter function simply converts the state of the upstream source to
>> the state of the downstream source; it does not perform blocking
>> operations. A checkpoint request that arrives while switching split
>> enumerators across sources is handled by sending the corresponding
>> "SourceEvent" for coordination. The end position and start position don't
>> need to be stored in the checkpoint state; a source only implements the
>> "getEndState" interface for the end position.
>>
>> Best,
>> Nicholas Jiang
>>
>