On 22.09.20 06:06, Steven Wu wrote:
> In addition, it is undesirable to do the committed-or-not check in the
> commit method, which happens for each checkpoint cycle. CommitResult
> already indicates SUCCESS or not. When the framework calls commit with a
> list of GlobalCommittableT, it should be certain they are uncommitted. The
> only time we aren't sure is when a list of GlobalCommittableT is restored
> from a checkpoint. `recoverGlobalCommittables` is the ideal place to do
> such a check and filter out the ones that were already committed. Retained
> ones will be committed in the next checkpoint cycle. Since the framework
> takes care of the checkpoint and restore, we need some hook for the sink
> to add the custom logic on the restored list.

I think we don't need the `recoverGlobalCommittables()` hook. The sink
implementation would have to do the filtering once, so it can either do
it in the recover hook or it could do it in the next `commit()` call.
Both of these would mean we only have to do one pass through the list
and connect to Iceberg. Doing the check in `commit()` would mean the
interface of GlobalCommittable is simpler and to me it seems natural
that we do the check in the `commit()` method to ensure that commits are
idempotent.

What do you think?

Aljoscha

Ah sorry, I think I now see what you mean. I think it's ok to add a
`List<GlobalCommittableT> recoverCommittables(List<GlobalCommittableT>)`
method.

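The recover hook agreed on here can be illustrated with a minimal sketch. It assumes that `String` ids stand in for `GlobalCommittableT` and that a hypothetical `FakeTable` plays the external system; neither name comes from the thread, and this is not Flink or Iceberg code. The point it shows is that the committed-or-not lookup runs once on restore, while the regular `commit()` path stays free of it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the external system (e.g. a table's commit log).
class FakeTable {
    private final Set<String> committedIds = new HashSet<>();

    boolean isCommitted(String id) {
        return committedIds.contains(id);
    }

    void commit(String id) {
        committedIds.add(id);
    }
}

// Hypothetical sink-side committer; String stands in for GlobalCommittableT.
class RecoveringCommitter {
    private final FakeTable table;

    RecoveringCommitter(FakeTable table) {
        this.table = table;
    }

    // Called once after restore: keep only what the table has not seen yet.
    List<String> recoverCommittables(List<String> restored) {
        List<String> uncommitted = new ArrayList<>();
        for (String id : restored) {
            if (!table.isCommitted(id)) {
                uncommitted.add(id);
            }
        }
        return uncommitted;
    }

    // Regular path: no committed-or-not lookup per checkpoint cycle.
    void commit(List<String> committables) {
        for (String id : committables) {
            table.commit(id);
        }
    }
}

class RecoverFilterSketch {
    public static void main(String[] args) {
        FakeTable table = new FakeTable();
        table.commit("ckpt-1"); // committed before the simulated failure
        RecoveringCommitter committer = new RecoveringCommitter(table);

        List<String> pending =
                committer.recoverCommittables(Arrays.asList("ckpt-1", "ckpt-2"));
        System.out.println(pending); // prints [ckpt-2]
        committer.commit(pending);
    }
}
```

The alternative raised earlier in the thread, doing the same lookup inside `commit()`, would move the `isCommitted` check into the loop of `commit()` and run it on every checkpoint cycle.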
Thanks @aljoscha for the summary. I agree we should postpone the discussion
of the sink topology and first focus on the normal file sink and the
IcebergSink in Flink 1.12. I have three small questions:

1. I think maybe we could add an EOI (end of input) interface to the
   `GlobalCommit`, so that `write success file` is available in both
   batch and stream execution mode.
2. If we choose to let the two types of committer appear at the same time
   in the API, we have to figure out how to express the relation between
   the two committers. I think the Sink API may look like the following.
   What do you think?

       Sink<T, CommT, CommR, ShareStateT...> {
           Writer<T, CommT, ShareStateT...> createWriter();
           Optional<Committer<CommT>> createCommitter();
           Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter();
       }

3. Maybe a silly question: why does `commit` need to return `CommitResult`?
   I think the sink developer could do the retry himself. Why does the
   framework need to do the retry?

Best,
Guowei

On 22.09.20 11:10, Guowei Ma wrote:
> 1. I think maybe we could add a EOI interface to the `GlobalCommit`. So
> that we could make `write success file` be available in both batch and
> stream execution mode.

We could, yes. I'm now hesitant because we're adding more things but I
think it should be fine.

> 2. If we choose to let the two types of committer appear at the same time
> in the API we have to figure out how to express the relation between the
> two committers. I think the Sink API may look like the following: What do
> you think?
>     Sink<T, CommT, CommR, ShareStateT...> {
>         Writer<T, CommT, ShareStateT...> createWriter();
>         Optional<Committer<CommT>> createCommitter();
>         Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter();
>     }

Yes, I think this is what we should do. Though I think that we should
initially not support shared state. The FileSink only uses this to
create unique file names and I think we can do without it. If we see
that we need it later we can add it but I would like to keep things
minimal initially. It's always easy to add things later but it's hard to
take things away once you added them.

> 3. Maybe a silly question: Why do we need `commit` return `CommitResult`? I
> think the sink developer could retry himself. Why need the framework to do
> the retry?

It's not a silly question at all! I think we need the retry to support
such problems as Steven mentioned. If a commit fails, a RETRY tells the
framework that it should keep the commits in state and retry them on the
next checkpoint. When the committer returns FAILURE we should just fail
the job. It's to support temporary outages of the external metastore.

I'm open to leaving it out of the initial version for the same reasons I
mentioned above but I think it could be valuable.

Best,
Aljoscha

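The RETRY/FAILURE semantics described in this reply can be sketched roughly as below. Only the result names SUCCESS, RETRY and FAILURE come from the thread; `CommitDriver`, its methods, and the use of a `Function` as the committer are assumptions made for illustration and are not the framework's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// The three outcomes named in the discussion.
enum CommitResult { SUCCESS, RETRY, FAILURE }

// Hypothetical framework-side driver, not Flink code.
class CommitDriver<T> {
    // In a real framework this list would be part of the checkpointed state.
    private final List<T> pending = new ArrayList<>();

    void add(T committable) {
        pending.add(committable);
    }

    List<T> pending() {
        return new ArrayList<>(pending);
    }

    // Invoked once per completed checkpoint.
    void onCheckpointComplete(Function<T, CommitResult> committer) {
        List<T> retained = new ArrayList<>();
        for (T committable : pending) {
            CommitResult result = committer.apply(committable);
            if (result == CommitResult.RETRY) {
                retained.add(committable); // keep in state for the next checkpoint
            } else if (result == CommitResult.FAILURE) {
                // Permanent failure: surface it so the job fails.
                throw new IllegalStateException("Commit failed: " + committable);
            }
            // SUCCESS: the committable is simply dropped.
        }
        pending.clear();
        pending.addAll(retained);
    }
}

class CommitDriverSketch {
    public static void main(String[] args) {
        CommitDriver<String> driver = new CommitDriver<>();
        driver.add("a");
        driver.add("b");

        // Simulate a temporary metastore outage that only affects "b".
        driver.onCheckpointComplete(
                c -> c.equals("b") ? CommitResult.RETRY : CommitResult.SUCCESS);
        System.out.println(driver.pending()); // prints [b]

        // On the next checkpoint the outage is over.
        driver.onCheckpointComplete(c -> CommitResult.SUCCESS);
        System.out.println(driver.pending()); // prints []
    }
}
```

Leaving `CommitResult` out of the first version, as suggested, would push the `retained` bookkeeping into each sink implementation.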
>> I believe that we could support such an async sink writer
>> very easily in the future. What do you think?

>> How would you see the expansion in the future? Do you mean just adding
>> an `isAvailable()` method with a default implementation later on?

Hi @piotr

Actually I am not sure adding `isAvailable` is enough. Maybe it is not.
But for the initial version I hope we could make the sink API sync because
there is already a lot of stuff that has to be finished. :--)

What do you think?

Best,
Guowei

On 22.09.20 13:26, Guowei Ma wrote:
> Actually I am not sure adding `isAvailable` is enough. Maybe it is not.
> But for the initial version I hope we could make the sink api sync because
> there is already a lot of stuff that has to finish. :--)

I agree, for the first version we should stick to a simpler synchronous
interface.

Aljoscha

It is fine to leave the CommitResult/RETRY outside the scope of the
framework. Then the framework might need to provide some hooks in the
checkpoint/restore logic. Because the commit happens in the post-checkpoint
completion step, the sink needs to update its internal state when the commit
is successful so that the next checkpoint won't include the committed
GlobalCommT.

Maybe GlobalCommitter can have an API like this?

    List<GlobalCommT> snapshotState();

But then we still need the recover API if we don't let the sink directly
manage the state.

    List<GlobalCommT> recoverCommittables(List<GlobalCommT>)

Thanks,
Steven

Previous APIs discussed have been trying to do more in the framework. If we
take a different approach with a lighter framework, this set of minimal APIs
is probably good enough. The sink can handle the bookkeeping, merge, and
retry logic.

    /**
     * CommT is the DataFile in Iceberg.
     * GlobalCommT is the checkpoint data type, like ManifestFile in Iceberg.
     */
    interface GlobalCommitter<CommT, GlobalCommT> {

        void collect(CommT committable);

        void commit();

        List<GlobalCommT> snapshotState();

        // This is just a callback to the sink so that it can filter and
        // retain the uncommitted GlobalCommT in its internal bookkeeping.
        void recoveredCommittables(List<GlobalCommT> globalCommittables);
    }

The most important need from the framework is to run GlobalCommitter in the
jobmanager. It involves the topology creation, checkpoint handling,
serializing the executions of commit() calls, etc.

Thanks,
Steven

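How a sink might do its own bookkeeping under this lighter interface can be sketched as follows. This is an in-memory illustration only: `String` plays the role of CommT, `List<String>` plays GlobalCommT, and the `table` list and `tableAvailable` flag are hypothetical stand-ins for the external system and a temporary outage. The method names follow the interface proposed above; everything else is assumed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory committer: String is CommT, List<String> is GlobalCommT.
class BookkeepingCommitter {
    private final List<String> collected = new ArrayList<>();
    private final List<List<String>> uncommitted = new ArrayList<>();
    final List<List<String>> table = new ArrayList<>(); // stand-in for the external table
    boolean tableAvailable = true;                      // flip to simulate an outage

    void collect(String committable) {
        collected.add(committable);
    }

    // Roll the collected items into one global committable and
    // return everything that is still waiting to be committed.
    List<List<String>> snapshotState() {
        if (!collected.isEmpty()) {
            uncommitted.add(new ArrayList<>(collected));
            collected.clear();
        }
        return new ArrayList<>(uncommitted);
    }

    // Runs after checkpoint completion; during an outage everything stays pending.
    void commit() {
        if (!tableAvailable) {
            return;
        }
        table.addAll(uncommitted);
        uncommitted.clear();
    }

    // Restore callback: retain only what the table does not contain yet.
    void recoveredCommittables(List<List<String>> restored) {
        for (List<String> globalCommittable : restored) {
            if (!table.contains(globalCommittable)) {
                uncommitted.add(globalCommittable);
            }
        }
    }
}

class BookkeepingSketch {
    public static void main(String[] args) {
        BookkeepingCommitter committer = new BookkeepingCommitter();
        committer.collect("file-1");
        committer.tableAvailable = false;
        committer.snapshotState();
        committer.commit(); // outage: nothing reaches the table

        committer.collect("file-2");
        committer.tableAvailable = true;
        System.out.println(committer.snapshotState().size()); // prints 2
        committer.commit();
        System.out.println(committer.table.equals(
                Arrays.asList(Arrays.asList("file-1"), Arrays.asList("file-2")))); // prints true
    }
}
```

In this shape the retry logic lives entirely in the sink: a failed commit leaves entries in `uncommitted`, so the next `snapshotState()` still includes them.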
I think we should go with something like

    List<GlobalCommitT> filterRecoveredCommittables(List<>)

to keep things simple. This should also be easy to do from the framework
side and then the sink doesn't need to do any custom state handling.

Best,
Aljoscha

>> I think we should go with something like
>>     List<GlobalCommitT> filterRecoveredCommittables(List<>)
>> to keep things simple. This should also be easy to do from the framework
>> side and then the sink doesn't need to do any custom state handling.

I second Aljoscha's proposal. For the first version there is already a lot
to do. For now I think it would be sufficient for the Iceberg sink.

Best,
Guowei

Hi, all

Thank you all very much for your ideas and suggestions. I will try to
summarize the consensus again :). Correct me if I am wrong or misunderstand
you.

## Consensus-1

1. The motivation of the unified sink API is to decouple the sink
   implementation from the different runtime execution modes.
2. The initial scope of the unified sink API only covers the file system
   type, which supports real transactions. The FLIP focuses more on the
   semantics the new sink API should support.
3. We prefer the first alternative API, which could give the framework a
   greater opportunity to optimize.
4. The `Writer` needs to add a method `prepareCommit`, which would be
   called from `prepareSnapshotPreBarrier`, and the `Flush` method is
   removed.
5. The FLIP could move the `Snapshot & Drain` section in order to be more
   focused.

## Consensus-2

1. What should the “Unified Sink API” support/cover? It includes two
   aspects.
   1. The same sink implementation would work for both the batch and
      stream execution mode.
   2. In the long run we should give the sink developer the ability to
      build “arbitrary” topologies. But for Flink 1.12 we should focus
      only on satisfying the S3/HDFS/Iceberg sink.
2. Because the batch execution mode does not have the normal checkpoint,
   the sink developer should not depend on it any more if we want a unified
   sink.
3. We can benefit from providing an asynchronous `Writer` version. But
   because the unified sink is already very complicated, we don’t add this
   in the first version.

Based on this consensus I would propose the first version of the new
sink API as follows. What do you think? Any comments are welcome.

    /**
     * This interface lets the sink developer build a simple transactional
     * sink topology pattern, which satisfies the HDFS/S3/Iceberg sink.
     * This sink topology includes one {@link Writer} + one {@link Committer}
     * + one {@link GlobalCommitter}.
     * The {@link Writer} is responsible for producing the committable.
     * The {@link Committer} is responsible for committing a single
     * committable.
     * The {@link GlobalCommitter} is responsible for committing an aggregated
     * committable, which we call a global committable.
     *
     * But both the {@link Committer} and the {@link GlobalCommitter} are
     * optional.
     */
    interface TSink<IN, CommT, GCommT, WriterS> {

        Writer<IN, CommT, WriterS> createWriter(InitContext initContext);

        Writer<IN, CommT, WriterS> restoreWriter(InitContext initContext, List<WriterS> states);

        Optional<Committer<CommT>> createCommitter();

        Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();

        SimpleVersionedSerializer<CommT> getCommittableSerializer();

        Optional<SimpleVersionedSerializer<GCommT>> getGlobalCommittableSerializer();
    }

    /**
     * The {@link GlobalCommitter} is responsible for committing an aggregated
     * committable, which we call a global committable.
     */
    interface GlobalCommitter<CommT, GCommT> {

        /**
         * This method is called when restoring from a failover.
         * @param globalCommittables the global committables that are not
         *        committed in the previous session.
         * @return the global committables that should be committed again
         *         in the current session.
         */
        List<GCommT> filterRecoveredCommittables(List<GCommT> globalCommittables);

        /**
         * Compute an aggregated committable from a collection of committables.
         * @param committables a collection of committables that need to be
         *        combined
         * @return an aggregated committable
         */
        GCommT combine(List<CommT> committables);

        void commit(List<GCommT> globalCommittables);

        /**
         * There are no more committables.
         */
        void endOfInput();
    }

Best,
Guowei

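To make the proposed `GlobalCommitter` contract concrete, here is a minimal in-memory sketch. The interface is copied from the summary above; the implementation is hypothetical: file names stand in for CommT, a comma-joined string stands in for GCommT, and a `Set` plays the external table. It is meant to show how `combine`, `commit`, `filterRecoveredCommittables` and `endOfInput` relate, not how an Iceberg sink would actually be written.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Interface as proposed in the summary above.
interface GlobalCommitter<CommT, GCommT> {
    List<GCommT> filterRecoveredCommittables(List<GCommT> globalCommittables);
    GCommT combine(List<CommT> committables);
    void commit(List<GCommT> globalCommittables);
    void endOfInput();
}

// Hypothetical implementation: CommT is a file name, GCommT a comma-joined manifest.
class InMemoryGlobalCommitter implements GlobalCommitter<String, String> {
    final Set<String> committedManifests = new LinkedHashSet<>(); // stand-in for the table
    boolean successMarkerWritten = false;

    @Override
    public List<String> filterRecoveredCommittables(List<String> globalCommittables) {
        List<String> remaining = new ArrayList<>();
        for (String manifest : globalCommittables) {
            if (!committedManifests.contains(manifest)) {
                remaining.add(manifest);
            }
        }
        return remaining;
    }

    @Override
    public String combine(List<String> committables) {
        // Aggregate the per-writer committables of one checkpoint into one unit.
        return String.join(",", committables);
    }

    @Override
    public void commit(List<String> globalCommittables) {
        committedManifests.addAll(globalCommittables);
    }

    @Override
    public void endOfInput() {
        // A sink could write its success file at this point.
        successMarkerWritten = true;
    }
}

class GlobalCommitterSketch {
    public static void main(String[] args) {
        InMemoryGlobalCommitter committer = new InMemoryGlobalCommitter();
        String manifest = committer.combine(Arrays.asList("part-0", "part-1"));
        committer.commit(Arrays.asList(manifest));

        // After a simulated restore, only the uncommitted manifest survives the filter.
        System.out.println(committer.filterRecoveredCommittables(
                Arrays.asList("part-0,part-1", "part-2"))); // prints [part-2]
        committer.endOfInput();
    }
}
```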
Yes, that sounds good! I'll probably have some comments on the FLIP
about the names of generic parameters and the Javadoc but we can address
them later or during implementation.

I also think that we probably need the FAIL,RETRY,SUCCESS result for
globalCommit() but we can also do that as a later addition.

So I think we're good to go to update the FLIP, do any last minute
changes and then vote.

Best,
Aljoscha

Thanks Aljoscha for your suggestion. I have updated FLIP. Any comments are
welcome.

Best,
Guowei

On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <[hidden email]> wrote:

> Yes, that sounds good! I'll probably have some comments on the FLIP
> about the names of generic parameters and the Javadoc but we can address
> them later or during implementation.
>
> I also think that we probably need the FAIL,RETRY,SUCCESS result for
> globalCommit() but we can also do that as a later addition.
>
> So I think we're good to go to update the FLIP, do any last minute
> changes and then vote.
>
> Best,
> Aljoscha
>
> On 23.09.20 06:13, Guowei Ma wrote:
> > Hi, all
> >
> > Thank everyone very much for your ideas and suggestions. I would try to
> > summarize again the consensus :). Correct me if I am wrong or misunderstand
> > you.
> >
> > ## Consensus-1
> >
> > 1. The motivation of the unified sink API is to decouple the sink
> > implementation from the different runtime execution mode.
> > 2. The initial scope of the unified sink API only covers the file system
> > type, which supports the real transactions. The FLIP focuses more on the
> > semantics the new sink api should support.
> > 3. We prefer the first alternative API, which could give the framework a
> > greater opportunity to optimize.
> > 4. The `Writer` needs to add a method `prepareCommit`, which would be
> > called from `prepareSnapshotPreBarrier`. And remove the `Flush` method.
> > 5. The FLIP could move the `Snapshot & Drain` section in order to be more
> > focused.
> >
> > ## Consensus-2
> >
> > 1. What should the “Unified Sink API” support/cover? It includes two
> > aspects. 1. The same sink implementation would work for both the batch and
> > stream execution mode. 2. In the long run we should give the sink developer
> > the ability of building “arbitrary” topologies. But for Flink-1.12 we
> > should be more focused on only satisfying the S3/HDFS/Iceberg sink.
> > 2. Because the batch execution mode does not have the normal checkpoint the
> > sink developer should not depend on it any more if we want a unified sink.
> > 3. We can benefit by providing an asynchronous `Writer` version. But
> > because the unified sink is already very complicated, we don’t add this in
> > the first version.
> >
> >
> > According to these consensus I would propose the first version of the new
> > sink api as follows. What do you think? Any comments are welcome.
> >
> > /**
> >  * This interface lets the sink developer build a simple transactional sink
> >  * topology pattern, which satisfies the HDFS/S3/Iceberg sink.
> >  * This sink topology includes one {@link Writer} + one {@link Committer} +
> >  * one {@link GlobalCommitter}.
> >  * The {@link Writer} is responsible for producing the committable.
> >  * The {@link Committer} is responsible for committing a single
> >  * committables.
> >  * The {@link GlobalCommitter} is responsible for committing an aggregated
> >  * committable, which we called global committables.
> >  *
> >  * But both the {@link Committer} and the {@link GlobalCommitter} are
> >  * optional.
> >  */
> > interface TSink<IN, CommT, GCommT, WriterS> {
> >
> >     Writer<IN, CommT, WriterS> createWriter(InitContext initContext);
> >
> >     Writer<IN, CommT, WriterS> restoreWriter(InitContext initContext,
> >             List<WriterS> states);
> >
> >     Optional<Committer<CommT>> createCommitter();
> >
> >     Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();
> >
> >     SimpleVersionedSerializer<CommT> getCommittableSerializer();
> >
> >     Optional<SimpleVersionedSerializer<GCommT>>
> >             getGlobalCommittableSerializer();
> > }
> >
> > /**
> >  * The {@link GlobalCommitter} is responsible for committing an aggregated
> >  * committable, which we called global committables.
> >  */
> > interface GlobalCommitter<CommT, GCommT> {
> >
> >     /**
> >      * This method is called when restoring from a failover.
> >      * @param globalCommittables the global committables that are not
> >      * committed in the previous session.
> >      * @return the global committables that should be committed again
> >      * in the current session.
> >      */
> >     List<GCommT> filterRecoveredCommittables(List<GCommT>
> >             globalCommittables);
> >
> >     /**
> >      * Compute an aggregated committable from a collection of
> >      * committables.
> >      * @param committables a collection of committables that are needed
> >      * to combine
> >      * @return an aggregated committable
> >      */
> >     GCommT combine(List<CommT> committables);
> >
> >     void commit(List<GCommT> globalCommittables);
> >
> >     /**
> >      * There are no committables any more.
> >      */
> >     void endOfInput();
> > }
> >
> > Best,
> > Guowei
|
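The `filterRecoveredCommittables` hook in the proposal quoted above could be used roughly as follows. This is only a minimal sketch: `ManifestCommittable`, `CommittedLog` and `RecoveryFilter` are hypothetical names invented for illustration (they are not part of the FLIP or of Iceberg), and the set of committed checkpoint ids stands in for whatever the external system would report.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical global committable, identified by the checkpoint that produced it.
final class ManifestCommittable {
    final long checkpointId;

    ManifestCommittable(long checkpointId) {
        this.checkpointId = checkpointId;
    }
}

// Hypothetical stand-in for the external metadata that knows what was already committed.
final class CommittedLog {
    private final Set<Long> committedCheckpoints;

    CommittedLog(Set<Long> committedCheckpoints) {
        this.committedCheckpoints = committedCheckpoints;
    }

    boolean isCommitted(long checkpointId) {
        return committedCheckpoints.contains(checkpointId);
    }
}

final class RecoveryFilter {
    // Mirrors the proposed hook: called once after restore, returns what still needs committing.
    static List<ManifestCommittable> filterRecoveredCommittables(
            List<ManifestCommittable> restored, CommittedLog log) {
        List<ManifestCommittable> pending = new ArrayList<>();
        for (ManifestCommittable committable : restored) {
            if (!log.isCommitted(committable.checkpointId)) {
                pending.add(committable);
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        CommittedLog log = new CommittedLog(new HashSet<>(Arrays.asList(1L, 2L)));
        List<ManifestCommittable> restored = Arrays.asList(
                new ManifestCommittable(1), new ManifestCommittable(2), new ManifestCommittable(3));
        // Checkpoints 1 and 2 were committed before the failure, so only 3 remains.
        System.out.println(filterRecoveredCommittables(restored, log).size());
    }
}
```

With the check done once at restore time, the regular `commit` path does not have to query the external system in every checkpoint cycle.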
Guowei,
Thanks a lot for updating the wiki page. It looks great.

I noticed one inconsistency in the wiki with your last summary email for the GlobalCommitter interface. I think the version in the summary email is the intended one, because rollover from previous failed commits can accumulate a list.

    CommitResult commit(GlobalCommT globalCommittable); // in the wiki
    =>
    CommitResult commit(List<GlobalCommT> globalCommittable); // in the summary email

I also have a clarifying question regarding the WriterStateT. Since IcebergWriter won't need to checkpoint any state, should we set it to *Void* type? Since getWriterStateSerializer() returns Optional, that is clear and we can return Optional.empty().

Thanks,
Steven
|
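The "rollover" argument for the list-based signature can be sketched as below. This is a hedged illustration only: `BatchGlobalCommitter`, `PendingCommits` and the `CommitResult` values are hypothetical names chosen for the example, and the handling of `FAIL` is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical result type, following the FAIL/RETRY/SUCCESS idea from the thread.
enum CommitResult { SUCCESS, RETRY, FAIL }

// Hypothetical list-based committer: one call commits the whole batch.
interface BatchGlobalCommitter<T> {
    CommitResult commit(List<T> globalCommittables);
}

// Framework-side sketch: uncommitted items roll over to the next checkpoint cycle.
final class PendingCommits<T> {
    private final List<T> pending = new ArrayList<>();
    private final BatchGlobalCommitter<T> committer;

    PendingCommits(BatchGlobalCommitter<T> committer) {
        this.committer = committer;
    }

    // Called once per checkpoint with the newly produced global committable.
    CommitResult onCheckpoint(T newCommittable) {
        pending.add(newCommittable);
        CommitResult result = committer.commit(new ArrayList<>(pending));
        if (result == CommitResult.SUCCESS) {
            pending.clear();
        }
        // On RETRY the whole list stays queued; FAIL handling is left to the caller.
        return result;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // Simulated outage: the first two commit attempts ask for a retry.
        PendingCommits<Long> commits = new PendingCommits<Long>(
                batch -> ++calls[0] <= 2 ? CommitResult.RETRY : CommitResult.SUCCESS);
        commits.onCheckpoint(1L);
        commits.onCheckpoint(2L);
        System.out.println(commits.pendingCount());
        commits.onCheckpoint(3L);
        System.out.println(commits.pendingCount());
    }
}
```

In this sketch the third call hands all three accumulated items to the committer in one batch, which is the behaviour the summary-email signature allows.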
Hi, Steven

Thank you for reading the FLIP so carefully.

1. The framework cannot know which `GlobalCommT` to retry if we use List<GlobalCommT> as the parameter and `commit` returns `RETRY`.
2. Of course we could let `commit` return more detailed info, but it might be too complicated.
3. On the other hand, I think IcebergSink only needs to take a collection of `GlobalCommT` and give back another collection of `GlobalCommT` that are not committed when restoring.

Best,
Guowei
|
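Point 1 above can be illustrated with a per-item variant, where each result maps to exactly one committable. This is a sketch under assumptions: `SingleGlobalCommitter`, `PerItemRetry` and the `CommitResult` values are hypothetical names, not the FLIP's final API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical result type, following the FAIL/RETRY/SUCCESS idea from the thread.
enum CommitResult { SUCCESS, RETRY, FAIL }

// Hypothetical single-item committer, as in the wiki version of the interface.
interface SingleGlobalCommitter<T> {
    CommitResult commit(T globalCommittable);
}

final class PerItemRetry {
    // Commits each item on its own and returns exactly those that asked for a retry.
    static <T> List<T> commitAll(List<T> items, SingleGlobalCommitter<T> committer) {
        List<T> toRetry = new ArrayList<>();
        for (T item : items) {
            CommitResult result = committer.commit(item);
            if (result == CommitResult.RETRY) {
                toRetry.add(item);
            } else if (result == CommitResult.FAIL) {
                throw new IllegalStateException("commit failed for " + item);
            }
        }
        return toRetry;
    }

    public static void main(String[] args) {
        // Only "b" asks for a retry, so the framework knows precisely what to re-queue.
        List<String> retry = commitAll(
                Arrays.asList("a", "b", "c"),
                item -> item.equals("b") ? CommitResult.RETRY : CommitResult.SUCCESS);
        System.out.println(retry);
    }
}
```

With a single `CommitResult` for a whole list, the same bookkeeping would need either an all-or-nothing convention or a richer return type, which is the trade-off described in points 1 and 2.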
Hi, Steven

>> I also have a clarifying question regarding the WriterStateT. Since
>> IcebergWriter won't need to checkpoint any state, should we set it to *Void*
>> type? Since getWriterStateSerializer() returns Optional, that is clear and
>> we can return Optional.empty().

Yes, I think you could do it. If you return Optional.empty() we would ignore all the state you return.

Best,
Guowei
|
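A stateless writer along the lines discussed above might look like this. It is a simplified sketch: `StateSerializer` and `WriterStatePart` are hypothetical stand-ins rather than Flink's actual interfaces, and only `getWriterStateSerializer()` is taken from the thread.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified serializer type; not Flink's SimpleVersionedSerializer.
interface StateSerializer<S> {
    byte[] serialize(S state);
}

// Hypothetical slice of the sink interface that only covers writer state.
interface WriterStatePart<WriterStateT> {
    Optional<StateSerializer<WriterStateT>> getWriterStateSerializer();
}

// A sink whose writer keeps no checkpointed state: Void state type, no serializer.
final class StatelessSinkSketch implements WriterStatePart<Void> {
    @Override
    public Optional<StateSerializer<Void>> getWriterStateSerializer() {
        return Optional.empty();
    }
}

final class WriterStateDemo {
    // Framework-side sketch: writer state is only snapshotted when a serializer exists.
    static <S> int statesToSnapshot(WriterStatePart<S> sink, List<S> states) {
        return sink.getWriterStateSerializer().isPresent() ? states.size() : 0;
    }

    public static void main(String[] args) {
        List<Void> ignored = Collections.<Void>singletonList(null);
        // Without a serializer, whatever the writer returns is ignored.
        System.out.println(statesToSnapshot(new StatelessSinkSketch(), ignored));
    }
}
```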
Hi, all
From the above discussion we can see that the FLIP focuses on providing a unified transactional sink API. So I updated the FLIP's title to "Unified Transactional Sink API". But I found that the old link can no longer be opened.

I am posting the updated link [1] here. Sorry for the inconvenience.

[1] https://cwiki.apache.org/confluence/x/KEJ4CQ

Best,
Guowei
|
> 1. The frame can not know which `GlobalCommT` to retry if we use the
> List<GlobalCommT> as parameter when the `commit` returns `RETRY`.
> 2. Of course we can let the `commit` return more detailed info but it might
> be too complicated.

If commit(List<GlobalCommT>) returns RETRY, it means the whole list needs to be retried. E.g., if we have an outage with the metadata service and commits for checkpoints 1-100 fail, we can accumulate 100 GlobalCommT items. We don't want to commit them one by one. It is faster to commit the whole list as one batch.

> 3. On the other hand, I think only when restoring IcebergSink needs a
> collection of `GlobalCommT` and giving back another collection of
> `GlobalCommT` that are not committed

That is when the job restarted due to failure or deployment.

On Fri, Sep 25, 2020 at 5:24 AM Guowei Ma <[hidden email]> wrote: > Hi, all > > From the above discussion we could find that FLIP focuses on providing an > unified transactional sink API. So I updated the FLIP's title to "Unified > Transactional Sink API". But I found that the old link could not be opened > again. > > I would update the link[1] here. Sorry for the inconvenience. > > [1]https://cwiki.apache.org/confluence/x/KEJ4CQ > > Best, > Guowei > > > On Fri, Sep 25, 2020 at 3:26 PM Guowei Ma <[hidden email]> wrote: > > > Hi, Steven > > > > >>I also have a clarifying question regarding the WriterStateT. Since > > >>IcebergWriter won't need to checkpoint any state, should we set it to > > *Void* > > >>type? Since getWriterStateSerializer() returns Optional, that is clear > > and > > >>we can return Optional.empty(). > > > > Yes I think you could do it. If you return Optional.empty() we would > > ignore all the state you return. > > > > Best, > > Guowei > > > > > > On Fri, Sep 25, 2020 at 3:14 PM Guowei Ma <[hidden email]> wrote: > > > >> Hi,Steven > >> > >> Thank you for reading the FLIP so carefully. > >> 1. The frame can not know which `GlobalCommT` to retry if we use the > >> List<GlobalCommT> as parameter when the `commit` returns `RETRY`. > >> 2. 
Of course we can let the `commit` return more detailed info but it > >> might be too complicated. > >> 3. On the other hand, I think only when restoring IcebergSink needs a > >> collection of `GlobalCommT` and giving back another collection of > >> `GlobalCommT` that are not committed. > >> > >> Best, > >> Guowei > >> > >> > >> On Fri, Sep 25, 2020 at 1:45 AM Steven Wu <[hidden email]> wrote: > >> > >>> Guowei, > >>> > >>> Thanks a lot for updating the wiki page. It looks great. > >>> > >>> I noticed one inconsistency in the wiki with your last summary email > for > >>> GlobalCommitter interface. I think the version in the summary email is > >>> the > >>> intended one, because rollover from previous failed commits can > >>> accumulate > >>> a list. > >>> CommitResult commit(GlobalCommT globalCommittable); // in the wiki > >>> => > >>> CommitResult commit(List<GlobalCommT> globalCommittable); // in the > >>> summary email > >>> > >>> I also have a clarifying question regarding the WriterStateT. Since > >>> IcebergWriter won't need to checkpoint any state, should we set it to > >>> *Void* > >>> type? Since getWriterStateSerializer() returns Optional, that is clear > >>> and > >>> we can return Optional.empty(). > >>> > >>> Thanks, > >>> Steven > >>> > >>> On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma <[hidden email]> > wrote: > >>> > >>> > Thanks Aljoscha for your suggestion. I have updated FLIP. Any > >>> comments are > >>> > welcome. > >>> > > >>> > Best, > >>> > Guowei > >>> > > >>> > > >>> > On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek < > [hidden email]> > >>> > wrote: > >>> > > >>> > > Yes, that sounds good! I'll probably have some comments on the FLIP > >>> > > about the names of generic parameters and the Javadoc but we can > >>> address > >>> > > them later or during implementation. > >>> > > > >>> > > I also think that we probably need the FAIL,RETRY,SUCCESS result > for > >>> > > globalCommit() but we can also do that as a later addition. 
> >>> > > > >>> > > So I think we're good to go to update the FLIP, do any last minute > >>> > > changes and then vote. > >>> > > > >>> > > Best, > >>> > > Aljoscha > >>> > > > >>> > > On 23.09.20 06:13, Guowei Ma wrote: > >>> > > > Hi, all > >>> > > > > >>> > > > Thank everyone very much for your ideas and suggestions. I would > >>> try to > >>> > > > summarize again the consensus :). Correct me if I am wrong or > >>> > > misunderstand > >>> > > > you. > >>> > > > > >>> > > > ## Consensus-1 > >>> > > > > >>> > > > 1. The motivation of the unified sink API is to decouple the sink > >>> > > > implementation from the different runtime execution mode. > >>> > > > 2. The initial scope of the unified sink API only covers the file > >>> > system > >>> > > > type, which supports the real transactions. The FLIP focuses more > >>> on > >>> > the > >>> > > > semantics the new sink api should support. > >>> > > > 3. We prefer the first alternative API, which could give the > >>> framework > >>> > a > >>> > > > greater opportunity to optimize. > >>> > > > 4. The `Writer` needs to add a method `prepareCommit`, which > would > >>> be > >>> > > > called from `prepareSnapshotPreBarrier`. And remove the `Flush` > >>> method. > >>> > > > 5. The FLIP could move the `Snapshot & Drain` section in order to > >>> be > >>> > more > >>> > > > focused. > >>> > > > > >>> > > > ## Consensus-2 > >>> > > > > >>> > > > 1. What should the “Unified Sink API” support/cover? It includes > >>> two > >>> > > > aspects. 1. The same sink implementation would work for both the > >>> batch > >>> > > and > >>> > > > stream execution mode. 2. In the long run we should give the sink > >>> > > developer > >>> > > > the ability of building “arbitrary” topologies. But for > Flink-1.12 > >>> we > >>> > > > should be more focused on only satisfying the S3/HDFS/Iceberg > sink. > >>> > > > 2. 
Because the batch execution mode does not have the normal > >>> checkpoint > >>> > > the > >>> > > > sink developer should not depend on it any more if we want a > >>> unified > >>> > > sink. > >>> > > > 3. We can benefit by providing an asynchronous `Writer` version. > >>> But > >>> > > > because the unified sink is already very complicated, we don’t > add > >>> this > >>> > > in > >>> > > > the first version. > >>> > > > > >>> > > > > >>> > > > According to these consensus I would propose the first version of > >>> the > >>> > new > >>> > > > sink api as follows. What do you think? Any comments are welcome. > >>> > > > > >>> > > > /** > >>> > > > * This interface lets the sink developer build a simple > >>> transactional > >>> > > sink > >>> > > > topology pattern, which satisfies the HDFS/S3/Iceberg sink. > >>> > > > * This sink topology includes one {@link Writer} + one {@link > >>> > > Committer} + > >>> > > > one {@link GlobalCommitter}. > >>> > > > * The {@link Writer} is responsible for producing the > >>> committable. > >>> > > > * The {@link Committer} is responsible for committing a single > >>> > > > committables. > >>> > > > * The {@link GlobalCommitter} is responsible for committing an > >>> > > aggregated > >>> > > > committable, which we called global committables. > >>> > > > * > >>> > > > * But both the {@link Committer} and the {@link > GlobalCommitter} > >>> are > >>> > > > optional. 
> >>> > > > */ > >>> > > > interface TSink<IN, CommT, GCommT, WriterS> { > >>> > > > > >>> > > > Writer<IN, CommT, WriterS> createWriter(InitContext > >>> > > initContext); > >>> > > > > >>> > > > Writer<IN, CommT, WriterS> restoreWriter(InitContext > >>> > > initContext, > >>> > > > List<WriterS> states); > >>> > > > > >>> > > > Optional<Committer<CommT>> createCommitter(); > >>> > > > > >>> > > > Optional<GlobalCommitter<CommT, GCommT>> > >>> > > createGlobalCommitter(); > >>> > > > > >>> > > > SimpleVersionedSerializer<CommT> > >>> getCommittableSerializer(); > >>> > > > > >>> > > > Optional<SimpleVersionedSerializer<GCommT>> > >>> > > > getGlobalCommittableSerializer(); > >>> > > > } > >>> > > > > >>> > > > /** > >>> > > > * The {@link GlobalCommitter} is responsible for committing an > >>> > > aggregated > >>> > > > committable, which we called global committables. > >>> > > > */ > >>> > > > interface GlobalCommitter<CommT, GCommT> { > >>> > > > > >>> > > > /** > >>> > > > * This method is called when restoring from a failover. > >>> > > > * @param globalCommittables the global committables > that > >>> are > >>> > > not > >>> > > > committed in the previous session. > >>> > > > * @return the global committables that should be > >>> committed > >>> > > again > >>> > > > in the current session. > >>> > > > */ > >>> > > > List<GCommT> filterRecoveredCommittables(List<GCommT> > >>> > > > globalCommittables); > >>> > > > > >>> > > > /** > >>> > > > * Compute an aggregated committable from a collection > of > >>> > > > committables. > >>> > > > * @param committables a collection of committables that > >>> are > >>> > > needed > >>> > > > to combine > >>> > > > * @return an aggregated committable > >>> > > > */ > >>> > > > GCommT combine(List<CommT> committables); > >>> > > > > >>> > > > void commit(List<GCommT> globalCommittables); > >>> > > > > >>> > > > /** > >>> > > > * There are no committables any more. 
> >>> > > > */ > >>> > > > void endOfInput(); > >>> > > > } > >>> > > > > >>> > > > Best, > >>> > > > Guowei > >>> > > > >>> > > > >>> > > >>> > >> > |
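The whole-list RETRY semantics from Steven's reply (commits for checkpoints 1-100 fail, then everything goes out as one batch) can be sketched roughly as below. This is only an illustration under assumptions: `BatchGlobalCommitter` and `PendingGlobalCommits` are hypothetical names that appear nowhere in the FLIP or in Flink, `CommitResult` is reduced to the three values named in the thread, and treating FAILURE as an exception is a choice made here for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical result type; the thread only names SUCCESS, FAILURE and RETRY. */
enum CommitResult { SUCCESS, FAILURE, RETRY }

/** Hypothetical slice of the GlobalCommitter under discussion (list-based commit). */
interface BatchGlobalCommitter<GlobalCommT> {
    CommitResult commit(List<GlobalCommT> globalCommittables);
}

/** Illustrative framework-side bookkeeping, not actual Flink code. */
final class PendingGlobalCommits<GlobalCommT> {
    private final List<GlobalCommT> pending = new ArrayList<>();
    private final BatchGlobalCommitter<GlobalCommT> committer;

    PendingGlobalCommits(BatchGlobalCommitter<GlobalCommT> committer) {
        this.committer = committer;
    }

    /** Called once per completed checkpoint with that checkpoint's global committable. */
    CommitResult onCheckpointComplete(GlobalCommT newCommittable) {
        pending.add(newCommittable);
        // Hand over everything that is still uncommitted, old and new, as one batch.
        CommitResult result = committer.commit(new ArrayList<>(pending));
        if (result == CommitResult.SUCCESS) {
            pending.clear();
        } else if (result == CommitResult.FAILURE) {
            // Assumption of this sketch: a permanent failure is surfaced as an exception.
            throw new IllegalStateException("global commit failed permanently");
        }
        // RETRY: keep the whole list so the next checkpoint retries all of it.
        return result;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

With a committer that answers RETRY for checkpoints 1-100 and SUCCESS afterwards, the call for checkpoint 101 receives all 101 items at once and leaves nothing pending.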
I should clarify my last email a little more.

Take the example where commits for checkpoints 1-100 failed while the job is still up (processing records and uploading files). When the commit for checkpoint 101 comes, IcebergSink would prefer that the framework pass in all 101 GlobalCommT items (100 old + 1 new) so that it can commit all of them in one transaction. That is more efficient than 101 separate transactions.

Maybe the GlobalCommitter#commit semantics should be to give the sink all uncommitted GlobalCommT items and let the sink implementation decide whether to retry them one by one or in a single transaction. That could mean we need to expand the CommitResult interface (e.g. a list for each result type: SUCCESS, FAILURE, RETRY). We can also start with the simple enum-style result for the whole list for now. If we later need to break the experimental API, it is not a big deal, since we would only need to update a few sink implementations.

Thanks,
Steven
|
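As a rough illustration of the "list for each result type" idea in Steven's clarification, an expanded result might look like the sketch below. The class `DetailedCommitResult`, its field names, and the `commitEach` helper are all hypothetical and are not taken from the FLIP; the per-item `CommitResult` enum only mirrors the three values mentioned in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Per-item outcome; the three values are the ones named in the thread. */
enum CommitResult { SUCCESS, FAILURE, RETRY }

/** Hypothetical expanded result: one list per result type. */
final class DetailedCommitResult<T> {
    final List<T> succeeded = new ArrayList<>();
    final List<T> failed = new ArrayList<>();
    final List<T> toRetry = new ArrayList<>();

    /**
     * Commits the items one by one through the given function and sorts them
     * into the three lists. A sink that prefers a single transaction would
     * instead put the whole input into exactly one of the lists.
     */
    static <T> DetailedCommitResult<T> commitEach(
            List<T> items, Function<? super T, CommitResult> commitOne) {
        DetailedCommitResult<T> result = new DetailedCommitResult<>();
        for (T item : items) {
            switch (commitOne.apply(item)) {
                case SUCCESS:
                    result.succeeded.add(item);
                    break;
                case FAILURE:
                    result.failed.add(item);
                    break;
                case RETRY:
                    result.toRetry.add(item);
                    break;
            }
        }
        return result;
    }
}
```

Under this shape the framework would only need to carry `toRetry` over to the next checkpoint, at the cost of a larger interface than the single enum for the whole list.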
Hi Steven

Thank you very much for your detailed explanation. Now I get your point: I can see the benefit of committing a collection of `GlobalCommT` as a whole when the external metastore is temporarily unstable. But I have two small concerns about committing a collection of `GlobalCommT`:

1. Option 1: CommitResult commit(List<GlobalCommT>). This option implies that users should commit the collection of `GlobalCommT` as a whole. But maybe not every system can do that; renaming a set of files, for example, cannot be done as a whole. In that case I think people would keep asking the same question I asked in my previous mail.

2. Option 2: List<CommitResult> commit(List<GlobalCommT>). This option is clearer than the first one. But IMHO it only has benefits when the external metastore is unstable and we want to retry many times without failing the job. Maybe we should not retry so many times and end up with a lot of uncommitted `GlobalCommT`. If so, maybe we should make the API clearer/simpler for the normal scenario. In addition, there is only one global committer instance, so I think the external system could bear the pressure.

So personally I would prefer to keep the API simpler at the beginning, in 1.12.

What do you think?

Best,
Guowei
|
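To make the difference between the two options in Guowei's mail concrete, the sketch below shows what the framework could retain after a commit call under each signature. The interface names `WholeListCommitter` and `PerItemCommitter` and the helper class `CommitBookkeeping` are hypothetical; only the two method signatures come from the mail. How FAILURE is handled is left out and would need its own decision.

```java
import java.util.ArrayList;
import java.util.List;

enum CommitResult { SUCCESS, FAILURE, RETRY }

/** Option 1 from the mail: one result for the whole list. */
interface WholeListCommitter<GlobalCommT> {
    CommitResult commit(List<GlobalCommT> globalCommittables);
}

/** Option 2 from the mail: one result per element, assumed to be in input order. */
interface PerItemCommitter<GlobalCommT> {
    List<CommitResult> commit(List<GlobalCommT> globalCommittables);
}

/** Illustrative helpers for what stays pending after a commit call. */
final class CommitBookkeeping {
    private CommitBookkeeping() {}

    /** Option 1: all or nothing, since the framework cannot tell elements apart. */
    static <T> List<T> remainingOption1(List<T> submitted, CommitResult result) {
        return result == CommitResult.RETRY ? new ArrayList<>(submitted) : new ArrayList<>();
    }

    /** Option 2: keep exactly the elements that were reported as RETRY. */
    static <T> List<T> remainingOption2(List<T> submitted, List<CommitResult> results) {
        if (submitted.size() != results.size()) {
            throw new IllegalArgumentException("expected one result per committable");
        }
        List<T> remaining = new ArrayList<>();
        for (int i = 0; i < submitted.size(); i++) {
            if (results.get(i) == CommitResult.RETRY) {
                remaining.add(submitted.get(i));
            }
        }
        return remaining;
    }
}
```

A sink that can only commit element by element (the file-rename case) fits Option 2 naturally, while a sink with a single transaction, such as the Iceberg case discussed earlier, fits Option 1.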