[DISCUSS] FLIP-143: Unified Sink API

Re: [DISCUSS] FLIP-143: Unified Sink API

Aljoscha Krettek-2
On 22.09.20 06:06, Steven Wu wrote:

> In addition, it is undesirable to do the committed-or-not check in the
> commit method, which happens for each checkpoint cycle. CommitResult
> already indicates SUCCESS or not. When the framework calls commit with a list
> of GlobalCommittableT, it should be certain they are uncommitted. The only
> time we aren't sure is when a list of GlobalCommittableT is restored from
> a checkpoint. `recoverGlobalCommittables` is the ideal place to do such a
> check and filter out the ones that were already committed. Retained ones
> will be committed in the next checkpoint cycle. Since the framework takes care
> of the checkpoint and restore, we need some hook for the sink to add the
> custom logic on the restored list.

I think we don't need the `recoverGlobalCommittables()` hook. The sink
implementation would have to do the filtering once, so it can either do
it in the recover hook or it could do it in the next `commit()` call.
Both of these would mean we only have to do one pass through the list
and connect to Iceberg. Doing the check in `commit()` would mean the
interface of GlobalCommittable is simpler and to me it seems natural
that we do the check in the commit() method to ensure that commits are
idempotent.
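
A minimal sketch of the "check inside `commit()`" option, to make the trade-off concrete. Everything here is hypothetical: `FakeTable` stands in for an external system such as an Iceberg table, committables are plain string ids, and `IdempotentCommitter` is not the interface from the FLIP.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for an external metastore (for example an Iceberg table).
class FakeTable {
    private final Set<String> committedIds = new HashSet<>();
    int appendCalls = 0; // number of real commits performed against the table

    boolean isCommitted(String id) {
        return committedIds.contains(id);
    }

    void append(String id) {
        committedIds.add(id);
        appendCalls++;
    }
}

// Hypothetical committer whose commit() skips committables that are already in the table.
class IdempotentCommitter {
    private final FakeTable table;

    IdempotentCommitter(FakeTable table) {
        this.table = table;
    }

    // Returns the ids that this call actually wrote.
    List<String> commit(List<String> globalCommittables) {
        List<String> written = new ArrayList<>();
        for (String id : globalCommittables) {
            if (table.isCommitted(id)) {
                continue; // committed before the failure, so skip it
            }
            table.append(id);
            written.add(id);
        }
        return written;
    }
}
```

With this shape the lookup against the external system runs on every `commit()` call, not only after a restore, which is the cost Steven points out.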

What do you think?

Aljoscha

Re: [DISCUSS] FLIP-143: Unified Sink API

Aljoscha Krettek-2
Ah sorry, I think I now see what you mean. I think it's ok to add a
`List<GlobalCommittableT> recoverCommittables(List<GlobalCommittableT>)`
method.



Re: [DISCUSS] FLIP-143: Unified Sink API

Guowei Ma
Thanks @aljoscha for the summary. I agree we should postpone the discussion of the
sink topology and focus first on the normal file sink and the IcebergSink in
Flink 1.12.

I have three small questions:

1. Maybe we could add an EOI (end-of-input) interface to the `GlobalCommit`, so
that `write success file` is available in both batch and
stream execution mode.
2. If we let the two types of committer appear at the same time
in the API, we have to figure out how to express the relation between the
two committers. I think the Sink API may look like the following. What do
you think?
Sink<T, CommT, CommR, ShareStateT...> {
        Writer<T, CommT, ShareStateT.....> createWriter();
        Optional<Committer<CommT>> createCommitter();
        Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter();
}
3. Maybe a silly question: why does `commit` need to return a `CommitResult`? I
think the sink developer could do the retry themselves. Why does the framework
need to do the retry?

Best,
Guowei



Re: [DISCUSS] FLIP-143: Unified Sink API

Aljoscha Krettek-2
On 22.09.20 11:10, Guowei Ma wrote:
> 1. I think maybe we could add a EOI interface to the `GlobalCommit`. So
> that we could make `write success file` be available in both batch and
> stream execution mode.

We could, yes. I'm now hesitant because we're adding more things but I
think it should be fine.

> 2.  If we choose to let the two types of committer appear at the same time
> in the API we have to figure out how to express the relation between the
> two committers. I think the Sink API may look like the following: What do
> you think?
> Sink<T, CommT, CommR, ShareStateT...> {
>          Writer<T, CommT, ShareStateT.....> createWriter();
>          Optional<Committer<CommT>> createCommitter();
>          Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter();
> }

Yes, I think this is what we should do. Though I think that we should
initially not support shared state. The FileSink only uses this to
create unique file names and I think we can do without it. If we see
that we need it later we can add it but I would like to keep things
minimal initially. It's always easy to add things later but it's hard to
take things away once you added them.

> 3. Maybe a silly question: why does `commit` need to return a `CommitResult`? I
> think the sink developer could do the retry themselves. Why does the framework
> need to do the retry?

It's not a silly question at all! I think we need the retry to support
problems such as the ones Steven mentioned. If a commit fails, RETRY tells the
framework that it should keep the commits in state and retry them on the
next checkpoint. When the committer returns FAILURE we should just fail
the job. RETRY is there to support temporary outages of the external metastore.

I'm open to leaving it out of the initial version for the same reasons I
mentioned above but I think it could be valuable.
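
A rough sketch of how the framework side could act on the three results, assuming a per-committable result. `CommitDriver` and its methods are hypothetical names for illustration, not Flink code; only the result values SUCCESS, RETRY and FAILURE come from this discussion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Result values named in the discussion.
enum CommitResult { SUCCESS, RETRY, FAILURE }

// Hypothetical framework-side helper that owns the pending committables.
class CommitDriver<C> {
    // In a real runtime this list would be part of the checkpointed state.
    private final List<C> pending = new ArrayList<>();

    void add(C committable) {
        pending.add(committable);
    }

    List<C> pending() {
        return new ArrayList<>(pending);
    }

    // Called once per completed checkpoint with the sink's commit function.
    void onCheckpointComplete(Function<C, CommitResult> committer) {
        List<C> retryLater = new ArrayList<>();
        for (C committable : pending) {
            switch (committer.apply(committable)) {
                case SUCCESS:
                    break; // done, drop it from state
                case RETRY:
                    retryLater.add(committable); // keep it for the next checkpoint
                    break;
                case FAILURE:
                    // Unrecoverable: surface the error so the job fails.
                    throw new IllegalStateException("Commit failed: " + committable);
            }
        }
        pending.clear();
        pending.addAll(retryLater);
    }
}
```

The point of the sketch is that RETRY needs no sink-side state: the committable simply stays in the framework's list until a later checkpoint commits it.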

Best,
Aljoscha


Re: [DISCUSS] FLIP-143: Unified Sink API

Guowei Ma
>> I believe that we could support such an async sink writer
>> very easily in the future. What do you think?

>> How would you see the expansion in the future? Do you mean just adding
`isAvailable()` method with a default implementation later on?

Hi @piotr <[hidden email]>

Actually I am not sure adding `isAvailable` is enough. Maybe it is not.
But for the initial version I hope we could make the sink api sync because
there is already a lot of stuff that has to finish. :--)

What do you think?

Best,
Guowei



Re: [DISCUSS] FLIP-143: Unified Sink API

Aljoscha Krettek-2
On 22.09.20 13:26, Guowei Ma wrote:
> Actually I am not sure adding `isAvailable` is enough. Maybe it is not.
> But for the initial version I hope we could make the sink api sync because
> there is already a lot of stuff that has to finish. :--)

I agree, for the first version we should stick to a simpler synchronous
interface.

Aljoscha

Re: [DISCUSS] FLIP-143: Unified Sink API

Steven Wu
It is fine to leave the CommitResult/RETRY outside the scope of the framework.
Then the framework might need to provide some hooks in the
checkpoint/restore logic. Because the commit happens in the post-checkpoint
completion step, the sink needs to update its internal state when
the commit is successful so that the next checkpoint won't include the
committed GlobalCommT.

Maybe GlobalCommitter can have an API like this?
> List<GlobalCommT> snapshotState();

But then we still need the recover API if we don't let sink directly manage
the state.
> List<GlobalCommT> recoverCommittables(List<GlobalCommT>)

Thanks,
Steven


Re: [DISCUSS] FLIP-143: Unified Sink API

Steven Wu
Previous APIs discussed have been trying to do more in the framework. If we
take a different approach with a lighter framework, this set of
minimal APIs is probably good enough. The sink can handle the bookkeeping,
merge, and retry logic.

/**
 * CommT is the DataFile in Iceberg
 * GlobalCommT is the checkpoint data type, like ManifestFile in Iceberg
 */
interface GlobalCommitter<CommT, GlobalCommT> {

  void collect(CommT committable);

  void commit();

  List<GlobalCommT> snapshotState();

  // this is just a callback to the sink so that it can filter and
  // retain the uncommitted GlobalCommT in its internal bookkeeping
  void recoveredCommittables(List<GlobalCommT> globalCommittables);
}

The most important need from the framework is to run GlobalCommitter in the
jobmanager. It involves the topology creation, checkpoint handling,
serializing the executions of commit() calls etc.
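
To illustrate what "the sink handles the bookkeeping" could mean, here is a small sketch of one possible implementation of the interface above. It rests on assumptions that are not stated in the thread: collected committables are merged into one global committable at snapshot time, `CommT` is a file name, `GlobalCommT` is a list of file names, and the `alreadyCommitted` predicate stands in for a lookup against the external system. `BookkeepingCommitter` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

interface GlobalCommitter<CommT, GlobalCommT> {
    void collect(CommT committable);
    void commit();
    List<GlobalCommT> snapshotState();
    void recoveredCommittables(List<GlobalCommT> globalCommittables);
}

// Hypothetical sink-side implementation that keeps all bookkeeping itself.
class BookkeepingCommitter implements GlobalCommitter<String, List<String>> {
    private final List<String> collected = new ArrayList<>();
    private final List<List<String>> uncommitted = new ArrayList<>();
    private final Predicate<List<String>> alreadyCommitted;
    // Stand-in for the external table: records what has been committed.
    final List<List<String>> committed = new ArrayList<>();

    BookkeepingCommitter(Predicate<List<String>> alreadyCommitted) {
        this.alreadyCommitted = alreadyCommitted;
    }

    @Override
    public void collect(String committable) {
        collected.add(committable);
    }

    @Override
    public List<List<String>> snapshotState() {
        // Assumption: merge everything collected so far into one global committable.
        if (!collected.isEmpty()) {
            uncommitted.add(new ArrayList<>(collected));
            collected.clear();
        }
        return new ArrayList<>(uncommitted);
    }

    @Override
    public void commit() {
        // After a successful commit, the next snapshot no longer contains these entries.
        committed.addAll(uncommitted);
        uncommitted.clear();
    }

    @Override
    public void recoveredCommittables(List<List<String>> globalCommittables) {
        // Keep only the restored entries that the external system has not seen yet.
        for (List<String> restored : globalCommittables) {
            if (!alreadyCommitted.test(restored)) {
                uncommitted.add(restored);
            }
        }
    }
}
```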

Thanks,
Steven


Re: [DISCUSS] FLIP-143: Unified Sink API

Aljoscha Krettek-2
I think we should go with something like

List<GlobalCommitT> filterRecoveredCommittables(List<>)

to keep things simple. This should also be easy to do from the framework
side and then the sink doesn't need to do any custom state handling.

Best,
Aljoscha


Re: [DISCUSS] FLIP-143: Unified Sink API

Guowei Ma
>> I think we should go with something like

>> List<GlobalCommitT> filterRecoveredCommittables(List<>)

>> to keep things simple. This should also be easy to do from the framework
>> side and then the sink doesn't need to do any custom state handling.

I second Aljoscha's proposal. For the first version there is already a lot
to do.
For now I think it would be sufficient for the Iceberg sink.

Best,
Guowei



Re: [DISCUSS] FLIP-143: Unified Sink API

Guowei Ma
Hi, all

Thanks, everyone, for your ideas and suggestions. I will try to
summarize the consensus again :). Correct me if I am wrong or have misunderstood
you.

## Consensus-1

1. The motivation of the unified sink API is to decouple the sink
implementation from the different runtime execution modes.
2. The initial scope of the unified sink API only covers the file system
type, which supports real transactions. The FLIP focuses more on the
semantics the new sink API should support.
3. We prefer the first alternative API, which could give the framework a
greater opportunity to optimize.
4. The `Writer` needs to add a method `prepareCommit`, which would be
called from `prepareSnapshotPreBarrier`, and the `Flush` method is removed.
5. The FLIP could move the `Snapshot & Drain` section in order to be more
focused.

## Consensus-2

1. What should the “Unified Sink API” support/cover? It includes two
aspects: (a) the same sink implementation should work for both the batch and
stream execution modes; (b) in the long run we should give the sink developer
the ability to build “arbitrary” topologies. But for Flink 1.12 we
should focus only on satisfying the S3/HDFS/Iceberg sink.
2. Because the batch execution mode does not have the normal checkpoint, the
sink developer should no longer depend on it if we want a unified sink.
3. We can benefit from providing an asynchronous `Writer` version. But
because the unified sink is already very complicated, we don’t add this in
the first version.


Based on this consensus, I propose the first version of the new
sink API as follows. What do you think? Any comments are welcome.

/**
 * This interface lets the sink developer build a simple transactional sink
 * topology pattern, which satisfies the HDFS/S3/Iceberg sink.
 * This sink topology includes one {@link Writer} + one {@link Committer} +
 * one {@link GlobalCommitter}.
 * The {@link Writer} is responsible for producing the committable.
 * The {@link Committer} is responsible for committing a single
 * committable.
 * The {@link GlobalCommitter} is responsible for committing an aggregated
 * committable, which we call a global committable.
 *
 * Both the {@link Committer} and the {@link GlobalCommitter} are
 * optional.
 */
interface TSink<IN, CommT, GCommT, WriterS> {

        Writer<IN, CommT, WriterS> createWriter(InitContext initContext);

        Writer<IN, CommT, WriterS> restoreWriter(InitContext initContext,
                List<WriterS> states);

        Optional<Committer<CommT>> createCommitter();

        Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();

        SimpleVersionedSerializer<CommT> getCommittableSerializer();

        Optional<SimpleVersionedSerializer<GCommT>>
                getGlobalCommittableSerializer();
}

/**
 * The {@link GlobalCommitter} is responsible for committing an aggregated
 * committable, which we call a global committable.
 */
interface GlobalCommitter<CommT, GCommT> {

        /**
         * This method is called when restoring from a failover.
         * @param globalCommittables the global committables that were not
         *        committed in the previous session.
         * @return the global committables that should be committed again
         *         in the current session.
         */
        List<GCommT> filterRecoveredCommittables(List<GCommT>
                globalCommittables);

        /**
         * Compute an aggregated committable from a collection of
         * committables.
         * @param committables a collection of committables that need
         *        to be combined
         * @return an aggregated committable
         */
        GCommT combine(List<CommT> committables);

        void commit(List<GCommT> globalCommittables);

        /**
         * There are no more committables.
         */
        void endOfInput();
}
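
For readers who want to see the intended call order, here is a sketch of how a runtime might drive the `GlobalCommitter` proposed above. The driver class, its method names, and the exact points at which `combine` and `commit` are called are assumptions made for illustration; the thread does not specify the runtime integration.

```java
import java.util.ArrayList;
import java.util.List;

// Copy of the proposed interface (Javadoc omitted).
interface GlobalCommitter<CommT, GCommT> {
    List<GCommT> filterRecoveredCommittables(List<GCommT> globalCommittables);
    GCommT combine(List<CommT> committables);
    void commit(List<GCommT> globalCommittables);
    void endOfInput();
}

// Hypothetical driver showing one possible lifecycle.
class GlobalCommitterDriver<CommT, GCommT> {
    private final GlobalCommitter<CommT, GCommT> committer;
    private final List<CommT> buffered = new ArrayList<>();
    private final List<GCommT> pending = new ArrayList<>();

    GlobalCommitterDriver(GlobalCommitter<CommT, GCommT> committer) {
        this.committer = committer;
    }

    // On restore: let the sink drop global committables that were already committed.
    void restore(List<GCommT> fromCheckpoint) {
        pending.addAll(committer.filterRecoveredCommittables(fromCheckpoint));
    }

    // Committables arriving from the writers or committers.
    void receive(CommT committable) {
        buffered.add(committable);
    }

    // On checkpoint: aggregate the buffer and return the state to persist.
    List<GCommT> snapshot() {
        if (!buffered.isEmpty()) {
            pending.add(committer.combine(new ArrayList<>(buffered)));
            buffered.clear();
        }
        return new ArrayList<>(pending);
    }

    // After the checkpoint completed: commit everything that is pending.
    void checkpointComplete() {
        committer.commit(new ArrayList<>(pending));
        pending.clear();
    }

    // End of a bounded input: flush what is left, then signal end of input.
    void finish() {
        snapshot();
        checkpointComplete();
        committer.endOfInput();
    }
}
```

Because `commit` returns `void` in this version, the sketch treats every commit as successful; a later `CommitResult` addition would change `checkpointComplete()`.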

Best,
Guowei



Re: [DISCUSS] FLIP-143: Unified Sink API

Aljoscha Krettek-2
Yes, that sounds good! I'll probably have some comments on the FLIP
about the names of generic parameters and the Javadoc but we can address
them later or during implementation.

I also think that we probably need the FAIL,RETRY,SUCCESS result for
globalCommit() but we can also do that as a later addition.

So I think we're good to go to update the FLIP, do any last minute
changes and then vote.

Best,
Aljoscha


Re: [DISCUSS] FLIP-143: Unified Sink API

Guowei Ma
Thanks Aljoscha for your suggestion. I have updated the FLIP. Any comments are
welcome.

Best,
Guowei


On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <[hidden email]>
wrote:

> Yes, that sounds good! I'll probably have some comments on the FLIP
> about the names of generic parameters and the Javadoc but we can address
> them later or during implementation.
>
> I also think that we probably need the FAIL,RETRY,SUCCESS result for
> globalCommit() but we can also do that as a later addition.
>
> So I think we're good to go to update the FLIP, do any last minute
> changes and then vote.
>
> Best,
> Aljoscha

Re: [DISCUSS] FLIP-143: Unified Sink API

Steven Wu
Guowei,

Thanks a lot for updating the wiki page. It looks great.

I noticed one inconsistency between the wiki and your last summary email for
the GlobalCommitter interface. I think the version in the summary email is the
intended one, because rollover from previously failed commits can accumulate
a list.
CommitResult commit(GlobalCommT globalCommittable); // in the wiki
=>
CommitResult commit(List<GlobalCommT> globalCommittable); // in the summary email
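
To make the rollover argument concrete, here is a minimal sketch of how a pending list could grow across checkpoint cycles while commits keep failing. The class name, the `String` committables, and the `commitSucceeds` flag (which simulates the external system's response) are all assumptions for illustration, not part of the FLIP.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of commit rollover; not framework code.
class RolloverSketch {

    /** Committables that have not been committed successfully yet. */
    private final List<String> pending = new ArrayList<>();

    /**
     * Called once per checkpoint cycle with the newly produced global committable.
     * Returns the batch that would be handed to commit(List) in this cycle.
     */
    List<String> onCheckpoint(String newCommittable, boolean commitSucceeds) {
        pending.add(newCommittable);
        List<String> batch = new ArrayList<>(pending);
        if (commitSucceeds) {
            // Everything in the batch is committed, so nothing rolls over.
            pending.clear();
        }
        return batch;
    }
}
```

After two failed cycles, the third cycle would commit a batch of three committables, which is the case a list parameter is meant to cover.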

I also have a clarifying question regarding the WriterStateT. Since
IcebergWriter won't need to checkpoint any state, should we set it to *Void*
type? Since getWriterStateSerializer() returns Optional, that is clear and
we can return Optional.empty().

Thanks,
Steven

On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma <[hidden email]> wrote:

> Thanks Aljoscha for your suggestion.  I have updated FLIP. Any comments are
> welcome.
>
> Best,
> Guowei

Re: [DISCUSS] FLIP-143: Unified Sink API

Guowei Ma
Hi, Steven

Thank you for reading the FLIP so carefully.
1. The framework cannot know which `GlobalCommT` to retry if `commit` takes a
List<GlobalCommT> as its parameter and returns `RETRY`.
2. Of course we could let `commit` return more detailed info, but that might
be too complicated.
3. On the other hand, I think it is only when restoring that IcebergSink needs
to take a collection of `GlobalCommT` and give back another collection of the
`GlobalCommT` that are not yet committed.
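
As a rough sketch of point 1: when `commit` is called per committable, each result maps to exactly one `GlobalCommT`, so the caller knows what to retry. The `CommitResult` values follow the FAIL/RETRY/SUCCESS idea mentioned earlier in the thread; the `commitEach` helper and the use of `Function` are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of a framework-side retry loop; not the actual runtime code.
class RetryLoopSketch {

    enum CommitResult { SUCCESS, RETRY, FAIL }

    /**
     * Commits each committable on its own and returns those that asked for a retry.
     * A FAIL result throws, which stands in for failing the job.
     */
    static <G> List<G> commitEach(List<G> committables, Function<G, CommitResult> commit) {
        List<G> retryLater = new ArrayList<>();
        for (G committable : committables) {
            CommitResult result = commit.apply(committable);
            if (result == CommitResult.RETRY) {
                retryLater.add(committable);
            } else if (result == CommitResult.FAIL) {
                throw new IllegalStateException("commit failed for " + committable);
            }
        }
        return retryLater;
    }
}
```

With a single `CommitResult` for a whole list, the same loop could not tell which elements belong in `retryLater`.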

Best,
Guowei


On Fri, Sep 25, 2020 at 1:45 AM Steven Wu <[hidden email]> wrote:

> Guowei,
>
> Thanks a lot for updating the wiki page. It looks great.
>
> I noticed one inconsistency in the wiki with your last summary email for
> GlobalCommitter interface. I think the version in the summary email is the
> intended one, because rollover from previous failed commits can accumulate
> a list.
> CommitResult commit(GlobalCommT globalCommittable); // in the wiki
> =>
> CommitResult commit(List<GlobalCommT> globalCommittable); // in the summary email
>
> I also have a clarifying question regarding the WriterStateT. Since
> IcebergWriter won't need to checkpoint any state, should we set it to *Void*
> type? Since getWriterStateSerializer() returns Optional, that is clear and
> we can return Optional.empty().
>
> Thanks,
> Steven

Re: [DISCUSS] FLIP-143: Unified Sink API

Guowei Ma
Hi, Steven

>> I also have a clarifying question regarding the WriterStateT. Since
>> IcebergWriter won't need to checkpoint any state, should we set it to *Void*
>> type? Since getWriterStateSerializer() returns Optional, that is clear and
>> we can return Optional.empty().

Yes, I think you could do that. If you return Optional.empty(), we would
ignore all the state you return.
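
A minimal sketch of what that could look like, assuming simplified stand-in interfaces: `StateSerializer`, `SinkSketch`, and `persistedStateCount` are hypothetical names, state snapshotting is placed on the sink rather than the writer to keep the example short, and only `getWriterStateSerializer()` and the `Void` state type come from the discussion.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of a sink whose writer has no state to checkpoint.
class StatelessWriterSketch {

    /** Minimal stand-in for a state serializer; not Flink's SimpleVersionedSerializer. */
    interface StateSerializer<S> {
        byte[] serialize(S state);
    }

    /** Hypothetical slice of the sink interface discussed in the thread. */
    interface SinkSketch<WriterStateT> {
        Optional<StateSerializer<WriterStateT>> getWriterStateSerializer();

        List<WriterStateT> snapshotState();
    }

    /** Uses Void as the writer state type and provides no serializer. */
    static final class IcebergLikeSink implements SinkSketch<Void> {
        @Override
        public Optional<StateSerializer<Void>> getWriterStateSerializer() {
            return Optional.empty();
        }

        @Override
        public List<Void> snapshotState() {
            return Collections.emptyList();
        }
    }

    /** Mimics the framework side: without a serializer, returned state is ignored. */
    static <S> int persistedStateCount(SinkSketch<S> sink) {
        return sink.getWriterStateSerializer()
                .map(serializer -> sink.snapshotState().size())
                .orElse(0);
    }
}
```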

Best,
Guowei


On Fri, Sep 25, 2020 at 3:14 PM Guowei Ma <[hidden email]> wrote:

> Hi,Steven
>
> Thank you for reading the FLIP so carefully.
> 1. The frame can not know which `GlobalCommT` to retry if we use the
> List<GlobalCommT> as parameter when the `commit` returns `RETRY`.
> 2. Of course we can let the `commit` return more detailed info but it
> might be too complicated.
> 3. On the other hand, I think only when restoring IcebergSink needs a
> collection of `GlobalCommT` and giving back another collection of
> `GlobalCommT` that are not committed.
>
> Best,
> Guowei

Re: [DISCUSS] FLIP-143: Unified Sink API

Guowei Ma
Hi, all

From the above discussion we can see that the FLIP focuses on providing a
unified transactional sink API, so I updated the FLIP's title to "Unified
Transactional Sink API". However, I found that the old link can no longer be
opened.

Here is the updated link [1]. Sorry for the inconvenience.

[1] https://cwiki.apache.org/confluence/x/KEJ4CQ

Best,
Guowei


On Fri, Sep 25, 2020 at 3:26 PM Guowei Ma <[hidden email]> wrote:

> Hi, Steven
>
> >> I also have a clarifying question regarding the WriterStateT. Since
> >> IcebergWriter won't need to checkpoint any state, should we set it to *Void*
> >> type? Since getWriterStateSerializer() returns Optional, that is clear and
> >> we can return Optional.empty().
>
> Yes I think you could do it. If you return Optional.empty() we would
> ignore all the state you return.
>
> Best,
> Guowei
>
>
> On Fri, Sep 25, 2020 at 3:14 PM Guowei Ma <[hidden email]> wrote:
>
>> Hi,Steven
>>
>> Thank you for reading the FLIP so carefully.
>> 1. The frame can not know which `GlobalCommT` to retry if we use the
>> List<GlobalCommT> as parameter when the `commit` returns `RETRY`.
>> 2. Of course we can let the `commit` return more detailed info but it
>> might be too complicated.
>> 3. On the other hand, I think only when restoring IcebergSink needs a
>> collection of `GlobalCommT` and giving back another collection of
>> `GlobalCommT` that are not committed.
>>
>> Best,
>> Guowei
>>
>>
>> On Fri, Sep 25, 2020 at 1:45 AM Steven Wu <[hidden email]> wrote:
>>
>>> Guowei,
>>>
>>> Thanks a lot for updating the wiki page. It looks great.
>>>
>>> I noticed one inconsistency in the wiki with your last summary email for
>>> GlobalCommitter interface. I think the version in the summary email is
>>> the
>>> intended one, because rollover from previous failed commits can
>>> accumulate
>>> a list.
>>> CommitResult commit(GlobalCommT globalCommittable); // in the wiki
>>> =>
>>> CommitResult commit(List<GlobalCommT> globalCommittable);  // in the
>>> summary email
>>>
>>> I also have a clarifying question regarding the WriterStateT. Since
>>> IcebergWriter won't need to checkpoint any state, should we set it to
>>> *Void*
>>> type? Since getWriterStateSerializer() returns Optional, that is clear
>>> and
>>> we can return Optional.empty().
>>>
>>> Thanks,
>>> Steven
>>>
>>> On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma <[hidden email]> wrote:
>>>
>>> > Thanks Aljoscha for your suggestion.  I have updated FLIP. Any
>>> comments are
>>> > welcome.
>>> >
>>> > Best,
>>> > Guowei
>>> >
>>> >
>>> > On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <[hidden email]>
>>> > wrote:
>>> >
>>> > > Yes, that sounds good! I'll probably have some comments on the FLIP
>>> > > about the names of generic parameters and the Javadoc but we can
>>> address
>>> > > them later or during implementation.
>>> > >
>>> > > I also think that we probably need the FAIL,RETRY,SUCCESS result for
>>> > > globalCommit() but we can also do that as a later addition.
>>> > >
>>> > > So I think we're good to go to update the FLIP, do any last minute
>>> > > changes and then vote.
>>> > >
>>> > > Best,
>>> > > Aljoscha
>>> > >
>>> > > On 23.09.20 06:13, Guowei Ma wrote:
>>> > > > Hi, all
>>> > > >
>>> > > > Thank everyone very much for your ideas and suggestions. I would
>>> try to
>>> > > > summarize again the consensus :). Correct me if I am wrong or
>>> > > misunderstand
>>> > > > you.
>>> > > >
>>> > > > ## Consensus-1
>>> > > >
>>> > > > 1. The motivation of the unified sink API is to decouple the sink
>>> > > > implementation from the different runtime execution mode.
>>> > > > 2. The initial scope of the unified sink API only covers the file
>>> > system
>>> > > > type, which supports the real transactions. The FLIP focuses more
>>> on
>>> > the
>>> > > > semantics the new sink api should support.
>>> > > > 3. We prefer the first alternative API, which could give the
>>> framework
>>> > a
>>> > > > greater opportunity to optimize.
>>> > > > 4. The `Writer` needs to add a method `prepareCommit`, which would
>>> be
>>> > > > called from `prepareSnapshotPreBarrier`. And remove the `Flush`
>>> method.
>>> > > > 5. The FLIP could move the `Snapshot & Drain` section in order to
>>> be
>>> > more
>>> > > > focused.
>>> > > >
>>> > > > ## Consensus-2
>>> > > >
>>> > > > 1. What should the “Unified Sink API” support/cover? It includes
>>> two
>>> > > > aspects. 1. The same sink implementation would work for both the
>>> batch
>>> > > and
>>> > > > stream execution mode. 2. In the long run we should give the sink
>>> > > developer
>>> > > > the ability of building “arbitrary” topologies. But for Flink-1.12
>>> we
>>> > > > should be more focused on only satisfying the S3/HDFS/Iceberg sink.
>>> > > > 2. Because the batch execution mode does not have the normal
>>> checkpoint
>>> > > the
>>> > > > sink developer should not depend on it any more if we want a
>>> unified
>>> > > sink.
>>> > > > 3. We can benefit by providing an asynchronous `Writer` version.
>>> But
>>> > > > because the unified sink is already very complicated, we don’t add
>>> this
>>> > > in
>>> > > > the first version.
>>> > > >
>>> > > >
>>> > > > According to these consensus I would propose the first version of
>>> the
>>> > new
>>> > > > sink api as follows. What do you think? Any comments are welcome.
>>> > > >
>>> > > > /**
>>> > > >   * This interface lets the sink developer build a simple
>>> transactional
>>> > > sink
>>> > > > topology pattern, which satisfies the HDFS/S3/Iceberg sink.
>>> > > >   * This sink topology includes one {@link Writer} + one {@link
>>> > > Committer} +
>>> > > > one {@link GlobalCommitter}.
>>> > > >   * The {@link Writer} is responsible for producing the
>>> committable.
>>> > > >   * The {@link Committer} is responsible for committing a single
>>> > > > committables.
>>> > > >   * The {@link GlobalCommitter} is responsible for committing an
>>> > > aggregated
>>> > > > committable, which we called global committables.
>>> > > >   *
>>> > > >   * But both the {@link Committer} and the {@link GlobalCommitter}
>>> are
>>> > > > optional.
>>> > > >   */
>>> > > > interface TSink<IN, CommT, GCommT, WriterS> {
>>> > > >
>>> > > >          Writer<IN, CommT, WriterS> createWriter(InitContext
>>> > > initContext);
>>> > > >
>>> > > >          Writer<IN, CommT, WriterS> restoreWriter(InitContext
>>> > > initContext,
>>> > > > List<WriterS> states);
>>> > > >
>>> > > >          Optional<Committer<CommT>> createCommitter();
>>> > > >
>>> > > >          Optional<GlobalCommitter<CommT, GCommT>>
>>> > > createGlobalCommitter();
>>> > > >
>>> > > >          SimpleVersionedSerializer<CommT>
>>> getCommittableSerializer();
>>> > > >
>>> > > >          Optional<SimpleVersionedSerializer<GCommT>>
>>> > > > getGlobalCommittableSerializer();
>>> > > > }
>>> > > >
>>> > > > /**
>>> > > >   * The {@link GlobalCommitter} is responsible for committing an
>>> > > aggregated
>>> > > > committable, which we called global committables.
>>> > > >   */
>>> > > > interface GlobalCommitter<CommT, GCommT> {
>>> > > >
>>> > > >          /**
>>> > > >           * This method is called when restoring from a failover.
>>> > > >           * @param globalCommittables the global committables that
>>> are
>>> > > not
>>> > > > committed in the previous session.
>>> > > >           * @return the global committables that should be
>>> committed
>>> > > again
>>> > > > in the current session.
>>> > > >           */
>>> > > >          List<GCommT> filterRecoveredCommittables(List<GCommT>
>>> > > > globalCommittables);
>>> > > >
>>> > > >          /**
>>> > > >           * Compute an aggregated committable from a collection of
>>> > > > committables.
>>> > > >           * @param committables a collection of committables that
>>> are
>>> > > needed
>>> > > > to combine
>>> > > >           * @return an aggregated committable
>>> > > >           */
>>> > > >          GCommT combine(List<CommT> committables);
>>> > > >
>>> > > >          void commit(List<GCommT> globalCommittables);
>>> > > >
>>> > > >          /**
>>> > > >           * There are no committables any more.
>>> > > >           */
>>> > > >          void endOfInput();
>>> > > > }
>>> > > >
>>> > > > Best,
>>> > > > Guowei
>>> > >
>>> > >
>>> >
>>>
>>

Re: [DISCUSS] FLIP-143: Unified Sink API

Steven Wu
> 1. The frame can not know which `GlobalCommT` to retry if we use the
> List<GlobalCommT> as parameter when the `commit` returns `RETRY`.
> 2. Of course we can let the `commit` return more detailed info but it
> might be too complicated.

If commit(List<GlobalCommT>) returns RETRY, it means the whole list needs
to be retried. For example, suppose an outage of the metadata service causes
the commits for checkpoints 1-100 to fail, so 100 GlobalCommT items
accumulate. We don't want to commit them one by one; it is faster to commit
the whole list as one batch.
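
A minimal Java sketch of the whole-list retry described above, assuming the framework keeps a buffer of uncommitted items. `PendingCommits`, the single-parameter `GlobalCommitter` and the three-valued `CommitResult` enum are illustrative names only, not the FLIP's final API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; none of these types are the FLIP's final API.
enum CommitResult { SUCCESS, FAILURE, RETRY }

interface GlobalCommitter<GlobalCommT> {
    // One result for the entire batch: RETRY means "retry every item in the list".
    CommitResult commit(List<GlobalCommT> globalCommittables);
}

// Framework-side bookkeeping: keeps everything that has not been committed yet.
class PendingCommits<GlobalCommT> {
    private final List<GlobalCommT> pending = new ArrayList<>();

    // Called once per completed checkpoint with the newly produced committable.
    CommitResult onCheckpointComplete(GlobalCommT fresh, GlobalCommitter<GlobalCommT> committer) {
        pending.add(fresh);
        // Hand the sink a copy of everything that is still uncommitted.
        CommitResult result = committer.commit(new ArrayList<>(pending));
        if (result == CommitResult.SUCCESS) {
            pending.clear(); // the whole batch went through
        } else if (result == CommitResult.FAILURE) {
            throw new IllegalStateException("commit failed permanently");
        }
        // RETRY: keep the list untouched; it is resent with the next checkpoint.
        return result;
    }

    int size() {
        return pending.size();
    }
}
```

With this shape, an outage spanning checkpoints 1-100 leaves 100 items in the buffer, and the commit for checkpoint 101 receives all 101 items in a single call.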

> 3. On the other hand, I think only when restoring IcebergSink needs a
> collection of `GlobalCommT` and giving back another collection of
> `GlobalCommT` that are not committed

That is when the job is restarted due to a failure or a deployment.
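
To illustrate the restore case, here is a rough Java sketch of a recovery filter in the spirit of `filterRecoveredCommittables`. `CheckpointCommit`, `RecoveryFilter` and the `committedIds` set are assumptions made for the example; how a real sink learns what was already committed is specific to the external system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical global committable: one entry per checkpoint.
class CheckpointCommit {
    final long checkpointId;

    CheckpointCommit(long checkpointId) {
        this.checkpointId = checkpointId;
    }
}

class RecoveryFilter {
    // committedIds stands in for whatever the external system reports as already
    // committed (an assumption; the real lookup is sink specific).
    static List<CheckpointCommit> filterRecoveredCommittables(
            List<CheckpointCommit> restored, Set<Long> committedIds) {
        List<CheckpointCommit> stillPending = new ArrayList<>();
        for (CheckpointCommit c : restored) {
            // Keep only the committables the external system has not seen yet.
            if (!committedIds.contains(c.checkpointId)) {
                stillPending.add(c);
            }
        }
        return stillPending;
    }
}
```

The check runs once after a restart; the retained items are then committed in the next checkpoint cycle.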


On Fri, Sep 25, 2020 at 5:24 AM Guowei Ma <[hidden email]> wrote:

> Hi, all
>
> From the above discussion we could find that FLIP focuses on providing an
> unified transactional sink API. So I updated the FLIP's title to "Unified
> Transactional Sink API". But I found that the old link could not be opened
> again.
>
> I would update the link[1] here. Sorry for the inconvenience.
>
> [1]https://cwiki.apache.org/confluence/x/KEJ4CQ
>
> Best,
> Guowei
>
>
> On Fri, Sep 25, 2020 at 3:26 PM Guowei Ma <[hidden email]> wrote:
>
> > Hi, Steven
> >
> > >>I also have a clarifying question regarding the WriterStateT. Since
> > >>IcebergWriter won't need to checkpoint any state, should we set it to
> > *Void*
> > >>type? Since getWriterStateSerializer() returns Optional, that is clear
> > and
> > >>we can return Optional.empty().
> >
> > Yes I think you could do it. If you return Optional.empty() we would
> > ignore all the state you return.
> >
> > Best,
> > Guowei
> >
> >
> > On Fri, Sep 25, 2020 at 3:14 PM Guowei Ma <[hidden email]> wrote:
> >
> >> Hi,Steven
> >>
> >> Thank you for reading the FLIP so carefully.
> >> 1. The frame can not know which `GlobalCommT` to retry if we use the
> >> List<GlobalCommT> as parameter when the `commit` returns `RETRY`.
> >> 2. Of course we can let the `commit` return more detailed info but it
> >> might be too complicated.
> >> 3. On the other hand, I think only when restoring IcebergSink needs a
> >> collection of `GlobalCommT` and giving back another collection of
> >> `GlobalCommT` that are not committed.
> >>
> >> Best,
> >> Guowei
> >>
> >>
> >> On Fri, Sep 25, 2020 at 1:45 AM Steven Wu <[hidden email]> wrote:
> >>
> >>> Guowei,
> >>>
> >>> Thanks a lot for updating the wiki page. It looks great.
> >>>
> >>> I noticed one inconsistency in the wiki with your last summary email
> for
> >>> GlobalCommitter interface. I think the version in the summary email is
> >>> the
> >>> intended one, because rollover from previous failed commits can
> >>> accumulate
> >>> a list.
> >>> CommitResult commit(GlobalCommT globalCommittable); // in the wiki
> >>> =>
> >>> CommitResult commit(List<GlobalCommT> globalCommittable);  // in the
> >>> summary email
> >>>
> >>> I also have a clarifying question regarding the WriterStateT. Since
> >>> IcebergWriter won't need to checkpoint any state, should we set it to
> >>> *Void*
> >>> type? Since getWriterStateSerializer() returns Optional, that is clear
> >>> and
> >>> we can return Optional.empty().
> >>>
> >>> Thanks,
> >>> Steven
> >>>
> >>> On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma <[hidden email]>
> wrote:
> >>>
> >>> > Thanks Aljoscha for your suggestion.  I have updated FLIP. Any
> >>> comments are
> >>> > welcome.
> >>> >
> >>> > Best,
> >>> > Guowei
> >>> >
> >>> >
> >>> > On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <
> [hidden email]>
> >>> > wrote:
> >>> >
> >>> > > Yes, that sounds good! I'll probably have some comments on the FLIP
> >>> > > about the names of generic parameters and the Javadoc but we can
> >>> address
> >>> > > them later or during implementation.
> >>> > >
> >>> > > I also think that we probably need the FAIL,RETRY,SUCCESS result
> for
> >>> > > globalCommit() but we can also do that as a later addition.
> >>> > >
> >>> > > So I think we're good to go to update the FLIP, do any last minute
> >>> > > changes and then vote.
> >>> > >
> >>> > > Best,
> >>> > > Aljoscha
> >>> > >
> >>> > > On 23.09.20 06:13, Guowei Ma wrote:
> >>> > > > Hi, all
> >>> > > >
> >>> > > > Thank everyone very much for your ideas and suggestions. I would
> >>> try to
> >>> > > > summarize again the consensus :). Correct me if I am wrong or
> >>> > > misunderstand
> >>> > > > you.
> >>> > > >
> >>> > > > ## Consensus-1
> >>> > > >
> >>> > > > 1. The motivation of the unified sink API is to decouple the sink
> >>> > > > implementation from the different runtime execution mode.
> >>> > > > 2. The initial scope of the unified sink API only covers the file
> >>> > system
> >>> > > > type, which supports the real transactions. The FLIP focuses more
> >>> on
> >>> > the
> >>> > > > semantics the new sink api should support.
> >>> > > > 3. We prefer the first alternative API, which could give the
> >>> framework
> >>> > a
> >>> > > > greater opportunity to optimize.
> >>> > > > 4. The `Writer` needs to add a method `prepareCommit`, which
> would
> >>> be
> >>> > > > called from `prepareSnapshotPreBarrier`. And remove the `Flush`
> >>> method.
> >>> > > > 5. The FLIP could move the `Snapshot & Drain` section in order to
> >>> be
> >>> > more
> >>> > > > focused.
> >>> > > >
> >>> > > > ## Consensus-2
> >>> > > >
> >>> > > > 1. What should the “Unified Sink API” support/cover? It includes
> >>> two
> >>> > > > aspects. 1. The same sink implementation would work for both the
> >>> batch
> >>> > > and
> >>> > > > stream execution mode. 2. In the long run we should give the sink
> >>> > > developer
> >>> > > > the ability of building “arbitrary” topologies. But for
> Flink-1.12
> >>> we
> >>> > > > should be more focused on only satisfying the S3/HDFS/Iceberg
> sink.
> >>> > > > 2. Because the batch execution mode does not have the normal
> >>> checkpoint
> >>> > > the
> >>> > > > sink developer should not depend on it any more if we want a
> >>> unified
> >>> > > sink.
> >>> > > > 3. We can benefit by providing an asynchronous `Writer` version.
> >>> But
> >>> > > > because the unified sink is already very complicated, we don’t
> add
> >>> this
> >>> > > in
> >>> > > > the first version.
> >>> > > >
> >>> > > >
> >>> > > > According to these consensus I would propose the first version of
> >>> the
> >>> > new
> >>> > > > sink api as follows. What do you think? Any comments are welcome.
> >>> > > >
> >>> > > > /**
> >>> > > >   * This interface lets the sink developer build a simple
> >>> transactional
> >>> > > sink
> >>> > > > topology pattern, which satisfies the HDFS/S3/Iceberg sink.
> >>> > > >   * This sink topology includes one {@link Writer} + one {@link
> >>> > > Committer} +
> >>> > > > one {@link GlobalCommitter}.
> >>> > > >   * The {@link Writer} is responsible for producing the
> >>> committable.
> >>> > > >   * The {@link Committer} is responsible for committing a single
> >>> > > > committables.
> >>> > > >   * The {@link GlobalCommitter} is responsible for committing an
> >>> > > aggregated
> >>> > > > committable, which we called global committables.
> >>> > > >   *
> >>> > > >   * But both the {@link Committer} and the {@link
> GlobalCommitter}
> >>> are
> >>> > > > optional.
> >>> > > >   */
> >>> > > > interface TSink<IN, CommT, GCommT, WriterS> {
> >>> > > >
> >>> > > >          Writer<IN, CommT, WriterS> createWriter(InitContext
> >>> > > initContext);
> >>> > > >
> >>> > > >          Writer<IN, CommT, WriterS> restoreWriter(InitContext
> >>> > > initContext,
> >>> > > > List<WriterS> states);
> >>> > > >
> >>> > > >          Optional<Committer<CommT>> createCommitter();
> >>> > > >
> >>> > > >          Optional<GlobalCommitter<CommT, GCommT>>
> >>> > > createGlobalCommitter();
> >>> > > >
> >>> > > >          SimpleVersionedSerializer<CommT>
> >>> getCommittableSerializer();
> >>> > > >
> >>> > > >          Optional<SimpleVersionedSerializer<GCommT>>
> >>> > > > getGlobalCommittableSerializer();
> >>> > > > }
> >>> > > >
> >>> > > > /**
> >>> > > >   * The {@link GlobalCommitter} is responsible for committing an
> >>> > > aggregated
> >>> > > > committable, which we called global committables.
> >>> > > >   */
> >>> > > > interface GlobalCommitter<CommT, GCommT> {
> >>> > > >
> >>> > > >          /**
> >>> > > >           * This method is called when restoring from a failover.
> >>> > > >           * @param globalCommittables the global committables
> that
> >>> are
> >>> > > not
> >>> > > > committed in the previous session.
> >>> > > >           * @return the global committables that should be
> >>> committed
> >>> > > again
> >>> > > > in the current session.
> >>> > > >           */
> >>> > > >          List<GCommT> filterRecoveredCommittables(List<GCommT>
> >>> > > > globalCommittables);
> >>> > > >
> >>> > > >          /**
> >>> > > >           * Compute an aggregated committable from a collection
> of
> >>> > > > committables.
> >>> > > >           * @param committables a collection of committables that
> >>> are
> >>> > > needed
> >>> > > > to combine
> >>> > > >           * @return an aggregated committable
> >>> > > >           */
> >>> > > >          GCommT combine(List<CommT> committables);
> >>> > > >
> >>> > > >          void commit(List<GCommT> globalCommittables);
> >>> > > >
> >>> > > >          /**
> >>> > > >           * There are no committables any more.
> >>> > > >           */
> >>> > > >          void endOfInput();
> >>> > > > }
> >>> > > >
> >>> > > > Best,
> >>> > > > Guowei
> >>> > >
> >>> > >
> >>> >
> >>>
> >>
>

Re: [DISCUSS] FLIP-143: Unified Sink API

Steven Wu
I should clarify my last email a little more.

In the example where the commits for checkpoints 1-100 failed, the job is
still up (processing records and uploading files). When the commit for
checkpoint 101 comes, IcebergSink would prefer the framework to pass in all
101 GlobalCommT items (100 old + 1 new) so that it can commit all of them in
one transaction. That is more efficient than 101 separate transactions.

Maybe the GlobalCommitter#commit semantics should be to give the sink all
uncommitted GlobalCommT items and let the sink implementation decide whether
to retry them one by one or in a single transaction. That could mean we need
to expand the CommitResult interface (e.g. a list for each result type:
SUCCESS, FAILURE, RETRY). We can also start with the simple enum-style
result for the whole list for now. If we later need to break the experimental
API, it is not a big deal, since only a few sink implementations would need
updating.
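
One possible shape of such an expanded result, as a hedged Java sketch. `GroupedCommitResult`, `BatchCommitter` and `SingleTransactionCommitter` are made-up names, and the `metastoreUp` flag merely stands in for the state of the external service.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical "expanded" result: one list per outcome instead of a single enum.
class GroupedCommitResult<T> {
    final List<T> succeeded = new ArrayList<>();
    final List<T> failed = new ArrayList<>();
    final List<T> toRetry = new ArrayList<>();
}

interface BatchCommitter<T> {
    // The sink receives every uncommitted item and decides how to commit them.
    GroupedCommitResult<T> commit(List<T> uncommitted);
}

// A sink that commits the whole list in one transaction (all or nothing).
class SingleTransactionCommitter implements BatchCommitter<String> {
    private final boolean metastoreUp; // stand-in for the external service state

    SingleTransactionCommitter(boolean metastoreUp) {
        this.metastoreUp = metastoreUp;
    }

    @Override
    public GroupedCommitResult<String> commit(List<String> uncommitted) {
        GroupedCommitResult<String> result = new GroupedCommitResult<>();
        if (metastoreUp) {
            result.succeeded.addAll(uncommitted);
        } else {
            // Nothing was committed, so every item is reported for retry.
            result.toRetry.addAll(uncommitted);
        }
        return result;
    }
}
```

A sink that commits item by item could fill the three lists independently, which is the extra flexibility the grouped result buys over a single enum for the whole list.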

Thanks,
Steven

On Fri, Sep 25, 2020 at 5:56 AM Steven Wu <[hidden email]> wrote:

> > 1. The frame can not know which `GlobalCommT` to retry if we use the
> > List<GlobalCommT> as parameter when the `commit` returns `RETRY`.
> > 2. Of course we can let the `commit` return more detailed info but it
> might
> > be too complicated.
>
> If commit(List<GlobalCommT>) returns RETRY, it means the whole list needs
> to be retried. E.g. we have some outage with metadata service, commits for
> checkpoints 1-100 failed. We can accumulate 100 GlobalCommT items. we don't
> want to commit them one by one. It is faster to commit the whole list as
> one batch.
>
> > 3. On the other hand, I think only when restoring IcebergSink needs a
> > collection of `GlobalCommT` and giving back another collection of
> > `GlobalCommT` that are not committed
>
> That is when the job restarted due to failure or deployment.

Re: [DISCUSS] FLIP-143: Unified Sink API

Guowei Ma
Hi Steven

Thank you very much for your detailed explanation.

Now I get your point. I can see that there are benefits to committing a
collection of `GlobalCommT` as a whole when the external metastore is
temporarily unstable.

But I have two small concerns about committing a collection of
`GlobalCommT`:

1. Option 1: CommitResult commit(List<GlobalCommT>). This option implies
that users should commit the collection of `GlobalCommT` as a whole.
But maybe not every system can do that; renaming a set of files, for
example, cannot be done as a whole. In that case I think people would keep
asking the same question I asked in the previous mail.

2. Option 2: List<CommitResult> commit(List<GlobalCommT>). This option is
clearer than the first one. But IMHO it only has benefits when the external
metastore is unstable and we want to retry many times without failing the
job. Maybe we should not retry so many times and end up with a lot of
uncommitted `GlobalCommT`. If so, maybe we should make the API clearer and
simpler for the normal scenario. In addition, there is only one
GlobalCommitter instance, so I think the external system can bear the
pressure.
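
For comparison, a small Java sketch of how Option 2 could look when each item is committed independently. `ItemResult`, `PerItemCommitter`, `IndependentCommitter` and `RetrySelector` are hypothetical names, and the positional pairing of results with inputs is an assumption of this sketch rather than something the FLIP specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

enum ItemResult { SUCCESS, FAILURE, RETRY }

// Option 2 shape: result i describes committable i.
interface PerItemCommitter<T> {
    List<ItemResult> commit(List<T> globalCommittables);
}

// A committer whose items are independent (for example one rename per item).
class IndependentCommitter implements PerItemCommitter<String> {
    private final Predicate<String> tryCommitOne; // hypothetical single-item commit

    IndependentCommitter(Predicate<String> tryCommitOne) {
        this.tryCommitOne = tryCommitOne;
    }

    @Override
    public List<ItemResult> commit(List<String> globalCommittables) {
        List<ItemResult> results = new ArrayList<>();
        for (String c : globalCommittables) {
            results.add(tryCommitOne.test(c) ? ItemResult.SUCCESS : ItemResult.RETRY);
        }
        return results;
    }
}

class RetrySelector {
    // Framework side: keep only the items whose result is RETRY.
    static <T> List<T> remaining(List<T> sent, List<ItemResult> results) {
        List<T> retry = new ArrayList<>();
        for (int i = 0; i < sent.size(); i++) {
            if (results.get(i) == ItemResult.RETRY) {
                retry.add(sent.get(i));
            }
        }
        return retry;
    }
}
```

Under Option 1 the same committer would have to collapse these per-item outcomes into one value, which is where the question of which `GlobalCommT` to retry comes from.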

So personally I would prefer to keep the API simple at the beginning, in
1.12.

What do you think?

Best,
Guowei


On Fri, Sep 25, 2020 at 9:30 PM Steven Wu <[hidden email]> wrote:

> I should clarify my last email a little more.
>
> For the example of commits for checkpoints 1-100 failed, the job is still
> up (processing records and uploading files). When commit for checkpoint 101
> came, IcebergSink would prefer the framework to pass in all 101 GlobalCommT
> (100 old + 1 new) so that it can commit all of them in one transaction. it
> is more efficient than 101 separate transactions.
>
> Maybe the GlobalCommitter#commit semantics is to give the sink all
> uncommitted GlobalCommT items and let sink implementation decide whether to
> retry one by one or in a single transaction. It could mean that we need to
> expand the CommitResult (e.g. a list for each result type, SUCCESS,
> FAILURE, RETRY) interface. We can also start with the simple enum style
> result for the whole list for now. If we need to break the experimental
> API, it is also not a big deal since we only need to update a few sink
> implementations.
>
> Thanks,
> Steven
>
> On Fri, Sep 25, 2020 at 5:56 AM Steven Wu <[hidden email]> wrote:
>
> > > 1. The frame can not know which `GlobalCommT` to retry if we use the
> > > List<GlobalCommT> as parameter when the `commit` returns `RETRY`.
> > > 2. Of course we can let the `commit` return more detailed info but it
> > might
> > > be too complicated.
> >
> > If commit(List<GlobalCommT>) returns RETRY, it means the whole list needs
> > to be retried. E.g. we have some outage with metadata service, commits
> for
> > checkpoints 1-100 failed. We can accumulate 100 GlobalCommT items. we
> don't
> > want to commit them one by one. It is faster to commit the whole list as
> > one batch.
> >
> > > 3. On the other hand, I think only when restoring IcebergSink needs a
> > > collection of `GlobalCommT` and giving back another collection of
> > > `GlobalCommT` that are not committed
> >
> > That is when the job restarted due to failure or deployment.
> >
> >
> > On Fri, Sep 25, 2020 at 5:24 AM Guowei Ma <[hidden email]> wrote:
> >
> >> Hi, all
> >>
> >> From the above discussion we could find that FLIP focuses on providing
> an
> >> unified transactional sink API. So I updated the FLIP's title to
> "Unified
> >> Transactional Sink API". But I found that the old link could not be
> opened
> >> again.
> >>
> >> I would update the link[1] here. Sorry for the inconvenience.
> >>
> >> [1]https://cwiki.apache.org/confluence/x/KEJ4CQ
> >>
> >> Best,
> >> Guowei
> >>
> >>
> >> On Fri, Sep 25, 2020 at 3:26 PM Guowei Ma <[hidden email]> wrote:
> >>
> >> > Hi, Steven
> >> >
> >> > >>I also have a clarifying question regarding the WriterStateT. Since
> >> > >>IcebergWriter won't need to checkpoint any state, should we set it
> to
> >> > *Void*
> >> > >>type? Since getWriterStateSerializer() returns Optional, that is
> clear
> >> > and
> >> > >>we can return Optional.empty().
> >> >
> >> > Yes I think you could do it. If you return Optional.empty() we would
> >> > ignore all the state you return.
> >> >
> >> > Best,
> >> > Guowei
> >> >
> >> >
> >> > On Fri, Sep 25, 2020 at 3:14 PM Guowei Ma <[hidden email]>
> wrote:
> >> >
> >> >> Hi,Steven
> >> >>
> >> >> Thank you for reading the FLIP so carefully.
> >> >> 1. The frame can not know which `GlobalCommT` to retry if we use the
> >> >> List<GlobalCommT> as parameter when the `commit` returns `RETRY`.
> >> >> 2. Of course we can let the `commit` return more detailed info but it
> >> >> might be too complicated.
> >> >> 3. On the other hand, I think only when restoring IcebergSink needs a
> >> >> collection of `GlobalCommT` and giving back another collection of
> >> >> `GlobalCommT` that are not committed.
> >> >>
> >> >> Best,
> >> >> Guowei
> >> >>
> >> >>
> >> >> On Fri, Sep 25, 2020 at 1:45 AM Steven Wu <[hidden email]>
> >> wrote:
> >> >>
> >> >>> Guowei,
> >> >>>
> >> >>> Thanks a lot for updating the wiki page. It looks great.
> >> >>>
> >> >>> I noticed one inconsistency in the wiki with your last summary email
> >> for
> >> >>> GlobalCommitter interface. I think the version in the summary email
> is
> >> >>> the
> >> >>> intended one, because rollover from previous failed commits can
> >> >>> accumulate
> >> >>> a list.
> >> >>> CommitResult commit(GlobalCommT globalCommittable); // in the wiki
> >> >>> =>
> >> >>> CommitResult commit(List<GlobalCommT> globalCommittable);  // in the
> >> >>> summary email
> >> >>>
> >> >>> I also have a clarifying question regarding the WriterStateT. Since
> >> >>> IcebergWriter won't need to checkpoint any state, should we set it
> to
> >> >>> *Void*
> >> >>> type? Since getWriterStateSerializer() returns Optional, that is
> clear
> >> >>> and
> >> >>> we can return Optional.empty().
> >> >>>
> >> >>> Thanks,
> >> >>> Steven
> >> >>>
> >> >>> On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma <[hidden email]>
> >> wrote:
> >> >>>
> >> >>> > Thanks Aljoscha for your suggestion.  I have updated FLIP. Any
> >> >>> comments are
> >> >>> > welcome.
> >> >>> >
> >> >>> > Best,
> >> >>> > Guowei
> >> >>> >
> >> >>> >
> >> >>> > On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <
> >> [hidden email]>
> >> >>> > wrote:
> >> >>> >
> >> >>> > > Yes, that sounds good! I'll probably have some comments on the
> >> FLIP
> >> >>> > > about the names of generic parameters and the Javadoc but we can
> >> >>> address
> >> >>> > > them later or during implementation.
> >> >>> > >
> >> >>> > > I also think that we probably need the FAIL,RETRY,SUCCESS result
> >> for
> >> >>> > > globalCommit() but we can also do that as a later addition.
> >> >>> > >
> >> >>> > > So I think we're good to go to update the FLIP, do any last
> minute
> >> >>> > > changes and then vote.
> >> >>> > >
> >> >>> > > Best,
> >> >>> > > Aljoscha
> >> >>> > >
> >> >>> > > On 23.09.20 06:13, Guowei Ma wrote:
> >> >>> > > > Hi, all
> >> >>> > > >
> >> >>> > > > Thank everyone very much for your ideas and suggestions. I
> would
> >> >>> try to
> >> >>> > > > summarize again the consensus :). Correct me if I am wrong or
> >> >>> > > misunderstand
> >> >>> > > > you.
> >> >>> > > >
> >> >>> > > > ## Consensus-1
> >> >>> > > >
> >> >>> > > > 1. The motivation of the unified sink API is to decouple the
> >> sink
> >> >>> > > > implementation from the different runtime execution mode.
> >> >>> > > > 2. The initial scope of the unified sink API only covers the
> >> file
> >> >>> > system
> >> >>> > > > type, which supports the real transactions. The FLIP focuses
> >> more
> >> >>> on
> >> >>> > the
> >> >>> > > > semantics the new sink api should support.
> >> >>> > > > 3. We prefer the first alternative API, which could give the
> >> >>> framework
> >> >>> > a
> >> >>> > > > greater opportunity to optimize.
> >> >>> > > > 4. The `Writer` needs to add a method `prepareCommit`, which
> >> would
> >> >>> be
> >> >>> > > > called from `prepareSnapshotPreBarrier`. And remove the
> `Flush`
> >> >>> method.
> >> >>> > > > 5. The FLIP could move the `Snapshot & Drain` section in order
> >> to
> >> >>> be
> >> >>> > more
> >> >>> > > > focused.
> >> >>> > > >
> >> >>> > > > ## Consensus-2
> >> >>> > > >
> >> >>> > > > 1. What should the “Unified Sink API” support/cover? It includes two
> >> >>> > > > aspects. (1) The same sink implementation would work for both the batch
> >> >>> > > > and stream execution modes. (2) In the long run we should give the sink
> >> >>> > > > developer the ability to build “arbitrary” topologies. But for Flink-1.12
> >> >>> > > > we should focus only on satisfying the S3/HDFS/Iceberg sink.
> >> >>> > > > 2. Because the batch execution mode has no regular checkpoints, the sink
> >> >>> > > > developer should no longer depend on them if we want a unified sink.
> >> >>> > > > 3. We could benefit from providing an asynchronous `Writer` version. But
> >> >>> > > > because the unified sink is already very complicated, we will not add
> >> >>> > > > this in the first version.
> >> >>> > > >
> >> >>> > > >
> >> >>> > > > Based on this consensus, I would propose the first version of the new
> >> >>> > > > sink API as follows. What do you think? Any comments are welcome.
> >> >>> > > >
> >> >>> > > > /**
> >> >>> > > >   * This interface lets the sink developer build a simple transactional
> >> >>> > > >   * sink topology pattern, which satisfies the HDFS/S3/Iceberg sink.
> >> >>> > > >   * This sink topology includes one {@link Writer} + one {@link Committer}
> >> >>> > > >   * + one {@link GlobalCommitter}.
> >> >>> > > >   * The {@link Writer} is responsible for producing the committable.
> >> >>> > > >   * The {@link Committer} is responsible for committing a single
> >> >>> > > >   * committable.
> >> >>> > > >   * The {@link GlobalCommitter} is responsible for committing an
> >> >>> > > >   * aggregated committable, which we call a global committable.
> >> >>> > > >   *
> >> >>> > > >   * But both the {@link Committer} and the {@link GlobalCommitter} are
> >> >>> > > >   * optional.
> >> >>> > > >   */
> >> >>> > > > interface TSink<IN, CommT, GCommT, WriterS> {
> >> >>> > > >
> >> >>> > > >          Writer<IN, CommT, WriterS> createWriter(InitContext initContext);
> >> >>> > > >
> >> >>> > > >          Writer<IN, CommT, WriterS> restoreWriter(InitContext initContext,
> >> >>> > > >                  List<WriterS> states);
> >> >>> > > >
> >> >>> > > >          Optional<Committer<CommT>> createCommitter();
> >> >>> > > >
> >> >>> > > >          Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();
> >> >>> > > >
> >> >>> > > >          SimpleVersionedSerializer<CommT> getCommittableSerializer();
> >> >>> > > >
> >> >>> > > >          Optional<SimpleVersionedSerializer<GCommT>>
> >> >>> > > >                  getGlobalCommittableSerializer();
> >> >>> > > > }
> >> >>> > > >
> >> >>> > > > /**
> >> >>> > > >   * The {@link GlobalCommitter} is responsible for committing an
> >> >>> > > >   * aggregated committable, which we call a global committable.
> >> >>> > > >   */
> >> >>> > > > interface GlobalCommitter<CommT, GCommT> {
> >> >>> > > >
> >> >>> > > >          /**
> >> >>> > > >           * This method is called when restoring from a failover.
> >> >>> > > >           * @param globalCommittables the global committables that were
> >> >>> > > >           * not committed in the previous session.
> >> >>> > > >           * @return the global committables that should be committed
> >> >>> > > >           * again in the current session.
> >> >>> > > >           */
> >> >>> > > >          List<GCommT> filterRecoveredCommittables(
> >> >>> > > >                  List<GCommT> globalCommittables);
> >> >>> > > >
> >> >>> > > >          /**
> >> >>> > > >           * Compute an aggregated committable from a collection of
> >> >>> > > >           * committables.
> >> >>> > > >           * @param committables a collection of committables that need
> >> >>> > > >           * to be combined
> >> >>> > > >           * @return an aggregated committable
> >> >>> > > >           */
> >> >>> > > >          GCommT combine(List<CommT> committables);
> >> >>> > > >
> >> >>> > > >          void commit(List<GCommT> globalCommittables);
> >> >>> > > >
> >> >>> > > >          /**
> >> >>> > > >           * There are no committables any more.
> >> >>> > > >           */
> >> >>> > > >          void endOfInput();
> >> >>> > > > }
> >> >>> > > >
> >> >>> > > > Best,
> >> >>> > > > Guowei
> >> >>> > >
> >> >>> > >
> >> >>> >
> >> >>>
> >> >>
> >>
> >
>
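
For illustration only, here is a minimal sketch of how a sink might implement the
`GlobalCommitter` interface proposed in the quoted message. The interface is copied
from the proposal; everything else (`InMemoryTable`, `Manifest`, `ManifestCommitter`,
and the id scheme) is a hypothetical stand-in invented for this example and is not
part of the FLIP, Flink, or Iceberg. The point it tries to show is that
`filterRecoveredCommittables` asks the external system once, on restore, which global
committables were already committed, so that `commit` can assume its input is
uncommitted.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Copied from the proposal in the quoted message (javadoc omitted).
interface GlobalCommitter<CommT, GCommT> {
    List<GCommT> filterRecoveredCommittables(List<GCommT> globalCommittables);
    GCommT combine(List<CommT> committables);
    void commit(List<GCommT> globalCommittables);
    void endOfInput();
}

// Hypothetical stand-in for an external table that remembers committed ids.
final class InMemoryTable {
    private final Set<String> committedIds = new HashSet<>();

    boolean isCommitted(String id) { return committedIds.contains(id); }
    void commit(String id) { committedIds.add(id); }
    int commitCount() { return committedIds.size(); }
}

// Hypothetical global committable: an id plus the files it covers.
final class Manifest {
    final String id;
    final List<String> files;

    Manifest(String id, List<String> files) {
        this.id = id;
        this.files = files;
    }
}

// Hypothetical committer: CommT is a file path, GCommT is a Manifest.
final class ManifestCommitter implements GlobalCommitter<String, Manifest> {
    private final InMemoryTable table;
    // Assumption: a simple counter is enough to make ids unique in this sketch.
    private int nextId = 0;

    ManifestCommitter(InMemoryTable table) { this.table = table; }

    @Override
    public List<Manifest> filterRecoveredCommittables(List<Manifest> restored) {
        // Keep only the manifests the table has not seen yet.
        List<Manifest> retained = new ArrayList<>();
        for (Manifest m : restored) {
            if (!table.isCommitted(m.id)) {
                retained.add(m);
            }
        }
        return retained;
    }

    @Override
    public Manifest combine(List<String> committables) {
        return new Manifest("manifest-" + nextId++, new ArrayList<>(committables));
    }

    @Override
    public void commit(List<Manifest> globalCommittables) {
        // No committed-or-not check here: the input is assumed to be uncommitted.
        for (Manifest m : globalCommittables) {
            table.commit(m.id);
        }
    }

    @Override
    public void endOfInput() {
        // Nothing to flush in this sketch.
    }
}
```

With this shape, a restore would pass the checkpointed list through
`filterRecoveredCommittables` once and hand only the retained entries to the next
`commit` call.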