Hi devs,
I'd like to start a new FLIP discussion to introduce the Hybrid Source. The hybrid source is a source that contains a list of concrete sources. It reads from each contained source in the defined order, switching from source A to the next source B when source A finishes.

In practice, many Flink jobs need to read data from multiple sources in sequential order. Change Data Capture (CDC) and machine learning feature backfill are two concrete scenarios of this consumption pattern. To support such scenarios, a Flink job needs to first read historical data from HDFS and then switch to Kafka for real-time records; today, users may have to either run two different Flink jobs or add hacks to the SourceFunction to address such use cases. The hybrid source has several benefits from the user's perspective:

- Switching among multiple sources is easy, based on the switchable source implementations of the different connectors.
- Automatic switching is supported for any user-defined switchable source that constitutes the hybrid source.
- There is a complete and effective mechanism for smooth source migration between historical and real-time data.

Therefore, in this discussion we propose to introduce a “Hybrid Source” API built on top of the new Source API (FLIP-27) to help users switch sources smoothly. For more detail, please refer to the FLIP design doc [1].

I'm looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source

Best,
Nicholas Jiang
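(For illustration only, usage of the proposed API might look roughly like the sketch below. The HybridSource class, its builder and addSource() are assumptions based on the description above, not a finalized FLIP-150 API, and the FileSource/KafkaSource construction is abbreviated.)

    // Hypothetical sketch: read historical data from HDFS first, then continue from Kafka.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    long switchTimestamp = 1_618_000_000_000L;  // example switch point in epoch millis

    FileSource<String> historical =
            FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("hdfs:///events/"))
                    .build();

    KafkaSource<String> realtime =
            KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092")
                    .setTopics("events")
                    .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp))
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

    // HybridSource and its builder are the proposed (not yet existing) API.
    HybridSource<String> hybridSource =
            HybridSource.builder(historical)   // first source in the chain
                    .addSource(realtime)       // switched to when the file source finishes
                    .build();

    env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "hybrid-source");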
Hi Nicholas,
Thanks for starting the discussion!

I think we might be able to simplify this a bit and re-use existing functionality. There is already `Source.restoreEnumerator()` and `SplitEnumerator.snapshotState()`. This seems to be roughly what the Hybrid Source needs. When the initial source finishes, we can take a snapshot (which should include the data that the follow-up sources need for initialization). Then we need a function that maps the enumerator checkpoint types between the initial source and the new source and we are good to go. We wouldn't need to introduce any additional interfaces for sources to implement, which would fragment the ecosystem into sources that can be used in a Hybrid Source and sources that cannot.

What do you think?

Best,
Aljoscha
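(A minimal sketch of the mapping function described above, for illustration; the interface name and the pseudo-usage around it are assumptions, not an agreed API.)

    // Sketch: a user-supplied function that maps the finished source's enumerator checkpoint
    // into the checkpoint type that the next source can be restored from.
    @FunctionalInterface
    public interface EnumeratorCheckpointConverter<FromEnumChkT, ToEnumChkT> extends java.io.Serializable {
        ToEnumChkT convert(FromEnumChkT finishedEnumeratorCheckpoint);
    }

    // At switch time, a hybrid enumerator could then do roughly (pseudo-usage):
    //   ToEnumChkT nextCheckpoint = converter.convert(finishedEnumerator.snapshotState());
    //   nextEnumerator = nextSource.restoreEnumerator(enumeratorContext, nextCheckpoint);
    //   nextEnumerator.start();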
Thanks for initiating this discussion and creating the proposal!
I would like to contribute to this effort. Has there been related activity since the FLIP was created? If not, I would like to start work on a PoC to validate the design.

Questions/comments:

There could be more use cases for a hybrid source beyond a predefined sequence that is fixed at job submission time. For example, the source connector could be used to migrate from one external system to another (like Kafka1 .. KafkaN, based on external trigger/discovery).

I agree with @Aljoscha Krettek <[hidden email]> that it would be preferable to solve this without special "switchable" interfaces and have it work with any FLIP-27 source as is. Performing the switch using the enumerator checkpoint appears viable (not proven though unless coded 😉). The actual FLIP-27 source reader would need to signal to the "HybridSourceReader" (HSR) that it is done, and then the HSR would send the switch event to the coordinator?

To further confirm my understanding: the actual split type that flows between enumerator and reader would be "HybridSourceSplit", and it would wrap the specific split (in the example either HDFS or Kafka)?

Switching relies on the previous source's end position being communicated as the start position to the next source. The position(s) can be exchanged through the checkpoint state, but "HybridSplitEnumerator" still needs a way to extract them from the actual enumerator. That could be done by the enumerator checkpoint state mapping function looking at the current split assignments, which would not require modification of existing enumerators?

Cheers,
Thomas
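(To illustrate the wrapping mentioned above: a HybridSourceSplit might look roughly like the sketch below. The class and field names are assumptions for this example, not part of the FLIP.)

    import org.apache.flink.api.connector.source.SourceSplit;

    // Sketch: a split that wraps the underlying source's split and remembers which source
    // in the chain produced it, so the reader can route it to the right delegate.
    public class HybridSourceSplit implements SourceSplit {

        private final int sourceIndex;           // position of the owning source in the chain
        private final SourceSplit wrappedSplit;  // e.g. a FileSourceSplit or a KafkaPartitionSplit

        public HybridSourceSplit(int sourceIndex, SourceSplit wrappedSplit) {
            this.sourceIndex = sourceIndex;
            this.wrappedSplit = wrappedSplit;
        }

        public int sourceIndex() {
            return sourceIndex;
        }

        public SourceSplit wrappedSplit() {
            return wrappedSplit;
        }

        @Override
        public String splitId() {
            // Prefix with the source index so split ids stay unique across the chain.
            return sourceIndex + "-" + wrappedSplit.splitId();
        }
    }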
Hi all,
Thanks for starting the discussion!

I think there are differences between start-position and checkpoint-state. Checkpoint state is really an intermediate progress capture while reading data. Source users don't care about it when writing their code; it is purely an implementation detail. Start-position, on the other hand, is part of the public API, say, paths for FileSource/HiveSource or an OffsetsInitializer for KafkaSource. Given that, `setStartState` is actually a reposition operation for the next source to start from at job runtime.

Besides the above, I think this separation of start-position and checkpoint-state leads to a smooth migration from non-switchable sources to switchable sources and relaxes the burden on source authors. Source authors would not have to decide switchable-or-not in the design phase; it becomes a decision that can be postponed, and a later enhancement would not break anything.

It would be relatively hard to convert a start-position to checkpoint-state for restoring without changing the checkpoint-state structure and/or serializer:
* `Path[]` to `PendingSplitsCheckpoint<FileSourceSplit>`.
* `OffsetsInitializer` to `KafkaSourceEnumState`.
This conversion should be an implementation detail of the next source, not a converter function, in my opinion.

With a separate start-position concept, I think the `StartStateT` in the proposal should be `Path[]` for `FileSource` and `OffsetsInitializer` for `KafkaSource`. It seems like it should belong to the source more than to the split enumerator.

If we do not reuse checkpoint-state as start-position, then this feature requires support from the Flink side (that is, it would be awkward/tangled to build it as a third-party library):
* It needs a post-mortem position to deliver possibly useful information to the next source.
* It needs to reposition the next source, such as `FileSource`, `KafkaSource`, etc.

Back to the proposal, I think it has three joints:
1. A separate start-position for the next source.
2. A separate end-position from the preceding source.
3. A converter from end-position to start-position.

I think some or all of the three should be optional. Let me detail:
1. No configurable start-position. In this situation the combination of the above three is a no-op, and `HybridSource` is just a chain of sources with pre-configured start positions. The current design seems like overkill for this use case. This case could also be realized with help from `InputSelectable` and `MultipleInputStreamOperator`, but I think it fits this feature quite well.
2. I wonder whether an end-position is a must and how it could be useful for end users in a generic-enough source, say `FileSource`? I could imagine that some sources may have an embedded next-position for the next source to start from. Most generic sources, however, have no meaningful next-position for other sources. Would it then be too risky to bake this into generic sources' types? Or are we doing this solely to fit the type requirement of `HybridSource.addSource`?
3. Is it possible for the converter function to do blocking operations? How do we respond to a checkpoint request while switching split enumerators across sources? Does the end-position or start-position need to be stored in checkpoint state or not?

Last, given the name `HybridSource`, would it be possible to use this feature to switch/chain multiple homogeneous sources? Say:
* Two file sources with different formats.
* Migrating from one Kafka cluster to another, as @Thomas has already pointed out.
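(For reference while reading the discussion above: the switchable interfaces from the FLIP draft might look roughly like the sketch below. The signatures are approximate, an illustration rather than the finalized proposal.)

    // Sketch of the proposed switchable interfaces (signatures approximate).
    public interface SwitchableSource<T, SplitT extends SourceSplit, EnumChkT, StartStateT, EndStateT>
            extends Source<T, SplitT, EnumChkT> {

        @Override
        SwitchableSplitEnumerator<SplitT, EnumChkT, StartStateT, EndStateT> createEnumerator(
                SplitEnumeratorContext<SplitT> context) throws Exception;
    }

    public interface SwitchableSplitEnumerator<SplitT extends SourceSplit, EnumChkT, StartStateT, EndStateT>
            extends SplitEnumerator<SplitT, EnumChkT> {

        // Reposition the source before it starts handing out splits, e.g. Path[] for a
        // FileSource or an OffsetsInitializer for a KafkaSource.
        void setStartState(StartStateT startState);

        // Expose where this source stopped, so a converter function can turn it into the
        // start state of the next source; may return null if not needed.
        EndStateT getEndState();
    }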
Hi Thomas,
Thanks for your detailed reply on the design of the Hybrid Source. Replies to your questions below:

1. Has there been related activity since the FLIP was created?

Yes, there is an initial version of the Hybrid Source implementation for the FLIP. Please refer to the repository: git@github.com:wuchong/flink-hackathon.git

2. The actual FLIP-27 source reader would need to signal to the "HybridSourceReader" (HSR) that it is done, and then the HSR would send the switch event to the coordinator?

The communication in the Hybrid Source is between "HybridSourceReader" and "HybridSplitEnumerator". After a "SourceReader" inside the "HybridSourceReader" finishes reading the source splits assigned by the split enumerator, the "HybridSourceReader" sends a signal to the "HybridSplitEnumerator" to notify it that the "SourceReader" has finished, and switches to the next "SourceReader". The "HybridSplitEnumerator" then receives the "SourceReaderSwitchedEvent" and switches to the next "SplitEnumerator".

3. The actual split type that flows between enumerator and reader would be "HybridSourceSplit" and it would wrap the specific split (in the example either HDFS or Kafka)?

Yes, "HybridSourceSplit" wraps the "SourceSplit" created by the "SplitEnumerator" that is currently active in the "HybridSplitEnumerator".

4. "HybridSplitEnumerator" still needs a way to extract the positions from the actual enumerator. That could be done by the enumerator checkpoint state mapping function looking at the current split assignments, which would not require modification of existing enumerators?

IMO, the extraction could be defined in the state conversion function, which is invoked by "HybridSplitEnumerator" when switching from one "SwitchableSplitEnumerator" to the next, because the "SwitchableSplitEnumerator" interface can return the end state of the actual enumerator.

I hope this answers your questions about the Hybrid Source. If anything is still unclear, let's keep discussing. Thanks for your attention.

Best,
Nicholas Jiang
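(A sketch of the reader-side signaling described in point 2, as a method fragment of a hypothetical HybridSourceReader; the fields currentReader, readerContext, currentSourceIndex, lastSourceIndex and the SourceReaderSwitchedEvent class are assumptions for illustration.)

    // Fragment of a hypothetical HybridSourceReader<T>: delegate to the current reader and
    // notify the coordinator when it has finished, instead of ending the hybrid reader.
    @Override
    public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
        InputStatus status = currentReader.pollNext(output);
        if (status == InputStatus.END_OF_INPUT && currentSourceIndex < lastSourceIndex) {
            // Tell the HybridSplitEnumerator that this reader is done so it can switch to
            // the enumerator of the next source in the chain.
            readerContext.sendSourceEventToCoordinator(
                    new SourceReaderSwitchedEvent(currentSourceIndex));
            // Do not signal END_OF_INPUT yet: splits from the next source are still to come.
            return InputStatus.NOTHING_AVAILABLE;
        }
        return status;
    }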
Hi Kezhu,
Thanks for your detailed points on the Hybrid Source. I'll address them one by one:

1. Would it be possible to use this feature to switch/chain multiple homogeneous sources?

"HybridSource" supports switching/chaining multiple homogeneous sources, as long as they have implementations of "SwitchableSource" and "SwitchableSplitEnumerator". "HybridSource" does not restrict whether the constituent sources are homogeneous. From the user's perspective, the user only adds the "SwitchableSource" instances to the "HybridSource" and leaves the smooth migration to the "HybridSource".

2. Is "setStartState" actually a reposition operation for the next source to start from at job runtime?

IMO, "setStartState" is used to determine the initial position of the new source for smooth migration, not a reposition operation. More importantly, the "state" mentioned here refers to the start and end positions for reading the source.

3. Should this conversion be an implementation detail of the next source, not a converter function?

The state conversion is of course an implementation detail and is part of the switching mechanism, which should provide users with a conversion interface; that is what the converter function defines. Moreover, once users have implemented a "SwitchableSource" and added it to the Hybrid Source, they don't need to implement another "SwitchableSource" for each different conversion. From the user's perspective, users can define different converter functions and create the "SwitchableSource" to add to the "HybridSource"; there is no need to implement a new Source per converter function.

4. With no configurable start-position, the combination of the three joints is a no-op and "HybridSource" is just a chain of sources with pre-configured start positions?

Indeed, there is no configurable start-position yet; this configuration could be added to the feature. Users could use the "SwitchableSplitEnumerator#setStartState" interface or configuration parameters to configure the start-position.

5. Is an end-position a must, and how could it be useful for end users in a generic-enough source?

The "getEndState" interface is used for the smooth migration scenario; it can return null if it is not needed. In the Hybrid Source mechanism this interface is required for switching between the constituent sources, because otherwise there is no way to obtain the end-position of the upstream source. In summary, the Hybrid Source needs to be able to set the start position and get the end position of each source, otherwise there is no point in building a Hybrid Source.

6. Is it possible for the converter function to do blocking operations? How do we respond to checkpoint requests when switching split enumerators across sources? Does the end-position or start-position need to be stored in checkpoint state?

The converter function only converts the state of the upstream source to the state of the downstream source; it does not perform blocking operations. Checkpoint requests during the switch of split enumerators across sources are handled by sending the corresponding "SourceEvent" to the coordinator. The end-position and start-position don't need to be stored in checkpoint state; only the "getEndState" interface needs to be implemented for the end-position.

Best,
Nicholas Jiang
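(The enumerator-side counterpart of the switch described in points 5 and 6, as a method fragment of a hypothetical HybridSplitEnumerator; generics are simplified and the fields finishedReaders, currentEnumerator, nextSource, converter and context are assumptions for illustration.)

    // Fragment of a hypothetical HybridSplitEnumerator: once every reader has reported that
    // the current source is finished, convert its end state into the next source's start
    // state and switch enumerators.
    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof SourceReaderSwitchedEvent) {
            finishedReaders.add(subtaskId);
            if (finishedReaders.size() == context.currentParallelism()) {
                try {
                    StartStateT startState = converter.apply(currentEnumerator.getEndState());
                    currentEnumerator = nextSource.createEnumerator(context); // a SwitchableSource
                    currentEnumerator.setStartState(startState);
                    currentEnumerator.start();
                    finishedReaders.clear();
                } catch (Exception e) {
                    throw new FlinkRuntimeException("Failed to switch to the next source", e);
                }
            }
        }
    }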
Hi Nicholas,
Thanks for the reply. I had implemented a small PoC. It switches a configurable sequence of sources with predefined bounds. I'm using the unmodified MockSource for illustration, and it does not require a "Switchable" interface. I looked at the code you shared, and the delegation and signaling work quite similarly. That's a good validation.

Hi Kezhu,

Thanks for bringing up the more detailed discussion regarding the start/end position. I think in most cases the start and end positions will be known when the job is submitted. If we take a File -> Kafka source chain as an example, there would most likely be a timestamp at which we want to transition from files to reading from Kafka. So we would either set the start position for Kafka based on that timestamp or provide the offsets directly. (Note that I'm skipping a few related nuances here. In order to achieve an exact switch without duplication or gaps, we may also need some overlap and filtering, but that's a separate issue.) The point is that the start positions can be configured by the user; there is no need to transfer any information from one source to another as part of switching.

It gets more complicated if we want to achieve a dynamic switch where the transition timestamp isn't known when the job starts. For example, consider a bootstrap scenario where the time taken to process the historic data exceeds the Kafka retention. Here, we would need to dynamically resolve the Kafka start position based on where the file readers left off when the switch occurs. The file source enumerator would determine at runtime when it is done handing splits to its readers, maybe when the max file timestamp reaches (processing time - X). This information needs to be transferred to the Kafka source.

The timestamp would need to be derived from the file enumerator state, either by looking at the last splits or explicitly. The natural way to do that is to introspect the enumerator state that gets checkpointed. Any other form of "end position" via a special interface would need to be derived in the same manner.

The converter provided by the user would look at the file enumerator state, derive the timestamp and then supply the "start position" to the Kafka source. The Kafka source was created when the job started; it needs to be augmented with the new start position. That can be achieved via a special enumerator interface like SwitchableSplitEnumerator#setStartState or by using restoreEnumerator with the checkpoint state constructed by the converter function. I'm leaning towards the latter as long as there is a convenient way to construct the state from a position (like enumStateForTimestamp). The converter would map one enum state to another and can be made very simple by providing a few utility functions, instead of mandating a new interface that enumerators need to implement to become switchable.

Again, a converter is only required when sources need to be switched based on positions not known at graph construction time.

I'm planning to add such deferred switching to the PoC for illustration and will share the experiment when that's done.

Cheers,
Thomas
> > "HybridSource" supports to switch/chain multiple homogeneous sources, which > have the respective implementation for "SwitchableSource" and > "SwitchableSplitEnumerator". "HybridSource" doesn't limit whether the > Sources consisted is homogeneous. From the user's perspective, User only > adds the "SwitchableSource" into "HybridSource" and leaves the smooth > migration operation to "HybridSource". > > 2."setStartState" is actually a reposition operation for next source to > start in job runtime? > > IMO, "setStartState" is used to determine the initial position of the new > source for smooth migration, not reposition operation. More importantly, > the > "State" mentioned here refers to the start and end positions of reading > source. > > 3.This conversion should be implementation detail of next source, not > converter function in my opinion? > > The state conversion is of course an implementation detail and included in > the switching mechanism, that should provide users with the conversion > interface for conversion, which is defined in converter function. What's > more, when users has already implemented "SwitchableSource" and added to > the > Hybrid Source, the users don't need to implement the "SwitchableSource" for > the different conversion. From the user's perspective, users could define > the different converter functions and create the "SwitchableSource" for the > addition of "HybridSource", no need to implement a Source for the converter > function. > > 4.No configurable start-position. In this situation combination of above > three joints is a nop, and > "HybridSource" is a chain of start-position pre-configured sources? > > Indeed there is no configurable start-position, and this configuration > could > be involved in the feature. Users could use > "SwitchableSplitEnumerator#setStartState" interface or the configuration > parameters to configure start-position. > > 5.I am wonder whether end-position is a must and how it could be useful for > end users in a generic-enough source? > > "getEndState" interface is used for the smooth migration scenario, which > could return null value if it is not needed. In the Hybrid Source > mechanism, > this interface is required for the switching between the sources consisted, > otherwise there is no any way to get end-position of upstream source. In > summary, Hybrid Source needs to be able to set the start position and get > the end position of each Source, otherwise there is no use to build Hybrid > Source. > > 6.Is it possible for converter function to do blocking operations? How to > respond to checkpoint request when switching split enumerators cross > sources? Does end-position or start-position need to be stored in > checkpoint > state or not? > > The converter function only simply converts the state of upstream source to > the state of downstream source, not blocking operations. The way to respond > the checkpoint request when switching split enumerators cross sources is > send the corresponding "SourceEvent" to coordination. The end-position or > start-position don't need to be stored in checkpoint state, only implements > the "getEndState" interface for end-position. > > Best, > Nicholas Jiang > > > > -- > Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ > |
Hi,
As mentioned in my previous email, I had been working on a prototype for the hybrid source. You can find it at https://github.com/tweise/flink/pull/1

It contains:
* Switching with a configurable chain of sources
* Fixed or dynamic start positions
* Tests with MockSource and FileSource

The purpose of the above PR is to gather feedback and help drive consensus on the FLIP.

* How to support a dynamic start position within the source chain?

Relevant in those (few?) cases where start positions are not known upfront. You can find an example of what that might look like in the tests:

https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132

When switching, the enumerator of the previous source needs to supply information about consumed splits that allows setting the start position for the next source. That could be something like the last processed file, a timestamp, etc. (Currently StaticFileSplitEnumerator doesn't track finished splits.)

See the previous discussion regarding start/end positions. The prototype shows the use of checkpoint state with a converter function.

* Should readers be deployed dynamically?

The prototype assumes a static source chain that is fixed at job submission time. Conceivably there could be use cases that require more flexibility, such as switching one KafkaSource for another. A step in that direction would be to deploy the actual readers dynamically, at the time of switching sources.

Looking forward to feedback and suggestions for next steps!

Thomas
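(A sketch of how the dynamic start position could surface in the builder; the addSource overload taking a factory, the SwitchContext type and deriveTimestamp are assumptions for illustration only, and the linked PR is the authoritative reference.)

    // Hypothetical: the second source is created lazily at switch time, based on the
    // previous (file) enumerator's checkpointed state.
    HybridSource<String> source =
            HybridSource.builder(fileSource)
                    .addSource(
                            (SwitchContext<PendingSplitsCheckpoint<FileSourceSplit>> ctx) -> {
                                long switchTimestamp = deriveTimestamp(ctx.previousEnumeratorState());
                                return KafkaSource.<String>builder()
                                        .setBootstrapServers("broker:9092")
                                        .setTopics("events")
                                        .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp))
                                        .setValueOnlyDeserializer(new SimpleStringSchema())
                                        .build();
                            })
                    .build();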
Thanks all for this discussion. Looks like there are lots of ideas and folks that are eager to do things, so let's see how we can get this moving.

My take on this is the following:

There will probably not be one hybrid source, but possibly multiple ones, because of different strategies/requirements.
  - One may be very simple, with switching points known up-front. Would be good to have this in a very simple implementation.
  - There may be one where the switch is dynamic and the readers need to report back where they left off.
  - There may be one that switches back and forth multiple times during a job, for example Kafka going to DFS whenever it falls behind retention, in order to catch up again.

This also seems hard to "design on paper"; I expect there are nuances in a production setup that affect some details of the design. So I'd feel most comfortable adding a variant of the hybrid source to Flink that has already been used in a real use case (not necessarily in production, but maybe in a testing/staging environment, so it seems to meet all requirements).

What do you think about the following approach?
  - If there is a tested PoC, let's try to get it contributed to Flink without trying to make it much more general.
  - When we see similar but slightly different requirements for another hybrid source, let's try to evolve the contributed one.
  - If we see new requirements that are so different that they don't fit well with the existing hybrid source, then let us look at building a second hybrid source for those requirements.

We need to make connector contributions in general easier, and I think it is not a bad thing to end up with different approaches and see how these play out against each other when being used by users, for example switching with known boundaries, dynamic switching, back-and-forth switching, etc. (I know some committers are planning to do some work on making connector contributions easier, with standardized testing frameworks, decoupled CI, etc.)

Best,
Stephan
Hi Stephan,
Thanks for the feedback!

I agree with the approach of starting with a simple implementation that can address a well-understood, significant portion of use cases.

I'm planning to continue work on the prototype that I had shared. There is production-level usage waiting for it fairly soon. I expect to open a PR in the coming weeks.

Thomas
Thanks, Thomas!
@Becket and @Nicholas - would you be ok with that approach? On Thu, Apr 15, 2021 at 6:30 PM Thomas Weise <[hidden email]> wrote: > Hi Stephan, > > Thanks for the feedback! > > I agree with the approach of starting with a simple implementation > that can address a well understood, significant portion of use cases. > > I'm planning to continue work on the prototype that I had shared. > There is production level usage waiting for it fairly soon. I expect > to open a PR in the coming weeks. > > Thomas |
Sorry for the late reply. Starting from a specific connector sounds reasonable to me. That said, I would suggest preserving the possibility of future generalization as much as possible. We have already seen several variations of source combinations from different users: HDFS + Kafka, S3 + Kafka, S3 + SQL Binlog, etc. So it would be good if we could reuse some base abstraction in the future instead of having to write each combination from scratch. Thanks, Jiangjie (Becket) Qin On Sat, Apr 17, 2021 at 7:34 PM Stephan Ewen <[hidden email]> wrote: > Thanks, Thomas! > > @Becket and @Nicholas - would you be ok with that approach? |
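Purely for illustration, a minimal sketch of the kind of reusable base abstraction being asked for above. Every name in it (SourceChainSketch, ChainedSource, ChainElement, Builder) is hypothetical and not taken from the prototype, FLIP-150, or Flink; it only shows the shape in which HDFS + Kafka, S3 + Kafka, or S3 + binlog chains would differ solely in the concrete sources and, for deferred switching, in the converter they plug in.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of a reusable source-chain abstraction (not Flink API). */
public class SourceChainSketch {

    /** Stand-in for a concrete source (file source, Kafka source, ...). */
    public interface ChainedSource<EnumChkT> {
        String name();
    }

    /** One chain element plus the rule for obtaining its start position. */
    public static final class ChainElement<EnumChkT> {
        final ChainedSource<EnumChkT> source;
        // null means the start position is fixed at job submission time.
        final Function<Object, EnumChkT> startStateConverter;

        ChainElement(ChainedSource<EnumChkT> source, Function<Object, EnumChkT> converter) {
            this.source = source;
            this.startStateConverter = converter;
        }
    }

    public static final class Builder {
        private final List<ChainElement<?>> elements = new ArrayList<>();

        /** Source with a start position fixed at job submission time. */
        public <T> Builder addSource(ChainedSource<T> source) {
            elements.add(new ChainElement<>(source, null));
            return this;
        }

        /** Source whose start position is derived from the previous enumerator's checkpoint. */
        public <T> Builder addSource(ChainedSource<T> source, Function<Object, T> converter) {
            elements.add(new ChainElement<>(source, converter));
            return this;
        }

        public List<ChainElement<?>> build() {
            return new ArrayList<>(elements);
        }
    }

    public static void main(String[] args) {
        ChainedSource<Long> files = () -> "bounded file source (HDFS or S3)";
        ChainedSource<Long> kafka = () -> "unbounded Kafka source";

        // Fixed switch: Kafka start offsets configured up front, no converter needed.
        List<ChainElement<?>> fixedChain = new Builder().addSource(files).addSource(kafka).build();

        // Deferred switch: Kafka start position derived at switch time from the previous checkpoint.
        List<ChainElement<?>> deferredChain = new Builder()
                .addSource(files)
                .addSource(kafka, prevEnumeratorCheckpoint -> (Long) prevEnumeratorCheckpoint)
                .build();

        System.out.println(fixedChain.size() + " elements / " + deferredChain.size() + " elements");
    }
}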
Hi Becket,
I agree and am not planning to hard-wire a specific combination of sources (like S3 + Kafka). That also wouldn't help for the use case I want to address, because there are customized connectors that we need to be able to plug in. Rather, the suggested simplification would be for the flexibility of the switching mechanism. The prototype already supports fixed start positions and checkpoint conversion for any combination of sources; no need to undo that. But for testing/example purposes, we will need to settle on a specific combination. Thomas On Sat, Apr 24, 2021 at 8:20 PM Becket Qin <[hidden email]> wrote: > > Sorry for the late reply. Starting from a specific connector sounds > reasonable to me. > > That said, I would suggest preserving the possibility of future generalization > as much as possible. We have already seen several variations of source > combinations from different users: HDFS + Kafka, S3 + Kafka, S3 + SQL > Binlog, etc. So it would be good if we could reuse some base abstraction in > the future instead of having to write each combination from scratch. > > Thanks, > > Jiangjie (Becket) Qin |
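To make the "checkpoint conversion" part of the above concrete, a minimal sketch of a converter for the deferred file-to-Kafka switch discussed earlier in the thread. FileEnumState and KafkaEnumState are simplified stand-ins invented for this sketch, not the real FileSource or KafkaSource enumerator checkpoint classes; the idea follows the thread: introspect the finished enumerator's checkpoint, derive a timestamp, and construct the state the next enumerator is restored from.

import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Illustration only: simplified stand-ins for enumerator checkpoint state. */
public class CheckpointConverterSketch {

    /** What a file enumerator might checkpoint: pending splits plus the max timestamp of finished files. */
    static class FileEnumState {
        final List<String> pendingFiles;
        final long maxFinishedFileTimestampMs;

        FileEnumState(List<String> pendingFiles, long maxFinishedFileTimestampMs) {
            this.pendingFiles = pendingFiles;
            this.maxFinishedFileTimestampMs = maxFinishedFileTimestampMs;
        }
    }

    /** What a Kafka enumerator might be restored from: a starting timestamp at the switch point. */
    static class KafkaEnumState {
        final long startTimestampMs;

        KafkaEnumState(long startTimestampMs) {
            this.startTimestampMs = startTimestampMs;
        }
    }

    public static void main(String[] args) {
        // The user-supplied converter: map the finished enumerator's checkpoint to the
        // checkpoint the next enumerator starts from (conceptually what a
        // restoreEnumerator-style call would then consume).
        Function<FileEnumState, KafkaEnumState> converter =
                fileState -> new KafkaEnumState(fileState.maxFinishedFileTimestampMs);

        FileEnumState endOfFiles = new FileEnumState(Collections.emptyList(), 1_619_222_400_000L);
        KafkaEnumState kafkaStart = converter.apply(endOfFiles);

        System.out.println("Kafka reading would start from timestamp " + kafkaStart.startTimestampMs);
    }
}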
Thanks for the clarification, Thomas. Yes, that makes sense to me.
Cheers, Jiangjie (Becket) Qin On Mon, Apr 26, 2021 at 1:03 AM Thomas Weise <[hidden email]> wrote: > Hi Becket, > > I agree and am not planning to hard-wire a specific combination of > sources (like S3 + Kafka). That also wouldn't help for the use case I > want to address, because there are customized connectors that we need > to be able to plug in. > > Rather, the suggested simplification would be for the flexibility of > the switching mechanism. > > The prototype already supports fixed start positions and checkpoint > conversion for any combination of sources; no need to undo that. > > But for testing/example purposes, we will need to settle on a specific > combination. > > Thomas |
> > > > > > > > > > > > > > > What do you think about the following approach? > > > > > - If there is a tested PoC, let's try to get it contributed to > Flink > > > > > without trying to make it much more general. > > > > > - When we see similar but a bit different requirements for > another > > > > hybrid > > > > > source, then let's try to evolve the contributed one. > > > > > - If we see new requirements that are so different that they > don't > > > fit > > > > > well with the existing hybrid source, then let us look at building > a > > > > second > > > > > hybrid source for those requirements. > > > > > > > > > > We need to make connector contributions in general more easy, and I > > > think > > > > > it is not a bad thing to end up with different approaches and see > how > > > > these > > > > > play out against each other when being used by users. For example > > > > switching > > > > > with known boundaries, dynamic switching, back-and-forth-switching, > > > etc. > > > > > (I know some committers are planning to do some work on making > > > > > connector contributions easier, with standardized testing > frameworks, > > > > > decoupled CI, etc.) > > > > > > > > > > Best, > > > > > Stephan > > > > > > > > > > > > > > > On Thu, Mar 25, 2021 at 4:41 AM Thomas Weise <[hidden email]> > wrote: > > > > > > > > > > > Hi, > > > > > > > > > > > > As mentioned in my previous email, I had been working on a > prototype > > > > for > > > > > > the hybrid source. > > > > > > > > > > > > You can find it at https://github.com/tweise/flink/pull/1 > > > > > > > > > > > > It contains: > > > > > > * Switching with configurable chain of sources > > > > > > * Fixed or dynamic start positions > > > > > > * Test with MockSource and FileSource > > > > > > > > > > > > The purpose of the above PR is to gather feedback and help drive > > > > consensus > > > > > > on the FLIP. > > > > > > > > > > > > * How to support a dynamic start position within the source > chain? > > > > > > > > > > > > Relevant in those (few?) cases where start positions are not > known > > > > upfront. > > > > > > You can find an example of what that might look like in the > tests: > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62 > > > > > > > > > > > > > > > > > > > > https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132 > > > > > > > > > > > > When switching, the enumerator of the previous source needs to > > > > > > supply information about consumed splits that allows to set the > start > > > > > > position for the next source. That could be something like the > last > > > > > > processed file, timestamp, etc. (Currently > StaticFileSplitEnumerator > > > > > > doesn't track finished splits.) > > > > > > > > > > > > See previous discussion regarding start/end position. The > prototype > > > > shows > > > > > > the use of checkpoint state with converter function. > > > > > > > > > > > > * Should readers be deployed dynamically? > > > > > > > > > > > > The prototype assumes a static source chain that is fixed at job > > > > submission > > > > > > time. Conceivably there could be use cases that require more > > > > flexibility. > > > > > > Such as switching one KafkaSource for another. A step in that > > > direction > > > > > > would be to deploy the actual readers dynamically, at the time of > > > > switching > > > > > > source. 
> > > > > > > > > > > > Looking forward to feedback and suggestions for next steps! > > > > > > > > > > > > Thomas > > > > > > > > > > > > On Sun, Mar 14, 2021 at 11:17 AM Thomas Weise <[hidden email]> > > > wrote: > > > > > > > > > > > > > Hi Nicholas, > > > > > > > > > > > > > > Thanks for the reply. I had implemented a small PoC. It > switches a > > > > > > > configurable sequence of sources with predefined bounds. I'm > using > > > > the > > > > > > > unmodified MockSource for illustration. It does not require a > > > > > > "Switchable" > > > > > > > interface. I looked at the code you shared and the delegation > and > > > > > > signaling > > > > > > > works quite similar. That's a good validation. > > > > > > > > > > > > > > Hi Kezhu, > > > > > > > > > > > > > > Thanks for bringing the more detailed discussion regarding the > > > > start/end > > > > > > > position. I think in most cases the start and end positions > will be > > > > known > > > > > > > when the job is submitted. If we take a File -> Kafka source > chain > > > as > > > > > > > example, there would most likely be a timestamp at which we > want to > > > > > > > transition from files to reading from Kafka. So we would > either set > > > > the > > > > > > > start position for Kafka based on that timestamp or provide the > > > > offsets > > > > > > > directly. (Note that I'm skipping a few related nuances here. > In > > > > order to > > > > > > > achieve an exact switch without duplication or gap, we may also > > > need > > > > some > > > > > > > overlap and filtering, but that's a separate issue.) > > > > > > > > > > > > > > The point is that the start positions can be configured by the > > > user, > > > > > > there > > > > > > > is no need to transfer any information from one source to > another > > > as > > > > part > > > > > > > of switching. > > > > > > > > > > > > > > It gets more complicated if we want to achieve a dynamic switch > > > > where the > > > > > > > transition timestamp isn't known when the job starts. For > example, > > > > > > consider > > > > > > > a bootstrap scenario where the time taken to process historic > data > > > > > > exceeds > > > > > > > the Kafka retention. Here, we would need to dynamically > resolve the > > > > Kafka > > > > > > > start position based on where the file readers left off, when > the > > > > > > switching > > > > > > > occurs. The file source enumerator would determine at runtime > when > > > > it is > > > > > > > done handing splits to its readers, maybe when the max file > > > timestamp > > > > > > > reaches (processing time - X). This information needs to be > > > > transferred > > > > > > to > > > > > > > the Kafka source. > > > > > > > > > > > > > > The timestamp would need to be derived from the file enumerator > > > > state, > > > > > > > either by looking at the last splits or explicitly. The > natural way > > > > to do > > > > > > > that is to introspect the enumerator state which gets > checkpointed. > > > > Any > > > > > > > other form of "end position" via a special interface would > need to > > > be > > > > > > > derived in the same manner. > > > > > > > > > > > > > > The converter that will be provided by the user would look at > the > > > > file > > > > > > > enumerator state, derive the timestamp and then supply the > "start > > > > > > position" > > > > > > > to the Kafka source. The Kafka source was created when the job > > > > started. > > > > > > It > > > > > > > needs to be augmented with the new start position. 
That can be > > > > achieved > > > > > > via > > > > > > > a special enumerator interface like > > > > > > SwitchableSplitEnumerator#setStartState > > > > > > > or by using restoreEnumerator with the checkpoint state > constructed > > > > by > > > > > > the > > > > > > > converter function. I'm leaning towards the latter as long as > there > > > > is a > > > > > > > convenient way to construct the state from a position (like > > > > > > > enumStateForTimestamp). The converter would map one enum state > to > > > > another > > > > > > > and can be made very simple by providing a few utility > functions > > > > instead > > > > > > of > > > > > > > mandating a new interface that enumerators need to implement to > > > > become > > > > > > > switchable. > > > > > > > > > > > > > > Again, a converter is only required when sources need to be > > > switched > > > > > > based > > > > > > > on positions not known at graph construction time. > > > > > > > > > > > > > > I'm planning to add such deferred switching to the PoC for > > > > illustration > > > > > > > and will share the experiment when that's done. > > > > > > > > > > > > > > Cheers, > > > > > > > Thomas > > > > > > > > > > > > > > > > > > > > > On Mon, Mar 8, 2021 at 1:46 AM Nicholas Jiang < > [hidden email] > > > > > > > > > > wrote: > > > > > > > > > > > > > >> Hi Kezhu, > > > > > > >> > > > > > > >> Thanks for your detailed points for the Hybrid Source. I > follow > > > your > > > > > > >> opinions and make a corresponding explanation as follows: > > > > > > >> > > > > > > >> 1.Would the Hybrid Source be possible to use this feature to > > > > > > switch/chain > > > > > > >> multiple homogeneous sources? > > > > > > >> > > > > > > >> "HybridSource" supports to switch/chain multiple homogeneous > > > > sources, > > > > > > >> which > > > > > > >> have the respective implementation for "SwitchableSource" and > > > > > > >> "SwitchableSplitEnumerator". "HybridSource" doesn't limit > whether > > > > the > > > > > > >> Sources consisted is homogeneous. From the user's perspective, > > > User > > > > only > > > > > > >> adds the "SwitchableSource" into "HybridSource" and leaves the > > > > smooth > > > > > > >> migration operation to "HybridSource". > > > > > > >> > > > > > > >> 2."setStartState" is actually a reposition operation for next > > > > source to > > > > > > >> start in job runtime? > > > > > > >> > > > > > > >> IMO, "setStartState" is used to determine the initial > position of > > > > the > > > > > > new > > > > > > >> source for smooth migration, not reposition operation. More > > > > importantly, > > > > > > >> the > > > > > > >> "State" mentioned here refers to the start and end positions > of > > > > reading > > > > > > >> source. > > > > > > >> > > > > > > >> 3.This conversion should be implementation detail of next > source, > > > > not > > > > > > >> converter function in my opinion? > > > > > > >> > > > > > > >> The state conversion is of course an implementation detail and > > > > included > > > > > > in > > > > > > >> the switching mechanism, that should provide users with the > > > > conversion > > > > > > >> interface for conversion, which is defined in converter > function. > > > > What's > > > > > > >> more, when users has already implemented "SwitchableSource" > and > > > > added to > > > > > > >> the > > > > > > >> Hybrid Source, the users don't need to implement the > > > > "SwitchableSource" > > > > > > >> for > > > > > > >> the different conversion. 
From the user's perspective, users > could > > > > > > define > > > > > > >> the different converter functions and create the > > > "SwitchableSource" > > > > for > > > > > > >> the > > > > > > >> addition of "HybridSource", no need to implement a Source for > the > > > > > > >> converter > > > > > > >> function. > > > > > > >> > > > > > > >> 4.No configurable start-position. In this situation > combination of > > > > above > > > > > > >> three joints is a nop, and > > > > > > >> "HybridSource" is a chain of start-position pre-configured > > > sources? > > > > > > >> > > > > > > >> Indeed there is no configurable start-position, and this > > > > configuration > > > > > > >> could > > > > > > >> be involved in the feature. Users could use > > > > > > >> "SwitchableSplitEnumerator#setStartState" interface or the > > > > configuration > > > > > > >> parameters to configure start-position. > > > > > > >> > > > > > > >> 5.I am wonder whether end-position is a must and how it could > be > > > > useful > > > > > > >> for > > > > > > >> end users in a generic-enough source? > > > > > > >> > > > > > > >> "getEndState" interface is used for the smooth migration > scenario, > > > > which > > > > > > >> could return null value if it is not needed. In the Hybrid > Source > > > > > > >> mechanism, > > > > > > >> this interface is required for the switching between the > sources > > > > > > >> consisted, > > > > > > >> otherwise there is no any way to get end-position of upstream > > > > source. In > > > > > > >> summary, Hybrid Source needs to be able to set the start > position > > > > and > > > > > > get > > > > > > >> the end position of each Source, otherwise there is no use to > > > build > > > > > > Hybrid > > > > > > >> Source. > > > > > > >> > > > > > > >> 6.Is it possible for converter function to do blocking > operations? > > > > How > > > > > > to > > > > > > >> respond to checkpoint request when switching split enumerators > > > cross > > > > > > >> sources? Does end-position or start-position need to be > stored in > > > > > > >> checkpoint > > > > > > >> state or not? > > > > > > >> > > > > > > >> The converter function only simply converts the state of > upstream > > > > source > > > > > > >> to > > > > > > >> the state of downstream source, not blocking operations. The > way > > > to > > > > > > >> respond > > > > > > >> the checkpoint request when switching split enumerators cross > > > > sources is > > > > > > >> send the corresponding "SourceEvent" to coordination. The > > > > end-position > > > > > > or > > > > > > >> start-position don't need to be stored in checkpoint state, > only > > > > > > >> implements > > > > > > >> the "getEndState" interface for end-position. > > > > > > >> > > > > > > >> Best, > > > > > > >> Nicholas Jiang > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > >> -- > > > > > > >> Sent from: > > > > > > >> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ > > > > > > >> > > > > > > > > > > > > > > > > > > > > > |
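For readers following the exchange above, a minimal sketch of what the SwitchableSplitEnumerator interface under discussion (with setStartState/getEndState) might look like is shown below. This is only an illustration of the FLIP-150 idea as debated in the thread; the generic parameters and exact signatures are assumptions, not a merged Flink API.

    // Illustration only: one possible shape of the switching interface discussed above.
    // StartStateT/EndStateT and the method signatures are assumptions sketched from the
    // thread, not an existing Flink API.
    import org.apache.flink.api.connector.source.SourceSplit;
    import org.apache.flink.api.connector.source.SplitEnumerator;

    public interface SwitchableSplitEnumerator<SplitT extends SourceSplit, CheckpointT, StartStateT, EndStateT>
            extends SplitEnumerator<SplitT, CheckpointT> {

        // Invoked when this enumerator becomes active, handing over the previous source's
        // end position as this source's start position.
        void setStartState(StartStateT startState);

        // Invoked when this source finishes, so the hybrid source can position the next one.
        EndStateT getEndState();
    }

A file enumerator could, for example, expose a max timestamp through getEndState() that a Kafka enumerator then consumes in setStartState().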
Hi Thomas,
Sorry for the late reply regarding your POC. I have reviewed the base abstract implementation in your pull request: https://github.com/apache/flink/pull/15924. IMO, for the switching mechanism, this level of abstraction is not concise enough and doesn't make connector contribution easier. In theory, a dedicated set of interfaces is necessary to support the switching mechanism: the SwitchableSource and SwitchableSplitEnumerator interfaces are needed for connector extensibility. In other words, the whole switching process in the above-mentioned PR differs from the one described in FLIP-150. In that implementation, the switch of source reading is executed after receiving the SwitchSourceEvent, which could happen before the SourceReaderFinishEvent is sent. This timeline of the source reading switch could be discussed here. @Stephan @Becket, if you are available, please help to review the abstract implementation and compare it with the interfaces mentioned in FLIP-150. Thanks, Nicholas Jiang -- Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ |
Hi Nicholas,
Thanks for taking a look at the PR! 1. Regarding switching mechanism: There has been previous discussion in this thread regarding the pros and cons of how the switching can be exposed to the user. With fixed start positions, no special switching interface to transfer information between enumerators is required. Sources are configured as they would be when used standalone and just plugged into HybridSource. I expect that to be a common use case. You can find an example for this in the ITCase: https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101 For dynamic start position, the checkpoint state is used to transfer information from old to new enumerator. An example for that can be found here: https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136 That may look verbose, but the code to convert from one state to another can be factored out into a utility and the function becomes a one-liner. For common sources like files and Kafka we can potentially (later) implement the conversion logic as part of the respective connector's checkpoint and split classes. I hope that with the PR up for review, we can soon reach a conclusion on how we want to expose this to the user. Following is an example for Files -> Files -> Kafka that I'm using for e2e testing. It exercises both ways of setting the start position. https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a 2. Regarding the events used to implement the actual switch between enumerator and readers: I updated the PR with javadoc to clarify the intent. Please let me know if that helps or let's continue to discuss those details on the PR? Thanks, Thomas On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <[hidden email]> wrote: > > Hi Thomas, > > Sorry for later reply for your POC. I have reviewed the based abstract > implementation of your pull request: > https://github.com/apache/flink/pull/15924. IMO, for the switching > mechanism, this level of abstraction is not concise enough, which doesn't > make connector contribution easier. In theory, it is necessary to introduce > a set of interfaces to support the switching mechanism. The SwitchableSource > and SwitchableSplitEnumerator interfaces are needed for connector > expansibility. > In other words, the whole switching process of above mentioned PR is > different from that mentioned in FLIP-150. In the above implementation, the > source reading switching is executed after receving the SwitchSourceEvent, > which could be before the sending SourceReaderFinishEvent. This timeline of > source reading switching could be discussed here. > @Stephan @Becket, if you are available, please help to review the > abstract implementation, and compare with the interfaces mentioned in > FLIP-150. > > Thanks, > Nicholas Jiang > > > > -- > Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ |
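As a concrete reading of the fixed-start-position case described above, a sketch along the lines of the linked ITCase might look as follows. The HybridSource builder method names follow the PR under review and could still change; broker, topic, path and timestamp values are placeholders.

    // Sketch, assuming the builder-style API from the PR: both sources are configured as
    // they would be standalone, with a cut-over timestamp that is known when the job is built.
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.source.hybrid.HybridSource;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.reader.TextLineFormat;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FixedStartPositionExample {
        public static void main(String[] args) throws Exception {
            long switchTimestampMs = 1622505600000L; // placeholder cut-over time

            FileSource<String> fileSource = FileSource
                    .forRecordStreamFormat(new TextLineFormat(), new Path("hdfs:///history")) // placeholder path
                    .build();

            KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092") // placeholder
                    .setTopics("events")                // placeholder
                    .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestampMs))
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            // No converter needed: the Kafka start position is fixed up front.
            HybridSource<String> hybridSource =
                    HybridSource.builder(fileSource).addSource(kafkaSource).build();

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "hybrid-source").print();
            env.execute("hybrid-source-example");
        }
    }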
Discussed the PR with Thomas offline. Thomas, please correct me if I
missed anything. Right now, the PR differs from the FLIP-150 doc regarding the converter. * Current PR uses the enumerator checkpoint state type as the input for the converter * FLIP-150 defines a new EndStateT interface. It seems that the FLIP-150 approach of EndStateT is more flexible, as transition EndStateT doesn't have to be included in the upstream source checkpoint state. Let's look at two use cases: 1) static cutover time at 5 pm. File source reads all data btw 9 am - 5 pm, then Kafka source starts with initial position of 5 pm. In this case, there is no need for converter or EndStateT since the starting time for Kafka source is known and fixed. 2) dynamic cutover time at 1 hour before now. This is useful when the bootstrap of historic data takes a long time (like days or weeks) and we don't know the exact time of cutover when a job is launched. Instead, we are instructing the file source to stop when it gets close to live data. In this case, hybrid source construction will specify a relative time (now - 1 hour), the EndStateT (of file source) will be resolved to an absolute time for cutover. We probably don't need to include EndStateT (end timestamp) as the file source checkpoint state. Hence, the separate EndStateT is probably more desirable. We also discussed the converter for the Kafka source. Kafka source supports different OffsetsInitializer impls (including TimestampOffsetsInitializer). To support the dynamic cutover time (use case #2 above), we can plug in a SupplierTimestampOffsetInitializer, where the starting timestamp is not set during source/job construction. Rather it is a supplier model where the starting timestamp value is set to the resolved absolute timestamp during switch. Thanks, Steven On Thu, May 20, 2021 at 8:59 PM Thomas Weise <[hidden email]> wrote: > Hi Nicholas, > > Thanks for taking a look at the PR! > > 1. Regarding switching mechanism: > > There has been previous discussion in this thread regarding the pros > and cons of how the switching can be exposed to the user. > > With fixed start positions, no special switching interface to transfer > information between enumerators is required. Sources are configured as > they would be when used standalone and just plugged into HybridSource. > I expect that to be a common use case. You can find an example for > this in the ITCase: > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101 > > For dynamic start position, the checkpoint state is used to transfer > information from old to new enumerator. An example for that can be > found here: > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136 > > That may look verbose, but the code to convert from one state to > another can be factored out into a utility and the function becomes a > one-liner. > > For common sources like files and Kafka we can potentially (later) > implement the conversion logic as part of the respective connector's > checkpoint and split classes. > > I hope that with the PR up for review, we can soon reach a conclusion > on how we want to expose this to the user. > > Following is an example for Files -> Files -> Kafka that I'm using for > e2e testing. It exercises both ways of setting the start position. > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a > > > 2. 
Regarding the events used to implement the actual switch between > enumerator and readers: I updated the PR with javadoc to clarify the > intent. Please let me know if that helps or let's continue to discuss > those details on the PR? > > > Thanks, > Thomas > > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <[hidden email]> > wrote: > > > > Hi Thomas, > > > > Sorry for later reply for your POC. I have reviewed the based abstract > > implementation of your pull request: > > https://github.com/apache/flink/pull/15924. IMO, for the switching > > mechanism, this level of abstraction is not concise enough, which doesn't > > make connector contribution easier. In theory, it is necessary to > introduce > > a set of interfaces to support the switching mechanism. The > SwitchableSource > > and SwitchableSplitEnumerator interfaces are needed for connector > > expansibility. > > In other words, the whole switching process of above mentioned PR is > > different from that mentioned in FLIP-150. In the above implementation, > the > > source reading switching is executed after receving the > SwitchSourceEvent, > > which could be before the sending SourceReaderFinishEvent. This timeline > of > > source reading switching could be discussed here. > > @Stephan @Becket, if you are available, please help to review the > > abstract implementation, and compare with the interfaces mentioned in > > FLIP-150. > > > > Thanks, > > Nicholas Jiang > > > > > > > > -- > > Sent from: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ > |
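To make the supplier model described above concrete, here is a rough sketch of what such a SupplierTimestampOffsetInitializer could look like. The class does not exist in the Kafka connector today; the name is taken from the message above, and the delegation shown is only one possible implementation.

    // Hypothetical sketch: the Kafka starting timestamp is not fixed at construction time but
    // read from a supplier when the enumerator asks for offsets, i.e. at switch time. In
    // practice the supplier would have to be serializable and wired to whatever resolves the
    // file source's end timestamp.
    import java.util.Collection;
    import java.util.Map;
    import java.util.function.Supplier;

    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.TopicPartition;

    public class SupplierTimestampOffsetInitializer implements OffsetsInitializer {

        private final Supplier<Long> startTimestampMs;

        public SupplierTimestampOffsetInitializer(Supplier<Long> startTimestampMs) {
            this.startTimestampMs = startTimestampMs;
        }

        @Override
        public Map<TopicPartition, Long> getPartitionOffsets(
                Collection<TopicPartition> partitions,
                OffsetsInitializer.PartitionOffsetsRetriever partitionOffsetsRetriever) {
            // Resolve the timestamp lazily and delegate to the existing timestamp-based initializer.
            return OffsetsInitializer.timestamp(startTimestampMs.get())
                    .getPartitionOffsets(partitions, partitionOffsetsRetriever);
        }

        @Override
        public OffsetResetStrategy getAutoOffsetResetStrategy() {
            return OffsetResetStrategy.LATEST;
        }
    }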
Hi Steven,
Thank you for the thorough review of the PR and for bringing this back to the mailing list. All, I updated the FLIP-150 page to highlight aspects in which the PR deviates from the original proposal [1]. The goal would be to update the FLIP soon and bring it to a vote, as previously suggested offline by Nicholas. A few minor issues in the PR are outstanding and I'm working on test coverage for the recovery behavior, which should be completed soon. The dynamic position transfer needs to be concluded before we can move forward however. There have been various ideas, including the special "SwitchableEnumerator" interface, using enumerator checkpoint state or an enumerator interface extension to extract the end state. One goal in the FLIP is to "Reuse the existing Source connectors built with FLIP-27 without any change." and I think it is important to honor that goal given that fixed start positions do not require interface changes. Based on the feedback the following might be a good solution for runtime position transfer: * User supplies the optional converter function (not applicable for fixed positions). * Instead of relying on the enumerator checkpoint state [2], the converter function will be supplied with the current and next enumerator (source.createEnumerator). * Converter function relies on the specific enumerator capabilities to set the new start position (e.g. fileSourceEnumerator.getEndTimestamp() and kafkaSourceEnumerator.setTimestampOffsetsInitializer(..) * HybridSourceSplitEnumerator starts new underlying enumerator With this approach, there is no need to augment FLIP-27 interfaces and custom source capabilities are easier to integrate. Removing the mandate to rely on enumerator checkpoint state also avoids potential upgrade/compatibility issues. Thoughts? Thanks, Thomas [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation [2] https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281 On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <[hidden email]> wrote: > > discussed the PR with Thosmas offline. Thomas, please correct me if I > missed anything. > > Right now, the PR differs from the FLIP-150 doc regarding the converter. > * Current PR uses the enumerator checkpoint state type as the input for the > converter > * FLIP-150 defines a new EndStateT interface. > It seems that the FLIP-150 approach of EndStateT is more flexible, as > transition EndStateT doesn't have to be included in the upstream source > checkpoint state. > > Let's look at two use cases: > 1) static cutover time at 5 pm. File source reads all data btw 9 am - 5 pm, > then Kafka source starts with initial position of 5 pm. In this case, there > is no need for converter or EndStateT since the starting time for Kafka > source is known and fixed. > 2) dynamic cutover time at 1 hour before now. This is useful when the > bootstrap of historic data takes a long time (like days or weeks) and we > don't know the exact time of cutover when a job is launched. Instead, we > are instructing the file source to stop when it gets close to live data. In > this case, hybrid source construction will specify a relative time (now - 1 > hour), the EndStateT (of file source) will be resolved to an absolute time > for cutover. We probably don't need to include EndStateT (end timestamp) as > the file source checkpoint state. Hence, the separate EndStateT is probably > more desirable. 
> > We also discussed the converter for the Kafka source. Kafka source supports > different OffsetsInitializer impls (including TimestampOffsetsInitializer). > To support the dynamic cutover time (use case #2 above), we can plug in a > SupplierTimestampOffsetInitializer, where the starting timestamp is not set > during source/job construction. Rather it is a supplier model where the > starting timestamp value is set to the resolved absolute timestamp during > switch. > > Thanks, > Steven > > > > On Thu, May 20, 2021 at 8:59 PM Thomas Weise <[hidden email]> wrote: > > > Hi Nicholas, > > > > Thanks for taking a look at the PR! > > > > 1. Regarding switching mechanism: > > > > There has been previous discussion in this thread regarding the pros > > and cons of how the switching can be exposed to the user. > > > > With fixed start positions, no special switching interface to transfer > > information between enumerators is required. Sources are configured as > > they would be when used standalone and just plugged into HybridSource. > > I expect that to be a common use case. You can find an example for > > this in the ITCase: > > > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101 > > > > For dynamic start position, the checkpoint state is used to transfer > > information from old to new enumerator. An example for that can be > > found here: > > > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136 > > > > That may look verbose, but the code to convert from one state to > > another can be factored out into a utility and the function becomes a > > one-liner. > > > > For common sources like files and Kafka we can potentially (later) > > implement the conversion logic as part of the respective connector's > > checkpoint and split classes. > > > > I hope that with the PR up for review, we can soon reach a conclusion > > on how we want to expose this to the user. > > > > Following is an example for Files -> Files -> Kafka that I'm using for > > e2e testing. It exercises both ways of setting the start position. > > > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a > > > > > > 2. Regarding the events used to implement the actual switch between > > enumerator and readers: I updated the PR with javadoc to clarify the > > intent. Please let me know if that helps or let's continue to discuss > > those details on the PR? > > > > > > Thanks, > > Thomas > > > > > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <[hidden email]> > > wrote: > > > > > > Hi Thomas, > > > > > > Sorry for later reply for your POC. I have reviewed the based abstract > > > implementation of your pull request: > > > https://github.com/apache/flink/pull/15924. IMO, for the switching > > > mechanism, this level of abstraction is not concise enough, which doesn't > > > make connector contribution easier. In theory, it is necessary to > > introduce > > > a set of interfaces to support the switching mechanism. The > > SwitchableSource > > > and SwitchableSplitEnumerator interfaces are needed for connector > > > expansibility. > > > In other words, the whole switching process of above mentioned PR is > > > different from that mentioned in FLIP-150. In the above implementation, > > the > > > source reading switching is executed after receving the > > SwitchSourceEvent, > > > which could be before the sending SourceReaderFinishEvent. 
This timeline > > of > > > source reading switching could be discussed here. > > > @Stephan @Becket, if you are available, please help to review the > > > abstract implementation, and compare with the interfaces mentioned in > > > FLIP-150. > > > > > > Thanks, > > > Nicholas Jiang > > > > > > > > > > > > -- > > > Sent from: > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ > > |
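For illustration, a sketch of the user-supplied converter in the runtime position transfer described above. The capabilities getEndTimestamp() and setTimestampOffsetsInitializer() are the hypothetical examples from the message, modeled here as small stand-in interfaces rather than existing Flink APIs.

    // Sketch only: the converter receives the finished enumerator and the next enumerator and
    // transfers the position via connector-specific capabilities. Both interfaces below are
    // hypothetical stand-ins for the capabilities named above, not shipped Flink interfaces.
    import java.util.function.BiConsumer;

    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

    public class EnumeratorPositionTransfer {

        /** Hypothetical capability of the file enumerator: expose where it stopped. */
        interface HasEndTimestamp {
            long getEndTimestamp();
        }

        /** Hypothetical capability of the Kafka enumerator: accept a start position at switch time. */
        interface AcceptsTimestampOffsets {
            void setTimestampOffsetsInitializer(OffsetsInitializer initializer);
        }

        /** The converter a user would hand to the HybridSource for a dynamic switch. */
        static final BiConsumer<HasEndTimestamp, AcceptsTimestampOffsets> CONVERTER =
                (previousEnumerator, nextEnumerator) ->
                        nextEnumerator.setTimestampOffsetsInitializer(
                                OffsetsInitializer.timestamp(previousEnumerator.getEndTimestamp()));
    }

Because such a converter is specific to the (file, Kafka) pairing, no generic end-state type is required, which is the trade-off discussed in the following message.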
> Converter function relies on the specific enumerator capabilities to set
the new start position (e.g. fileSourceEnumerator.getEndTimestamp() and kafkaSourceEnumerator.setTimestampOffsetsInitializer(..) I guess the premise is that a converter is for a specific tuple of (upstream source, downstream source) . We don't have to define generic EndtStateT and SwitchableEnumerator interfaces. That should work. The benefit of defining EndtStateT and SwitchableEnumerator interfaces is probably promoting uniformity across sources that support hybrid/switchable source. On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise <[hidden email]> wrote: > Hi Steven, > > Thank you for the thorough review of the PR and for bringing this back > to the mailing list. > > All, > > I updated the FLIP-150 page to highlight aspects in which the PR > deviates from the original proposal [1]. The goal would be to update > the FLIP soon and bring it to a vote, as previously suggested offline > by Nicholas. > > A few minor issues in the PR are outstanding and I'm working on test > coverage for the recovery behavior, which should be completed soon. > > The dynamic position transfer needs to be concluded before we can move > forward however. > > There have been various ideas, including the special > "SwitchableEnumerator" interface, using enumerator checkpoint state or > an enumerator interface extension to extract the end state. > > One goal in the FLIP is to "Reuse the existing Source connectors built > with FLIP-27 without any change." and I think it is important to honor > that goal given that fixed start positions do not require interface > changes. > > Based on the feedback the following might be a good solution for > runtime position transfer: > > * User supplies the optional converter function (not applicable for > fixed positions). > * Instead of relying on the enumerator checkpoint state [2], the > converter function will be supplied with the current and next > enumerator (source.createEnumerator). > * Converter function relies on the specific enumerator capabilities to > set the new start position (e.g. > fileSourceEnumerator.getEndTimestamp() and > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..) > * HybridSourceSplitEnumerator starts new underlying enumerator > > With this approach, there is no need to augment FLIP-27 interfaces and > custom source capabilities are easier to integrate. Removing the > mandate to rely on enumerator checkpoint state also avoids potential > upgrade/compatibility issues. > > Thoughts? > > Thanks, > Thomas > > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation > [2] > https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281 > > > On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <[hidden email]> wrote: > > > > discussed the PR with Thosmas offline. Thomas, please correct me if I > > missed anything. > > > > Right now, the PR differs from the FLIP-150 doc regarding the converter. > > * Current PR uses the enumerator checkpoint state type as the input for > the > > converter > > * FLIP-150 defines a new EndStateT interface. > > It seems that the FLIP-150 approach of EndStateT is more flexible, as > > transition EndStateT doesn't have to be included in the upstream source > > checkpoint state. > > > > Let's look at two use cases: > > 1) static cutover time at 5 pm. File source reads all data btw 9 am - 5 > pm, > > then Kafka source starts with initial position of 5 pm. 
In this case, > there > > is no need for converter or EndStateT since the starting time for Kafka > > source is known and fixed. > > 2) dynamic cutover time at 1 hour before now. This is useful when the > > bootstrap of historic data takes a long time (like days or weeks) and we > > don't know the exact time of cutover when a job is launched. Instead, we > > are instructing the file source to stop when it gets close to live data. > In > > this case, hybrid source construction will specify a relative time (now > - 1 > > hour), the EndStateT (of file source) will be resolved to an absolute > time > > for cutover. We probably don't need to include EndStateT (end timestamp) > as > > the file source checkpoint state. Hence, the separate EndStateT is > probably > > more desirable. > > > > We also discussed the converter for the Kafka source. Kafka source > supports > > different OffsetsInitializer impls (including > TimestampOffsetsInitializer). > > To support the dynamic cutover time (use case #2 above), we can plug in a > > SupplierTimestampOffsetInitializer, where the starting timestamp is not > set > > during source/job construction. Rather it is a supplier model where the > > starting timestamp value is set to the resolved absolute timestamp during > > switch. > > > > Thanks, > > Steven > > > > > > > > On Thu, May 20, 2021 at 8:59 PM Thomas Weise <[hidden email]> wrote: > > > > > Hi Nicholas, > > > > > > Thanks for taking a look at the PR! > > > > > > 1. Regarding switching mechanism: > > > > > > There has been previous discussion in this thread regarding the pros > > > and cons of how the switching can be exposed to the user. > > > > > > With fixed start positions, no special switching interface to transfer > > > information between enumerators is required. Sources are configured as > > > they would be when used standalone and just plugged into HybridSource. > > > I expect that to be a common use case. You can find an example for > > > this in the ITCase: > > > > > > > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101 > > > > > > For dynamic start position, the checkpoint state is used to transfer > > > information from old to new enumerator. An example for that can be > > > found here: > > > > > > > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136 > > > > > > That may look verbose, but the code to convert from one state to > > > another can be factored out into a utility and the function becomes a > > > one-liner. > > > > > > For common sources like files and Kafka we can potentially (later) > > > implement the conversion logic as part of the respective connector's > > > checkpoint and split classes. > > > > > > I hope that with the PR up for review, we can soon reach a conclusion > > > on how we want to expose this to the user. > > > > > > Following is an example for Files -> Files -> Kafka that I'm using for > > > e2e testing. It exercises both ways of setting the start position. > > > > > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a > > > > > > > > > 2. Regarding the events used to implement the actual switch between > > > enumerator and readers: I updated the PR with javadoc to clarify the > > > intent. Please let me know if that helps or let's continue to discuss > > > those details on the PR? 
> > > > > > > > > Thanks, > > > Thomas > > > > > > > > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <[hidden email]> > > > wrote: > > > > > > > > Hi Thomas, > > > > > > > > Sorry for later reply for your POC. I have reviewed the based > abstract > > > > implementation of your pull request: > > > > https://github.com/apache/flink/pull/15924. IMO, for the switching > > > > mechanism, this level of abstraction is not concise enough, which > doesn't > > > > make connector contribution easier. In theory, it is necessary to > > > introduce > > > > a set of interfaces to support the switching mechanism. The > > > SwitchableSource > > > > and SwitchableSplitEnumerator interfaces are needed for connector > > > > expansibility. > > > > In other words, the whole switching process of above mentioned PR > is > > > > different from that mentioned in FLIP-150. In the above > implementation, > > > the > > > > source reading switching is executed after receving the > > > SwitchSourceEvent, > > > > which could be before the sending SourceReaderFinishEvent. This > timeline > > > of > > > > source reading switching could be discussed here. > > > > @Stephan @Becket, if you are available, please help to review the > > > > abstract implementation, and compare with the interfaces mentioned in > > > > FLIP-150. > > > > > > > > Thanks, > > > > Nicholas Jiang > > > > > > > > > > > > > > > > -- > > > > Sent from: > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ > > > > |
Sorry for joining the party so late, but it's such an interesting FLIP with
a huge impact that I wanted to add my 2 cents. [1] I'm mirroring some basic question from the PR review to this thread because it's about the name: We could rename the thing to ConcatenatedSource(s), SourceSequence, or similar. Hybrid has the connotation of 2 for me (maybe because I'm a non-native) and does not carry the concatentation concept as well (hybrid sounds to me more like the source would constantly switch back and forth). Could we take a few minutes to think if this is the most intuitive name for new users? I'm especially hoping that natives might give some ideas (or declare that Hybrid is perfect). [1] https://github.com/apache/flink/pull/15924#pullrequestreview-677376664 On Sun, Jun 6, 2021 at 7:47 PM Steven Wu <[hidden email]> wrote: > > Converter function relies on the specific enumerator capabilities to set > the new start position (e.g. > fileSourceEnumerator.getEndTimestamp() and > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..) > > I guess the premise is that a converter is for a specific tuple of > (upstream source, downstream source) . We don't have to define generic > EndtStateT and SwitchableEnumerator interfaces. That should work. > > The benefit of defining EndtStateT and SwitchableEnumerator interfaces is > probably promoting uniformity across sources that support hybrid/switchable > source. > > On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise <[hidden email]> wrote: > > > Hi Steven, > > > > Thank you for the thorough review of the PR and for bringing this back > > to the mailing list. > > > > All, > > > > I updated the FLIP-150 page to highlight aspects in which the PR > > deviates from the original proposal [1]. The goal would be to update > > the FLIP soon and bring it to a vote, as previously suggested offline > > by Nicholas. > > > > A few minor issues in the PR are outstanding and I'm working on test > > coverage for the recovery behavior, which should be completed soon. > > > > The dynamic position transfer needs to be concluded before we can move > > forward however. > > > > There have been various ideas, including the special > > "SwitchableEnumerator" interface, using enumerator checkpoint state or > > an enumerator interface extension to extract the end state. > > > > One goal in the FLIP is to "Reuse the existing Source connectors built > > with FLIP-27 without any change." and I think it is important to honor > > that goal given that fixed start positions do not require interface > > changes. > > > > Based on the feedback the following might be a good solution for > > runtime position transfer: > > > > * User supplies the optional converter function (not applicable for > > fixed positions). > > * Instead of relying on the enumerator checkpoint state [2], the > > converter function will be supplied with the current and next > > enumerator (source.createEnumerator). > > * Converter function relies on the specific enumerator capabilities to > > set the new start position (e.g. > > fileSourceEnumerator.getEndTimestamp() and > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..) > > * HybridSourceSplitEnumerator starts new underlying enumerator > > > > With this approach, there is no need to augment FLIP-27 interfaces and > > custom source capabilities are easier to integrate. Removing the > > mandate to rely on enumerator checkpoint state also avoids potential > > upgrade/compatibility issues. > > > > Thoughts? 
> > > > Thanks, > > Thomas > > > > [1] > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation > > [2] > > > https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281 > > > > > > On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <[hidden email]> wrote: > > > > > > discussed the PR with Thosmas offline. Thomas, please correct me if I > > > missed anything. > > > > > > Right now, the PR differs from the FLIP-150 doc regarding the > converter. > > > * Current PR uses the enumerator checkpoint state type as the input for > > the > > > converter > > > * FLIP-150 defines a new EndStateT interface. > > > It seems that the FLIP-150 approach of EndStateT is more flexible, as > > > transition EndStateT doesn't have to be included in the upstream source > > > checkpoint state. > > > > > > Let's look at two use cases: > > > 1) static cutover time at 5 pm. File source reads all data btw 9 am - 5 > > pm, > > > then Kafka source starts with initial position of 5 pm. In this case, > > there > > > is no need for converter or EndStateT since the starting time for Kafka > > > source is known and fixed. > > > 2) dynamic cutover time at 1 hour before now. This is useful when the > > > bootstrap of historic data takes a long time (like days or weeks) and > we > > > don't know the exact time of cutover when a job is launched. Instead, > we > > > are instructing the file source to stop when it gets close to live > data. > > In > > > this case, hybrid source construction will specify a relative time (now > > - 1 > > > hour), the EndStateT (of file source) will be resolved to an absolute > > time > > > for cutover. We probably don't need to include EndStateT (end > timestamp) > > as > > > the file source checkpoint state. Hence, the separate EndStateT is > > probably > > > more desirable. > > > > > > We also discussed the converter for the Kafka source. Kafka source > > supports > > > different OffsetsInitializer impls (including > > TimestampOffsetsInitializer). > > > To support the dynamic cutover time (use case #2 above), we can plug > in a > > > SupplierTimestampOffsetInitializer, where the starting timestamp is not > > set > > > during source/job construction. Rather it is a supplier model where the > > > starting timestamp value is set to the resolved absolute timestamp > during > > > switch. > > > > > > Thanks, > > > Steven > > > > > > > > > > > > On Thu, May 20, 2021 at 8:59 PM Thomas Weise <[hidden email]> wrote: > > > > > > > Hi Nicholas, > > > > > > > > Thanks for taking a look at the PR! > > > > > > > > 1. Regarding switching mechanism: > > > > > > > > There has been previous discussion in this thread regarding the pros > > > > and cons of how the switching can be exposed to the user. > > > > > > > > With fixed start positions, no special switching interface to > transfer > > > > information between enumerators is required. Sources are configured > as > > > > they would be when used standalone and just plugged into > HybridSource. > > > > I expect that to be a common use case. You can find an example for > > > > this in the ITCase: > > > > > > > > > > > > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101 > > > > > > > > For dynamic start position, the checkpoint state is used to transfer > > > > information from old to new enumerator. 
An example for that can be > > > > found here: > > > > > > > > > > > > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136 > > > > > > > > That may look verbose, but the code to convert from one state to > > > > another can be factored out into a utility and the function becomes a > > > > one-liner. > > > > > > > > For common sources like files and Kafka we can potentially (later) > > > > implement the conversion logic as part of the respective connector's > > > > checkpoint and split classes. > > > > > > > > I hope that with the PR up for review, we can soon reach a conclusion > > > > on how we want to expose this to the user. > > > > > > > > Following is an example for Files -> Files -> Kafka that I'm using > for > > > > e2e testing. It exercises both ways of setting the start position. > > > > > > > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a > > > > > > > > > > > > 2. Regarding the events used to implement the actual switch between > > > > enumerator and readers: I updated the PR with javadoc to clarify the > > > > intent. Please let me know if that helps or let's continue to discuss > > > > those details on the PR? > > > > > > > > > > > > Thanks, > > > > Thomas > > > > > > > > > > > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <[hidden email]> > > > > wrote: > > > > > > > > > > Hi Thomas, > > > > > > > > > > Sorry for later reply for your POC. I have reviewed the based > > abstract > > > > > implementation of your pull request: > > > > > https://github.com/apache/flink/pull/15924. IMO, for the switching > > > > > mechanism, this level of abstraction is not concise enough, which > > doesn't > > > > > make connector contribution easier. In theory, it is necessary to > > > > introduce > > > > > a set of interfaces to support the switching mechanism. The > > > > SwitchableSource > > > > > and SwitchableSplitEnumerator interfaces are needed for connector > > > > > expansibility. > > > > > In other words, the whole switching process of above mentioned > PR > > is > > > > > different from that mentioned in FLIP-150. In the above > > implementation, > > > > the > > > > > source reading switching is executed after receving the > > > > SwitchSourceEvent, > > > > > which could be before the sending SourceReaderFinishEvent. This > > timeline > > > > of > > > > > source reading switching could be discussed here. > > > > > @Stephan @Becket, if you are available, please help to review > the > > > > > abstract implementation, and compare with the interfaces mentioned > in > > > > > FLIP-150. > > > > > > > > > > Thanks, > > > > > Nicholas Jiang > > > > > > > > > > > > > > > > > > > > -- > > > > > Sent from: > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ > > > > > > > |