[DISCUSS] FLIP-143: Unified Sink API

[DISCUSS] FLIP-143: Unified Sink API

Guowei Ma
Hi, devs & users

As discussed in FLIP-131 [1], Flink will deprecate the DataSet API in favor
of the DataStream API and Table API. Users should be able to use the
DataStream API to write jobs that support both bounded and unbounded
execution modes. However, Flink does not provide a sink API that guarantees
exactly-once semantics in both bounded and unbounded scenarios, which blocks
this unification.

So we want to introduce a new unified sink API that lets users develop a
sink once and run it everywhere. You can find more details in FLIP-143 [2].

The FLIP contains some open questions on which I'd really appreciate input
from the community. Some of the open questions include:

   1. We provide two alternative Sink APIs in the FLIP. The only difference
   between the two versions is how the state is exposed to the user. Which
   one do you prefer?
   2. How should the sink API support writing to Hive?
   3. Is the sink an operator or a topology?

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

Best,
Guowei

Re: [DISCUSS] FLIP-143: Unified Sink API

Steven Wu
Guowei,

Thanks a lot for the proposal and starting the discussion thread. Very
excited.

For the big question of "Is the sink an operator or a topology?", I have a
few related sub-questions.
* Where should we run the committers?
* Is the committer parallel or single parallelism?
* Can a single choice satisfy all sinks?

Let me try to envision how some sinks could be implemented with this new
unified sink interface.

1. Kafka sink

Kafka supports non-transactional and transactional writes.
* Non-transactional writes don't need a commit action. We can have *parallel
writers and no/no-op committers*. This is probably true for other
non-transactional message queues.
* Transactional writes can be implemented as *parallel writers and parallel
committers*. In this case, I don't know if it makes sense to separate
writers and committers into two separate operators, because they probably
need to share the same KafkaProducer object.

Either way, both writers and committers probably should *run inside task
managers*.

2. ES sink

An ES sink typically buffers data up to a certain size or time threshold
and then uploads/commits a batch to ES. Writers buffer data and flush when
needed, and the committer does the HTTP bulk upload to commit. To avoid
serialization/deserialization cost, we should run *parallel writers and
parallel committers*, and they *should be chained or bundled together*
while *running inside task managers*.

It can also be implemented as *parallel writers and no/no-op committers*,
where all logic (batching and upload) is put inside the writers.
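
As a rough illustration of that second variant (just a sketch; the
BulkUploader type below is a hypothetical stand-in for the actual ES HTTP
bulk request, not a real client API):
-----------------
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch: all logic (batching and upload) lives in the writer; the committer is a no-op.
class BufferingEsWriter<T> {
    private final List<T> buffer = new ArrayList<>();
    private final int maxBatchSize;
    private final BulkUploader<T> uploader; // hypothetical stand-in for the ES bulk request

    BufferingEsWriter(int maxBatchSize, BulkUploader<T> uploader) {
        this.maxBatchSize = maxBatchSize;
        this.uploader = uploader;
    }

    void write(T element) throws IOException {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // would also be called by the framework before a checkpoint barrier is forwarded
    void flush() throws IOException {
        if (!buffer.isEmpty()) {
            uploader.bulkUpload(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}

interface BulkUploader<T> {
    void bulkUpload(List<T> batch) throws IOException;
}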

3. Iceberg [1] sink

It is currently implemented as two-stage operators with *parallel writers
and a single-parallelism committer*.
* *parallel writers* that write records into data files. Upon checkpoint,
writers flush and upload the files, and send the metadata/location of the
data files to the downstream committer. Writers need to do the flush inside
the "prepareSnapshotPreBarrier" method (NOT the "snapshotState" method)
before forwarding the checkpoint barrier to the committer.
* single-parallelism committer operator. It collects data files from
upstream writers. During "snapshotState", it saves collected data files (or
an uber metadata file) into state. When the checkpoint is completed, inside
"notifyCheckpointComplete" it commits those data files to Iceberg tables. *The
committer has to be single parallelism*, because we don't want hundreds or
thousands of parallel committers to compete for commit operations with
opportunistic concurrency control. It will be very inefficient and probably
infeasible if the parallelism is high. Too many tiny commits/transactions
can also slow down both the table write and read paths due to too many
manifest files.
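
To make the two-stage setup above a bit more concrete, here is a rough
sketch (simplified Java, not the actual Iceberg connector code; the method
names only mirror the operator hooks mentioned above, and the String-based
file metadata is just a placeholder):
-----------------
import java.util.ArrayList;
import java.util.List;

// Parallel writer: runs with the job's parallelism inside task managers.
class IcebergWriterSketch {

    void processElement(Object record) {
        // append the record to the currently open data files
    }

    // called before the checkpoint barrier is forwarded downstream
    List<String> prepareSnapshotPreBarrier(long checkpointId) {
        // close and upload the open data files, then hand their metadata/
        // locations to the framework, which sends them to the committer
        return new ArrayList<>();
    }
}

// Single-parallelism committer: one instance collects files from all writers.
class IcebergCommitterSketch {
    private final List<String> pendingDataFiles = new ArrayList<>();

    void processElement(String dataFileLocation) {
        pendingDataFiles.add(dataFileLocation);
    }

    void snapshotState(long checkpointId) {
        // persist pendingDataFiles (or one uber manifest file) in operator state
    }

    void notifyCheckpointComplete(long checkpointId) {
        // commit the data files belonging to this checkpoint to the Iceberg
        // table, then drop them from pendingDataFiles
    }
}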

Right now, both the Iceberg writer and committer operators run inside task
managers. This has one major drawback: with the Iceberg sink, embarrassingly
parallel jobs won't be embarrassingly parallel anymore, which loses the
benefit of region recovery for an embarrassingly parallel DAG. Conceptually,
the Writer-Committer sink pattern mirrors the FLIP-27 Enumerator-Reader
source pattern. It would be better *if the committer could run inside the
job manager*, like the SplitEnumerator for the FLIP-27 source.

-----------------------
Additional questions regarding the doc/API
* Any example for the writer shared state (Writer#snapshotSharedState)?
* We allow the case where the writer has no state, right? Meaning WriterS
can be Void.

[1] https://iceberg.apache.org/

Thanks,
Steven


Re: [DISCUSS] FLIP-143: Unified Sink API

Aljoscha Krettek-2
Hi Everyone,

thanks to Guowei for publishing the FLIP, and thanks Steven for the very
thoughtful email!

We thought a lot internally about some of the questions you posted but
left a lot (almost all) of the implementation details out of the FLIP
for now because we wanted to focus on semantics and API. I'll try and
address the points below.

## Initial Scope of the new Sink API

We need to accept some initial scope that we want to achieve for Flink
1.12. I don't think we can try and find the solution that will work for
all current and future external systems. For me, the initial goal would
be to produce a Sink API and implementations for systems where you can
prepare "committables" in one process and commit those from another
process. Those are systems that support "real" transactions as you need
them in a two-phase commit protocol. This includes:

  - File Sink, including HDFS, S3 via special part-file uploads
  - Iceberg
  - HDFS

The work should include runtime support for both BATCH and STREAMING
execution as outlined in https://s.apache.org/FLIP-134.

Supporting Kafka already becomes difficult but I'll go into that below.

## Where to run the Committer

Guowei hinted at this in the FLIP: the idea is that the Framework
decides where to run the committer based on requirements and based on
the execution mode (STREAMING or BATCH).

Something that is not in the FLIP but which we thought about is that we
need to allow different types of committers. I'm currently thinking we
need at least a normal "Committer" and a "GlobalCommitter" (name TBD).

The Committer is as described in the FLIP; it's basically a function
"void commit(Committable)". The GlobalCommitter would be a function "void
commit(List<Committable>)". The former would be used by an S3 sink where
we can individually commit files to S3: a committable would be the list
of part uploads that will form the final file, and the commit operation
creates the metadata in S3. The latter would be used by something like
Iceberg, where the Committer needs a global view of all the commits to be
efficient and not overwhelm the system.

I don't know yet if sinks would only implement one type of commit
function or potentially both at the same time, and maybe Commit can
return some CommitResult that gets shipped to the GlobalCommit function.
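
Spelled out as plain Java interfaces, the two variants could look roughly
like this (names and generics are only illustrative, not the final API):
-----------------
import java.util.List;

// Per-committable variant, e.g. an S3/file sink committing individual files.
interface Committer<CommT> {
    void commit(CommT committable) throws Exception;
}

// Global variant, e.g. Iceberg, which wants all committables of a checkpoint at once.
interface GlobalCommitter<CommT> {
    void commit(List<CommT> committables) throws Exception;
}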

An interesting read on this topic is the discussion on
https://issues.apache.org/jira/browse/MAPREDUCE-4815 about the Hadoop
FileOutputCommitter and the two available algorithms for committing final
job/task results.

These interfaces untie the sink implementation from the Runtime and we
could, for example, have a runtime like this:

### BATCH

  - Collect all committables and store them in a fault-tolerant way
until the job finishes
  - For a normal Commit function, call it on the individual commits. We
can potentially distribute this if it becomes a bottleneck
  - For the GlobalCommit function, call it with all the commits. This
cannot be distributed

We can collect the committables in an OperatorCoordinator or potentially
somehow in a task, though I prefer an OperatorCoordinator right now. The
operator coordinator needs to keep the commits in a fault-tolerant way.

### STREAMING

  - For the normal Commit, keep the committables in state on the individual
tasks and commit them when a checkpoint completes
  - For the global CommitFunction we have options: collect them in a DOP-1
operator in the topology or send them to an OperatorCoordinator to do
the commit there. This is where the source/sink duality that Steven
mentions becomes visible.

## Kafka

Kafka is a problematic case because it doesn't really support
transactions as outlined above. Our current Sink implementation works
around that with hacks, but that only gets us so far.

The problem with Kafka is that we need to aggressively clean up pending
transactions in case a failure happens. Otherwise stale transactions
would block downstream consumers. See here for details:
http://kafka.apache.org/documentation/#isolation.level.

The way we solve this in the current Kafka sink is by using a fixed pool
of transactional IDs and then cancelling all outstanding transactions
for those IDs when we restore from a savepoint. In order for this to work
we need to recycle the IDs, so there needs to be a back-channel from the
Committer to the Writer, or they need to share internal state.

I don't see a satisfying solution for this, so I think we should
exclude this from the initial version.
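
To illustrate the abort-on-restore part (only a sketch of the idea with
made-up names, not the actual FlinkKafkaProducer code), one can walk the
fixed ID pool and let initTransactions() fence and abort whatever was left
open under each ID:
-----------------
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

class TransactionalIdPoolRecovery {

    // baseConfig is assumed to contain bootstrap.servers and serializer settings
    static void abortStaleTransactions(Properties baseConfig, String idPrefix, int poolSize) {
        for (int i = 0; i < poolSize; i++) {
            Properties props = new Properties();
            props.putAll(baseConfig);
            props.put("transactional.id", idPrefix + "-" + i);
            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                // fences the previous producer for this ID and aborts its open transaction
                producer.initTransactions();
            }
        }
    }
}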

## On Write-Ahead-Log Sinks

Some sinks, like ES or Cassandra, would require that we keep a WAL in
Flink and then ship the contents to the external system on checkpoint.
The reason is that these systems don't support real transactions where
you can prepare them in one process and commit them from another process.

Best,
Aljoscha


Re: [DISCUSS] FLIP-143: Unified Sink API

Aljoscha Krettek-2
Regarding the FLIP itself, I like the motivation section and the
What/How/When/Where section a lot!

I don't understand why we need the "Drain and Snapshot" section. It
seems to be some details about stop-with-savepoint and drain, and the
relation to BATCH execution, but I don't know if it is needed to
understand the rest of the document. I'm happy to be wrong here, though,
if there are good reasons for the section.

On the question of Alternative 1 and 2, I have a strong preference for
Alternative 1 because we could avoid strong coupling to other modules.
With Alternative 2 we would depend on `flink-streaming-java` and even
`flink-runtime`. For the new source API (FLIP-27) we managed to keep the
dependencies slim and the code is in flink-core. I'd be very happy if we
can manage the same for the new sink API.

Best,
Aljoscha


Re: [DISCUSS] FLIP-143: Unified Sink API

Steven Wu
Aljoscha, thanks a lot for the detailed response. Now I have a better
understanding of the initial scope.

To me, there are possibly three different committer behaviors. For lack
of better names, let's call them:
* No/NoopCommitter
* Committer / LocalCommitter (file sink?)
* GlobalCommitter (Iceberg)

## Writer interface

For the Writer interface, should we add "*prepareSnapshot*" before the
checkpoint barrier is emitted downstream? IcebergWriter would need it. Or
would the framework call "*flush*" before the barrier is emitted downstream?
That guarantee would achieve the same goal.
-----------------
// before barrier emitted to downstream
void prepareSnapshot(long checkpointId) throws Exception;

// or will flush be called automatically before the barrier emitted downstream?
// if yes, we need the checkpointId arg for the reason listed in [1]
void flush(WriterOutput<CommT> output) throws IOException;

In [1], we discussed the reason for Writer to emit (checkpointId, CommT)
tuple to the committer. The committer needs checkpointId to separate out
data files for different checkpoints if concurrent checkpoints are enabled.
For that reason, writers need to know the checkpointId where the restore
happened. Can we add a RestoreContext interface to the restoreWriter method?
---------------
Writer<IN, CommT, WriterS, SharedS> restoreWriter(InitContext context,
RestoreContext restoreContext, List<WriterS> state, List<SharedS> share);

interface RestoreContext {
  long getCheckpointId();
}


## Committer interface

For the Committer interface, I am wondering if we should split the single
commit method into separate "*collect*" and "*commit*" methods. This way,
it can handle both single and multiple CommT objects.
------------------
void commit(CommT committable) throws Exception;
      ==>
void collect(CommT committable) throws Exception;
void commit() throws Exception;

As discussed in [1] and mentioned above, the Iceberg committer needs to
know which checkpointId the commit is for. So can we add a checkpointId arg
to the commit API? However, I don't know how this would affect batch
execution, where checkpoints are disabled.
------------------
void commit(long checkpointId) throws Exception;

For Iceberg, writers don't need any state, but the GlobalCommitter needs to
checkpoint StateT. For the committer, CommT is "DataFile". Since a single
committer can collect thousands (or more) data files in one checkpoint
cycle, as an optimization we checkpoint a single "ManifestFile" (for the
thousands of collected data files) as StateT. This allows us to absorb
extended commit outages without losing written/uploaded data files, as the
operator state size is as small as one manifest file per checkpoint cycle
[2].
------------------
StateT snapshotState(SnapshotContext context) throws Exception;

That means we also need the restoreCommitter API in the Sink interface
---------------
Committer<CommT, StateT> restoreCommitter(InitContext context, StateT
state);


Thanks,
Steven

[1] https://github.com/apache/iceberg/pull/1185#discussion_r479589663
[2] https://github.com/apache/iceberg/pull/1185#discussion_r479457104

Re: [DISCUSS] FLIP-143: Unified Sink API

Piotr Nowojski-5
Hi,

I've just briefly skimmed over the proposed interfaces. I would suggest one
addition to the Writer interface (as I understand this is the runtime
interface in this proposal?): add some availability method, to avoid, if
possible, blocking calls on the sink. We already have similar
availability methods in the new sources [1] and in various places in the
network stack [2].

I'm aware that many implementations wouldn't be able to implement it, but
some may. For example, `FlinkKafkaProducer` could
use `FlinkKafkaProducer#pendingRecords` to control the `Writer`'s availability.
Also, any sink that implements record handover to some writer
thread could easily provide availability as well.

Non-blocking calls are important for many things; for example, they are
crucial for unaligned checkpoints to complete quickly.
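
To make this concrete, the addition could look roughly like the sketch
below (mirroring SourceReader#isAvailable; just an illustration, not a
concrete API proposal):
-----------------
import java.util.concurrent.CompletableFuture;

// Sketch: the framework checks/awaits this future before handing the next
// record to write(), instead of letting write() block (e.g. on a full buffer).
interface AvailabilityWriter<IN> {

    CompletableFuture<Void> isAvailable();

    void write(IN element) throws Exception;
}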

Piotrek

[1] org.apache.flink.api.connector.source.SourceReader#isAvailable
[2] org.apache.flink.runtime.io.AvailabilityProvider


Re: [DISCUSS] FLIP-143: Unified Sink API

Aljoscha Krettek-2
In reply to this post by Steven Wu
On 14.09.20 01:23, Steven Wu wrote:
> ## Writer interface
>
> For the Writer interface, should we add "*prepareSnapshot"* before the
> checkpoint barrier emitted downstream?  IcebergWriter would need it. Or
> would the framework call "*flush*" before the barrier emitted downstream?
> that guarantee would achieve the same goal.

I would think that we only need flush() and the semantics are that it
prepares for a commit, so on a physical level it would be called from
"prepareSnapshotPreBarrier". Now that I'm thinking about it more I think
flush() should be renamed to something like "prepareCommit()".

@Guowei, what do you think about this?
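To make this concrete, a rough sketch of what the Writer side could look like with that renaming (method names here are only illustrative, not the final FLIP API):

import java.util.List;

// Illustrative sketch only, not the FLIP's final API.
// CommT is the committable type that is handed to the (Global)Committer.
interface Writer<InputT, CommT> {

    // Called for every incoming record.
    void write(InputT element) throws Exception;

    // Called by the framework from prepareSnapshotPreBarrier, i.e. before the
    // checkpoint barrier is forwarded downstream. The writer finishes whatever
    // it is buffering and returns the resulting committables (e.g. data files).
    List<CommT> prepareCommit() throws Exception;
}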

> In [1], we discussed the reason for Writer to emit (checkpointId, CommT)
> tuple to the committer. The committer needs checkpointId to separate out
> data files for different checkpoints if concurrent checkpoints are enabled.

When can this happen? Even with concurrent checkpoints the snapshot
barriers would still cleanly segregate the input stream of an operator
into tranches that should manifest in only one checkpoint. With
concurrent checkpoints, all that can happen is that we start a
checkpoint before the last one is confirmed complete.

Unless there is some weirdness in the sources and some sources start
chk1 first and some other ones start chk2 first?

@Piotrek, do you think this is a problem?

> For the Committer interface, I am wondering if we should split the single
> commit method into separate "*collect"* and "*commit"* methods? This way,
> it can handle both single and multiple CommT objects.

I think we can't do this. If the sink only needs a regular Committer, we
can perform the commits in parallel, possibly on different machines.
Only when the sink needs a GlobalCommitter would we need to ship all
commits to a single process and perform the commit there. If both
methods were unified in one interface we couldn't make the decision of
where to commit in the framework code.
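A rough sketch of the two shapes, just to illustrate the difference in where
the commit can run (signatures are illustrative, not final):

import java.util.List;

// Sketch only. A regular Committer commits one committable at a time and can
// therefore run with the same parallelism as the writers, on any machine.
interface Committer<CommT> {
    void commit(CommT committable) throws Exception;
}

// A GlobalCommitter sees all committables of a checkpoint at once, so the
// framework has to ship them to a single place (e.g. Iceberg, which wants one
// table commit per checkpoint instead of thousands of small ones).
interface GlobalCommitter<CommT> {
    void commit(List<CommT> committables) throws Exception;
}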

> For Iceberg, writers don't need any state. But the GlobalCommitter needs to
> checkpoint StateT. For the committer, CommT is "DataFile". Since a single
> committer can collect thousands (or more) data files in one checkpoint
> cycle, as an optimization we checkpoint a single "ManifestFile" (for the
> collected thousands data files) as StateT. This allows us to absorb
> extended commit outages without losing written/uploaded data files, as
> operator state size is as small as one manifest file per checkpoint cycle

You could have a point here. Is the code for this available in
open-source? I was checking out
https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java 
and didn't find the ManifestFile optimization there.

Best,
Aljoscha

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-143: Unified Sink API

Aljoscha Krettek-2
I thought about this some more. One of the important parts of the
Iceberg sink is to know whether we have already committed some
DataFiles. Currently, this is implemented by writing a (JobId,
MaxCheckpointId) tuple to the Iceberg table when committing. When
restoring from a failure we check this and discard committables
(DataFile) that we know to already be committed.

I think this can have some problems, for example when checkpoint ids are
not strictly sequential, when we wrap around, or when the JobID changes.
This will happen when doing a stop/start-from-savepoint cycle, for example.

I think we could fix this by having Flink provide a nonce to the
GlobalCommitter where Flink guarantees that this nonce is unique and
will not change for repeated invocations of the GlobalCommitter with the
same set of committables. The GlobalCommitter could use this to
determine whether a set of committables has already been committed to
the Iceberg table.
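A rough sketch of how such a nonce could be used for de-duplication on restore
(all type names below are placeholders, not real Flink or Iceberg classes):

import java.util.List;

// Sketch only: an idempotent global commit keyed by a framework-provided nonce.
interface DataFile {}

interface TableClient {
    boolean containsCommitWithNonce(long nonce);          // look up committed nonces in table metadata
    void appendFiles(List<DataFile> files, long nonce);   // one atomic table commit, tagged with the nonce
}

class DedupGlobalCommitter {

    private final TableClient table;

    DedupGlobalCommitter(TableClient table) {
        this.table = table;
    }

    // The framework guarantees that 'nonce' does not change for repeated
    // invocations with the same set of committables.
    void commit(long nonce, List<DataFile> committables) {
        if (table.containsCommitWithNonce(nonce)) {
            return; // these files were already committed before the failure
        }
        table.appendFiles(committables, nonce);
    }
}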

It seems very tailor-made for Iceberg for now, but other systems could
well suffer from the same problem.

Best,
Aljoscha
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-143: Unified Sink API

dwysakowicz
In reply to this post by Aljoscha Krettek-2
Hi all,

> I would think that we only need flush() and the semantics are that it
> prepares for a commit, so on a physical level it would be called from
> "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> think flush() should be renamed to something like "prepareCommit()".

Generally speaking it is a good point that emitting the committables
should happen before emitting the checkpoint barrier downstream.
However, if I remember the offline discussions well, the idea behind
Writer#flush and Writer#snapshotState was to differentiate a commit on
checkpoint vs the final checkpoint at the end of the job. Both of these
methods could emit committables, but the flush should not leave any
in-progress state (e.g. in the case of a file sink in STREAM mode,
snapshotState could leave some open files that would be committed in
a subsequent cycle, whereas flush should close all files). The
snapshotState as it is now cannot be called in
prepareSnapshotPreBarrier as it can store some state, which should
happen in Operator#snapshotState as otherwise it would always be
synchronous. Therefore I think we would need something like:

void prepareCommit(boolean flush, WriterOutput<CommT> output);

ver 1:

List<StateT> snapshotState();

ver 2:

void snapshotState(); // not sure if we need that method at all in option 2
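For a file sink in STREAM mode the difference could roughly look like this (a
sketch against the prepareCommit signature above; WriterOutput is redeclared
here as a placeholder, and InProgressFile and the helper methods are made up
for illustration):

import java.util.List;

// Sketch only: how a file writer might treat the 'flush' flag.
interface WriterOutput<CommT> { void emit(CommT committable); }
interface InProgressFile { void close(); }

abstract class SketchFileWriter<CommT> {

    abstract List<InProgressFile> rolledFiles();   // files that already hit the rolling policy
    abstract List<InProgressFile> openFiles();     // files still being written
    abstract CommT toCommittable(InProgressFile file);

    void prepareCommit(boolean flush, WriterOutput<CommT> output) {
        // Files that hit the rolling policy are always handed over.
        for (InProgressFile file : rolledFiles()) {
            output.emit(toCommittable(file));
        }
        if (flush) {
            // Final call (end of input): nothing may remain in progress,
            // so close and emit the still-open files as well.
            for (InProgressFile file : openFiles()) {
                file.close();
                output.emit(toCommittable(file));
            }
        }
        // With flush == false, open files stay open; they would be captured by
        // snapshotState() and continued after recovery.
    }
}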

> The Committer is as described in the FLIP, it's basically a function
> "void commit(Committable)". The GobalCommitter would be a function "void
> commit(List<Committable>)". The former would be used by an S3 sink where
> we can individually commit files to S3, a committable would be the list
> of part uploads that will form the final file and the commit operation
> creates the metadata in S3. The latter would be used by something like
> Iceberg where the Committer needs a global view of all the commits to be
> efficient and not overwhelm the system.
>
> I don't know yet if sinks would only implement on type of commit
> function or potentially both at the same time, and maybe Commit can
> return some CommitResult that gets shipped to the GlobalCommit function.
I must admit I did not get the need for a Local/Normal + Global
committer at first. The Iceberg example helped a lot. I think it makes a
lot of sense.

> For Iceberg, writers don't need any state. But the GlobalCommitter
> needs to
> checkpoint StateT. For the committer, CommT is "DataFile". Since a single
> committer can collect thousands (or more) data files in one checkpoint
> cycle, as an optimization we checkpoint a single "ManifestFile" (for the
> collected thousands data files) as StateT. This allows us to absorb
> extended commit outages without losing written/uploaded data files, as
> operator state size is as small as one manifest file per checkpoint cycle
> [2].
> ------------------
> StateT snapshotState(SnapshotContext context) throws Exception;
>
> That means we also need the restoreCommitter API in the Sink interface
> ---------------
> Committer<CommT, StateT> restoreCommitter(InitContext context, StateT
> state);
I think this might be a valid case. Not sure though if I would go with a
"state" there. Having a state in a committer would imply we need a
collect method as well. So far we needed a single method commit(...) and
the bookkeeping of the committables could be handled by the framework. I
think something like an optional combiner in the GlobalCommitter would
be enough. What do you think?

GlobalCommitter<CommT, GlobalCommT> {

    void commit(GlobalCommT globalCommittables);

    GlobalCommT combine(List<CommT> committables);

}
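For Iceberg, such a combiner could then look roughly like this (placeholder
types mirroring the shape above with CommT = DataFile and GlobalCommT =
ManifestFile, just to show where the ManifestFile optimization would live):

import java.util.List;

// Sketch only; DataFile, ManifestFile, ManifestWriter and TableClient are placeholders.
interface DataFile {}
interface ManifestFile {}
interface ManifestWriter { ManifestFile write(List<DataFile> files); }
interface TableClient { void appendManifest(ManifestFile manifest); }

class IcebergGlobalCommitterSketch {

    private final ManifestWriter manifestWriter;
    private final TableClient table;

    IcebergGlobalCommitterSketch(ManifestWriter manifestWriter, TableClient table) {
        this.manifestWriter = manifestWriter;
        this.table = table;
    }

    // combine(): thousands of per-checkpoint data files are folded into one
    // small manifest file, which is what would be checkpointed and retried.
    ManifestFile combine(List<DataFile> committables) {
        return manifestWriter.write(committables);
    }

    // commit(): one atomic table commit per (possibly accumulated) manifest.
    void commit(ManifestFile globalCommittable) {
        table.appendManifest(globalCommittable);
    }
}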

A different problem that I see here is how we handle commit failures.
Should the committables (both normal and global) be included in the next
cycle, should we retry them, ...? I think it would be worth laying this
out in the FLIP.

@Aljoscha I think you can find the code Steven was referring to here:
https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java

Best,

Dawid


Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-143: Unified Sink API

Guowei Ma
Hi all,


Many thanks for the discussion and the valuable opinions! Currently there
are several ongoing issues and we would like to show what we are thinking
in the next few mails.

It seems that the biggest issue now is about the topology of the sinks.
Before deciding what the sink API should look like, I would like to first
summarize the different topologies we have mentioned so that we can get
on the same page and gain more insight into this issue. There are four
types of topology I can see. Please correct me if I misunderstand what
you mean:

   1. Commit individual files (StreamingFileSink):
      FileWriter -> FileCommitter

   2. Commit a directory (HiveSink):
      FileWriter -> FileCommitter -> GlobalCommitter

   3. Commit a bundle of files (Iceberg):
      DataFileWriter -> GlobalCommitter

   4. Commit a directory with merged files (some users want to merge the files
      in a directory before committing the directory to the Hive metastore):
      FileWriter -> SingleFileCommit -> FileMergeWriter -> GlobalCommitter


It can be seen from the above that the topologies differ according to the
requirements. Moreover, there may be other options for the second and third
categories. E.g., an alternative topology option for the IcebergSink might
be: DataFileWriter -> Agg -> GlobalCommitter. One pro of this method is that
we can let Agg take care of the cleanup instead of coupling the cleanup logic
to the committer.


In the long run I think we might give the sink developer the ability to
build arbitrary topologies. Maybe Flink could provide only a basic commit
transformation and let the user build the other parts of the topology. In
1.12 we might first provide different patterns for these different
scenarios, and I think these components could be reused in the future.

Best,
Guowei


Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-143: Unified Sink API

Steven Wu

## concurrent checkpoints

[hidden email] regarding the concurrent checkpoints, let me illustrate with a simple DAG below. 


Let's assume each writer emits one file per checkpoint cycle and writer-2 is slow. Now let's look at what the global committer receives 

timeline:            ----------------------------------------------------------> Now
from Writer-1:      file-1-1, ck-1, file-1-2, ck-2
from Writer-2:                                                      file-2-1, ck-1

In this case, the committer shouldn't include "file-1-2" into the commit for ck-1.

## Committable bookkeeping and combining

I like Dawid's proposal where the framework takes care of the bookkeeping of committables and provides a combiner API (CommT -> GlobalCommT) for the GlobalCommitter. The only requirement is to tie the commit/CommT/GlobalCommT to a checkpoint.

When a commit is successful for checkpoint-N, the framework needs to remove the GlobalCommT from the state corresponding to checkpoints <= N. If a commit fails, the GlobalCommT accumulates and will be included in the next cycle. That is how the Iceberg sink works. I think it is good to piggyback retries with Flink's periodic checkpoints for Iceberg sink. Otherwise, it can get complicated to implement retry logic that won't interfere with Flink checkpoints.

The main question is whether this pattern is generic enough to be put into the sink framework or not.
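A rough sketch of that bookkeeping, assuming the framework keeps the pending global committables keyed by checkpoint id (everything below is illustrative, not an actual Flink class):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch only: framework-side bookkeeping of global committables.
interface GlobalCommitFunction<GlobalCommT> {
    boolean commit(GlobalCommT committable); // true on success
}

class GlobalCommittableBookkeeping<GlobalCommT> {

    // Pending global committables, keyed by the checkpoint that produced them.
    // This map is what would be stored in operator state.
    private final TreeMap<Long, GlobalCommT> pending = new TreeMap<>();

    void add(long checkpointId, GlobalCommT committable) {
        pending.put(checkpointId, committable);
    }

    // Called from notifyCheckpointComplete(checkpointId): try to commit
    // everything up to and including that checkpoint.
    void onCheckpointComplete(long checkpointId, GlobalCommitFunction<GlobalCommT> committer) {
        List<Long> committed = new ArrayList<>();
        for (Map.Entry<Long, GlobalCommT> entry : pending.headMap(checkpointId, true).entrySet()) {
            if (committer.commit(entry.getValue())) {
                committed.add(entry.getKey());  // success: drop it from state
            }
            // On failure the entry stays in 'pending' and is retried with the
            // next checkpoint cycle, which is how the Iceberg sink behaves today.
        }
        committed.forEach(pending::remove);
    }
}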

> An alternative topology option for the IcebergSink might be: DataFileWriter
> -> Agg -> GlobalCommitter. One pro of this method is that we can let Agg
> take care of the cleanup instead of coupling the cleanup logic to the
> committer.

[hidden email] I would favor Dawid's suggestion of a "combine" API rather than a separate "Agg" operator.

## Using checkpointId

> I think this can have some problems, for example when checkpoint ids are
> not strictly sequential, when we wrap around, or when the JobID changes.
> This will happen when doing a stop/start-from-savepoint cycle, for example.

checkpointId can work if it is monotonically increasing, which I believe is the case for Flink today. Restoring from checkpoint or savepoint will resume the checkpointIds. 

We can deal with JobID change by saving it into the state and Iceberg snapshot metadata. There is already a PR [1] for that.

## Nonce

> Flink provide a nonce to the GlobalCommitter where Flink guarantees that this nonce is unique 

That is actually how we implemented it internally. The Flink Iceberg sink basically hashes the manifest file location as the nonce. Since the Flink-generated manifest file location is unique, it guarantees the nonce is unique.
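A minimal sketch of that hashing (assuming Guava's Hashing utilities; the internal implementation may differ in detail):

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;

// Sketch: derive a stable nonce from the manifest file location. Because the
// generated location is unique per manifest, the hash is effectively unique.
final class ManifestNonce {
    static long of(String manifestFileLocation) {
        return Hashing.murmur3_128()
                .hashString(manifestFileLocation, StandardCharsets.UTF_8)
                .asLong();
    }
}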

IMO, checkpointId is also one way of implementing Nonce based on today's Flink behavior.

> and will not change for repeated invocations of the GlobalCommitter with the same set of committables

If the same set of committables is combined into one GlobalCommT (like the ManifestFile in Iceberg), then the nonce could be part of the GlobalCommT interface.

BTW, as Dawid pointed out, the ManifestFile optimization is only in our internal implementation [2] right now. For the open source version, there is a GitHub issue [3] to track follow-up improvements.

Thanks,
Steven



Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-143: Unified Sink API

Guowei Ma
## Concurrent checkpoints
AFAIK the committer would not see file-1-2 when ck-1 happens in exactly-once mode.

## Committable bookkeeping and combining

I agree with you that the "CombineGlobalCommitter" would work. But it puts more optimization logic into the committer, which will make the committer more and more complicated until it eventually becomes the same as the Writer. For example, the committer would need to clean up unused manifest files when restoring from a failure if we introduce these optimizations into the committer.

In this case another alternative might be to put this "merging" optimization into a separate agg operator (maybe just like another `Writer`?). The agg could produce an aggregated committable for the committer, and the agg operator could manage the whole lifecycle of the manifest file it creates. That would leave the committer with a single responsibility.
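To sketch the idea (placeholder types again; the point is only that the agg stage, not the committer, owns the manifest's lifecycle):

import java.util.List;

// Sketch only: a separate "Agg" stage between the data-file writers and the
// committer. All type names are placeholders for illustration.
interface DataFile {}
interface ManifestFile {}
interface ManifestIo {
    ManifestFile write(List<DataFile> files); // persist a manifest for these files
    void delete(ManifestFile manifest);       // remove a manifest that was never committed
}

class FileAggregator {

    private final ManifestIo io;

    FileAggregator(ManifestIo io) {
        this.io = io;
    }

    // Per checkpoint: fold the writers' data files into one aggregated
    // committable, so the committer only ever has to commit a single object.
    ManifestFile aggregate(List<DataFile> filesForCheckpoint) {
        return io.write(filesForCheckpoint);
    }

    // On restore: clean up manifests this stage created that never made it
    // into a successful commit, keeping the committer free of cleanup logic.
    void discardUncommitted(List<ManifestFile> uncommitted) {
        uncommitted.forEach(io::delete);
    }
}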

>> The main question is whether this pattern is generic enough to be put into the sink framework or not.
Maybe I am wrong, but what I can feel from the current discussion is that different requirements lead to different topological requirements.

## Using checkpointId
In the batch execution mode there would be no normal checkpoints any more. That is why we do not introduce the checkpoint id in the API. So it is a great thing that the sink decouples its implementation from the checkpoint id. :)

Best,
Guowei


Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-143: Unified Sink API

Guowei Ma
Hi Aljoscha,

>I don't understand why we need the "Drain and Snapshot" section. It
>seems to be some details about stop-with-savepoint and drain, and the
>relation to BATCH execution but I don't know if it is needed to
>understand the rest of the document. I'm happy to be wrong here, though,
>if there's good reasons for the section.

The new unified sink API should provide a way for the sink developer to deal with EOI (drain) in order to guarantee exactly-once semantics. That is mainly what I want to say in this section. The current streaming-style sink API does not provide a good way to deal with it, which is why the `StreamingFileSink` does not commit the last part of the data in the bounded scenario. Our theme is unification. I am afraid users might misunderstand that adding this requirement to the new sink API is only for bounded scenarios, so I explained in this paragraph that stop-with-savepoint might also have a similar requirement.

For the snapshot, I also want to prevent users from misunderstanding that it is prepared specifically for the unbounded scenario. Actually it might also be possible with bounded input + batch execution mode in the future.

I could reorganize the section if it confuses readers, but I think we need to keep the drain part at least. WDYT?

>On the question of Alternative 1 and 2, I have a strong preference for
>Alternative 1 because we could avoid strong coupling to other modules.
>With Alternative 2 we would depend on `flink-streaming-java` and even
>`flink-runtime`. For the new source API (FLIP-27) we managed to keep the
>dependencies slim and the code is in flink-core. I'd be very happy if we
>can manage the same for the new sink API.

I am open to alternative 1. Maybe I am missing something, but I do not get why the second alternative would depend on `flink-runtime` or `flink-streaming-java`. All the state APIs are currently in flink-core. Could you give some further explanation? Thanks :)

Best,
Guowei


On Tue, Sep 15, 2020 at 12:05 PM Guowei Ma <[hidden email]> wrote:
## Concurrent checkpoints
AFAIK the committer would not see the file-1-2 when ck1 happens in the ExactlyOnce mode.

## Committable bookkeeping and combining

I agree with you that the "CombineGlobalCommitter" would work. But we put more optimization logic in the committer, which will make the committer more and more complicated, and eventually become the same as the Writer. For example, The committer needs to clean up some unused manifest file when restoring from a failure if we introduce the optimizations to the committer. 

In this case another alternative might be to put this "merging" optimization to a separate agg operator(maybe just like another `Writer`?). The agg could produce an aggregated committable to the committer. The agg operator could manage the whole life cycle of the manifest file it created. It would make the committer have single responsibility.

>>The main question is if this pattern is generic to be put into the sink framework or not.
Maybe I am wrong. But what I can feel from the current discussion is that different requirements have different topological requirements. 

## Using checkpointId
In the batch execution mode there would be no normal checkpoint any more. That is why we do not introduce the checkpoint id in the API. So it is a great thing that sink decouples its implementation from checkpointid. :)

Best,
Guowei


On Tue, Sep 15, 2020 at 7:33 AM Steven Wu <[hidden email]> wrote:

## concurrent checkpoints

[hidden email] regarding the concurrent checkpoints, let me illustrate with a simple DAG below. 


Let's assume each writer emits one file per checkpoint cycle and writer-2 is slow. Now let's look at what the global committer receives 

timeline:            ----------------------------------------------------------> Now
from Writer-1:      file-1-1, ck-1, file-1-2, ck-2
from Writer-2:                                                      file-2-1, ck-1

In this case, the committer shouldn't include "file-1-2" into the commit for ck-1.

## Committable bookkeeping and combining

I like David's proposal where the framework takes care of the bookkeeping of committables and provides a combiner API (CommT -> GlobalCommT) for GlobalCommitter. The only requirement is to tie the commit/CommT/GlobalCommT to a checkpoint. 

When a commit is successful for checkpoint-N, the framework needs to remove the GlobalCommT from the state corresponding to checkpoints <= N. If a commit fails, the GlobalCommT accumulates and will be included in the next cycle. That is how the Iceberg sink works. I think it is good to piggyback retries with Flink's periodic checkpoints for Iceberg sink. Otherwise, it can get complicated to implement retry logic that won't interfere with Flink checkpoints.

The main question is if this pattern is generic to be put into the sink framework or not.

> A alternative topology option for the IcebergSink might be : DataFileWriter
-> Agg -> GlobalCommitter. One pro of this method is that we can let Agg
take care of the cleanup instead of coupling the cleanup logic to the
committer.

[hidden email] I would favor David's suggestion of "combine" API rather than a separate "Agg" operator.

## Using checkpointId

> I think this can have some problems, for example when checkpoint ids are
not strictly sequential, when we wrap around, or when the JobID changes.
This will happen when doing a stop/start-from-savepoint cycle, for example.

checkpointId can work if it is monotonically increasing, which I believe is the case for Flink today. Restoring from checkpoint or savepoint will resume the checkpointIds. 

We can deal with JobID change by saving it into the state and Iceberg snapshot metadata. There is already a PR [1] for that.

## Nonce

> Flink provide a nonce to the GlobalCommitter where Flink guarantees that this nonce is unique 

That is actually how we implemented internally. Flink Iceberg sink basically hashes the Manifest file location as the nonce. Since the Flink generated Manifest file location is unique, it  guarantees the nonce is unique. 

IMO, checkpointId is also one way of implementing Nonce based on today's Flink behavior.

> and will not change for repeated invocations of the GlobalCommitter with the same set of committables

 if the same set of committables are combined into one GlobalCommT (like ManifestFile in Iceberg), then the Nonce could be part of the GlobalCommT interface. 

BTW, as David pointed out, the ManifestFile optimization is only in our internal implementation [2] right now. For the open source version, there is a github issue [3] to track follow-up improvements.

Thanks,
Steven



On Mon, Sep 14, 2020 at 12:03 PM Guowei Ma <[hidden email]> wrote:
Hi all,


Very thanks for the discussion and the valuable opinions! Currently there
are several ongoing issues and we would like to show what we are thinking
in the next few mails.

It seems that the biggest issue now is the topology of the sinks. Before
deciding what the sink API would look like, I would like to first summarize
the different topologies we have mentioned so that we are on the same page
and can gain more insight into this issue. There are four types of topology
I can see. Please correct me if I misunderstand what you mean:

   1. Commit individual files (StreamingFileSink)

      FileWriter -> FileCommitter

   2. Commit a directory (HiveSink)

      FileWriter -> FileCommitter -> GlobalCommitter

   3. Commit a bundle of files (Iceberg)

      DataFileWriter -> GlobalCommitter

   4. Commit a directory with merged files (some users want to merge the files
      in a directory before committing the directory to the Hive metastore)

      FileWriter -> SingleFileCommit -> FileMergeWriter -> GlobalCommitter


As can be seen from the above, the topologies differ according to the
requirements. Moreover, there may be other options for the second and third
categories, e.g.:

An alternative topology option for the IcebergSink might be: DataFileWriter
-> Agg -> GlobalCommitter. One pro of this method is that we can let Agg
take care of the cleanup instead of coupling the cleanup logic to the
committer.
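
As a sketch of what the "Agg" step could do in that alternative (the types below are illustrative placeholders, not Iceberg's classes):

import java.util.List;

// Illustrative placeholder types.
class DataFile { final String path; DataFile(String path) { this.path = path; } }
class ManifestFile { final List<DataFile> files; ManifestFile(List<DataFile> files) { this.files = files; } }

// Sketch: "Agg" bundles the data files of one checkpoint into a single
// manifest, so the GlobalCommitter only receives one small committable and
// the cleanup of individual data files can live in Agg rather than in the
// committer.
class DataFileAggregator {
    ManifestFile aggregate(List<DataFile> dataFilesOfCheckpoint) {
        return new ManifestFile(dataFilesOfCheckpoint);
    }
}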


In the long run I think we might provide the sink developer with the ability
to build arbitrary topologies. Maybe Flink could provide only a basic commit
transformation and let the user build the other parts of the topology. In
1.12 we might first provide different patterns for these different
scenarios, and I think these components could be reused in the future.

Best,
Guowei


On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz <[hidden email]>
wrote:

> Hi all,
>
> > I would think that we only need flush() and the semantics are that it
> > prepares for a commit, so on a physical level it would be called from
> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> > think flush() should be renamed to something like "prepareCommit()".
>
> Generally speaking it is a good point that emitting the committables
> should happen before emitting the checkpoint barrier downstream.
> However, if I remember offline discussions well, the idea behind
> Writer#flush and Writer#snapshotState was to differentiate commit on
> checkpoint vs final checkpoint at the end of the job. Both of these
> methods could emit committables, but the flush should not leave any in
> progress state (e.g. in case of file sink in STREAM mode, in
> snapshotState it could leave some open files that would be committed in
> a subsequent cycle, however flush should close all files). The
> snapshotState as it is now can not be called in
> prepareSnapshotPreBarrier as it can store some state, which should
> happen in Operator#snapshotState as otherwise it would always be
> synchronous. Therefore I think we would need sth like:
>
> void prepareCommit(boolean flush, WriterOutput<CommT> output);
>
> ver 1:
>
> List<StateT> snapshotState();
>
> ver 2:
>
> void snapshotState(); // not sure if we need that method at all in option 2
>
> > The Committer is as described in the FLIP, it's basically a function
> > "void commit(Committable)". The GobalCommitter would be a function "void
> > commit(List<Committable>)". The former would be used by an S3 sink where
> > we can individually commit files to S3, a committable would be the list
> > of part uploads that will form the final file and the commit operation
> > creates the metadata in S3. The latter would be used by something like
> > Iceberg where the Committer needs a global view of all the commits to be
> > efficient and not overwhelm the system.
> >
> > I don't know yet if sinks would only implement on type of commit
> > function or potentially both at the same time, and maybe Commit can
> > return some CommitResult that gets shipped to the GlobalCommit function.
> I must admit it I did not get the need for Local/Normal + Global
> committer at first. The Iceberg example helped a lot. I think it makes a
> lot of sense.
>
> > For Iceberg, writers don't need any state. But the GlobalCommitter
> > needs to
> > checkpoint StateT. For the committer, CommT is "DataFile". Since a single
> > committer can collect thousands (or more) data files in one checkpoint
> > cycle, as an optimization we checkpoint a single "ManifestFile" (for the
> > collected thousands data files) as StateT. This allows us to absorb
> > extended commit outages without losing written/uploaded data files, as
> > operator state size is as small as one manifest file per checkpoint cycle
> > [2].
> > ------------------
> > StateT snapshotState(SnapshotContext context) throws Exception;
> >
> > That means we also need the restoreCommitter API in the Sink interface
> > ---------------
> > Committer<CommT, StateT> restoreCommitter(InitContext context, StateT
> > state);
> I think this might be a valid case. Not sure though if I would go with a
> "state" there. Having a state in a committer would imply we need a
> collect method as well. So far we needed a single method commit(...) and
> the bookkeeping of the committables could be handled by the framework. I
> think something like an optional combiner in the GlobalCommitter would
> be enough. What do you think?
>
> GlobalCommitter<CommT, GlobalCommT> {
>
>     void commit(GlobalCommT globalCommittables);
>
>     GlobalCommT combine(List<CommT> committables);
>
> }
>
> A different problem that I see here is how do we handle commit failures.
> Should the committables (both normal and global be included in the next
> cycle, shall we retry it, ...) I think it would be worth laying it out
> in the FLIP.
>
> @Aljoscha I think you can find the code Steven was referring in here:
>
> https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
>
> Best,
>
> Dawid
>
> On 14/09/2020 15:19, Aljoscha Krettek wrote:
> > On 14.09.20 01:23, Steven Wu wrote:
> >> ## Writer interface
> >>
> >> For the Writer interface, should we add "*prepareSnapshot"* before the
> >> checkpoint barrier emitted downstream?  IcebergWriter would need it. Or
> >> would the framework call "*flush*" before the barrier emitted
> >> downstream?
> >> that guarantee would achieve the same goal.
> >
> > I would think that we only need flush() and the semantics are that it
> > prepares for a commit, so on a physical level it would be called from
> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> > think flush() should be renamed to something like "prepareCommit()".
> >
> > @Guowei, what do you think about this?
> >
> >> In [1], we discussed the reason for Writer to emit (checkpointId, CommT)
> >> tuple to the committer. The committer needs checkpointId to separate out
> >> data files for different checkpoints if concurrent checkpoints are
> >> enabled.
> >
> > When can this happen? Even with concurrent checkpoints the snapshot
> > barriers would still cleanly segregate the input stream of an operator
> > into tranches that should manifest in only one checkpoint. With
> > concurrent checkpoints, all that can happen is that we start a
> > checkpoint before a last one is confirmed completed.
> >
> > Unless there is some weirdness in the sources and some sources start
> > chk1 first and some other ones start chk2 first?
> >
> > @Piotrek, do you think this is a problem?
> >
> >> For the Committer interface, I am wondering if we should split the
> >> single
> >> commit method into separate "*collect"* and "*commit"* methods? This
> >> way,
> >> it can handle both single and multiple CommT objects.
> >
> > I think we can't do this. If the sink only needs a regular Commiter,
> > we can perform the commits in parallel, possibly on different
> > machines. Only when the sink needs a GlobalCommitter would we need to
> > ship all commits to a single process and perform the commit there. If
> > both methods were unified in one interface we couldn't make the
> > decision of were to commit in the framework code.
> >
> >> For Iceberg, writers don't need any state. But the GlobalCommitter
> >> needs to
> >> checkpoint StateT. For the committer, CommT is "DataFile". Since a
> >> single
> >> committer can collect thousands (or more) data files in one checkpoint
> >> cycle, as an optimization we checkpoint a single "ManifestFile" (for the
> >> collected thousands data files) as StateT. This allows us to absorb
> >> extended commit outages without losing written/uploaded data files, as
> >> operator state size is as small as one manifest file per checkpoint
> >> cycle
> >
> > You could have a point here. Is the code for this available in
> > open-source? I was checking out
> >
> https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
> > and didn't find the ManifestFile optimization there.
> >
> > Best,
> > Aljoscha
> >
>
>

Re: [DISCUSS] FLIP-143: Unified Sink API

Guowei Ma
In reply to this post by dwysakowicz
>> I would think that we only need flush() and the semantics are that it
>> prepares for a commit, so on a physical level it would be called from
>> "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
>> think flush() should be renamed to something like "prepareCommit()".

> Generally speaking it is a good point that emitting the committables
> should happen before emitting the checkpoint barrier downstream.
> However, if I remember offline discussions well, the idea behind
> Writer#flush and Writer#snapshotState was to differentiate commit on
> checkpoint vs final checkpoint at the end of the job. Both of these
> methods could emit committables, but the flush should not leave any in
> progress state (e.g. in case of file sink in STREAM mode, in
> snapshotState it could leave some open files that would be committed in
> a subsequent cycle, however flush should close all files). The
> snapshotState as it is now can not be called in
> prepareSnapshotPreBarrier as it can store some state, which should
> happen in Operator#snapshotState as otherwise it would always be
> synchronous. Therefore I think we would need sth like:

> void prepareCommit(boolean flush, WriterOutput<CommT> output);

> ver 1:

> List<StateT> snapshotState();

> ver 2:

> void snapshotState(); // not sure if we need that method at all in option 2

I second Dawid's proposal. This is a valid scenario. And version 2 does not
need the snapshotState() any more.
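
For reference, a minimal sketch of what the Writer could look like under ver 2 (WriterOutput is only assumed here to be a simple collector; this is not the final FLIP API):

// Assumed shape of the output handed to the writer; sketched only for completeness.
interface WriterOutput<CommT> {
    void sendToCommit(CommT committable);
}

// Sketch of a Writer following "ver 2": committables are emitted from
// prepareCommit(), and no separate snapshotState() is needed.
interface Writer<IN, CommT> {

    void write(IN element);

    // flush == true on the final checkpoint at the end of the job: close all
    // in-progress work; flush == false on a regular checkpoint: in-progress
    // state (e.g. open files) may be carried over to the next cycle.
    void prepareCommit(boolean flush, WriterOutput<CommT> output);
}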

>> The Committer is as described in the FLIP, it's basically a function
>> "void commit(Committable)". The GobalCommitter would be a function "void
>> commit(List<Committable>)". The former would be used by an S3 sink where
>> we can individually commit files to S3, a committable would be the list
>> of part uploads that will form the final file and the commit operation
>> creates the metadata in S3. The latter would be used by something like
>> Iceberg where the Committer needs a global view of all the commits to be
>> efficient and not overwhelm the system.
>>
>> I don't know yet if sinks would only implement on type of commit
>> function or potentially both at the same time, and maybe Commit can
>> return some CommitResult that gets shipped to the GlobalCommit function.
>> I must admit it I did not get the need for Local/Normal + Global
>> committer at first. The Iceberg example helped a lot. I think it makes a
>> lot of sense.

@Dawid
What I understand is that HiveSink's implementation might need the local
committer (FileCommitter) because the file rename is needed.
But the Iceberg sink only needs to write the manifest file. Would you like to
enlighten me on why Iceberg needs the local committer?
Thanks

Best,
Guowei


On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz <[hidden email]>
wrote:

> Hi all,
>
> > I would think that we only need flush() and the semantics are that it
> > prepares for a commit, so on a physical level it would be called from
> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> > think flush() should be renamed to something like "prepareCommit()".
>
> Generally speaking it is a good point that emitting the committables
> should happen before emitting the checkpoint barrier downstream.
> However, if I remember offline discussions well, the idea behind
> Writer#flush and Writer#snapshotState was to differentiate commit on
> checkpoint vs final checkpoint at the end of the job. Both of these
> methods could emit committables, but the flush should not leave any in
> progress state (e.g. in case of file sink in STREAM mode, in
> snapshotState it could leave some open files that would be committed in
> a subsequent cycle, however flush should close all files). The
> snapshotState as it is now can not be called in
> prepareSnapshotPreBarrier as it can store some state, which should
> happen in Operator#snapshotState as otherwise it would always be
> synchronous. Therefore I think we would need sth like:
>
> void prepareCommit(boolean flush, WriterOutput<CommT> output);
>
> ver 1:
>
> List<StateT> snapshotState();
>
> ver 2:
>
> void snapshotState(); // not sure if we need that method at all in option 2
>
> > The Committer is as described in the FLIP, it's basically a function
> > "void commit(Committable)". The GobalCommitter would be a function "void
> > commit(List<Committable>)". The former would be used by an S3 sink where
> > we can individually commit files to S3, a committable would be the list
> > of part uploads that will form the final file and the commit operation
> > creates the metadata in S3. The latter would be used by something like
> > Iceberg where the Committer needs a global view of all the commits to be
> > efficient and not overwhelm the system.
> >
> > I don't know yet if sinks would only implement on type of commit
> > function or potentially both at the same time, and maybe Commit can
> > return some CommitResult that gets shipped to the GlobalCommit function.
> I must admit it I did not get the need for Local/Normal + Global
> committer at first. The Iceberg example helped a lot. I think it makes a
> lot of sense.
>
> > For Iceberg, writers don't need any state. But the GlobalCommitter
> > needs to
> > checkpoint StateT. For the committer, CommT is "DataFile". Since a single
> > committer can collect thousands (or more) data files in one checkpoint
> > cycle, as an optimization we checkpoint a single "ManifestFile" (for the
> > collected thousands data files) as StateT. This allows us to absorb
> > extended commit outages without losing written/uploaded data files, as
> > operator state size is as small as one manifest file per checkpoint cycle
> > [2].
> > ------------------
> > StateT snapshotState(SnapshotContext context) throws Exception;
> >
> > That means we also need the restoreCommitter API in the Sink interface
> > ---------------
> > Committer<CommT, StateT> restoreCommitter(InitContext context, StateT
> > state);
> I think this might be a valid case. Not sure though if I would go with a
> "state" there. Having a state in a committer would imply we need a
> collect method as well. So far we needed a single method commit(...) and
> the bookkeeping of the committables could be handled by the framework. I
> think something like an optional combiner in the GlobalCommitter would
> be enough. What do you think?
>
> GlobalCommitter<CommT, GlobalCommT> {
>
>     void commit(GlobalCommT globalCommittables);
>
>     GlobalCommT combine(List<CommT> committables);
>
> }
>
> A different problem that I see here is how do we handle commit failures.
> Should the committables (both normal and global be included in the next
> cycle, shall we retry it, ...) I think it would be worth laying it out
> in the FLIP.
>
> @Aljoscha I think you can find the code Steven was referring in here:
>
> https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
>
> Best,
>
> Dawid
>
> On 14/09/2020 15:19, Aljoscha Krettek wrote:
> > On 14.09.20 01:23, Steven Wu wrote:
> >> ## Writer interface
> >>
> >> For the Writer interface, should we add "*prepareSnapshot"* before the
> >> checkpoint barrier emitted downstream?  IcebergWriter would need it. Or
> >> would the framework call "*flush*" before the barrier emitted
> >> downstream?
> >> that guarantee would achieve the same goal.
> >
> > I would think that we only need flush() and the semantics are that it
> > prepares for a commit, so on a physical level it would be called from
> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> > think flush() should be renamed to something like "prepareCommit()".
> >
> > @Guowei, what do you think about this?
> >
> >> In [1], we discussed the reason for Writer to emit (checkpointId, CommT)
> >> tuple to the committer. The committer needs checkpointId to separate out
> >> data files for different checkpoints if concurrent checkpoints are
> >> enabled.
> >
> > When can this happen? Even with concurrent checkpoints the snapshot
> > barriers would still cleanly segregate the input stream of an operator
> > into tranches that should manifest in only one checkpoint. With
> > concurrent checkpoints, all that can happen is that we start a
> > checkpoint before a last one is confirmed completed.
> >
> > Unless there is some weirdness in the sources and some sources start
> > chk1 first and some other ones start chk2 first?
> >
> > @Piotrek, do you think this is a problem?
> >
> >> For the Committer interface, I am wondering if we should split the
> >> single
> >> commit method into separate "*collect"* and "*commit"* methods? This
> >> way,
> >> it can handle both single and multiple CommT objects.
> >
> > I think we can't do this. If the sink only needs a regular Commiter,
> > we can perform the commits in parallel, possibly on different
> > machines. Only when the sink needs a GlobalCommitter would we need to
> > ship all commits to a single process and perform the commit there. If
> > both methods were unified in one interface we couldn't make the
> > decision of were to commit in the framework code.
> >
> >> For Iceberg, writers don't need any state. But the GlobalCommitter
> >> needs to
> >> checkpoint StateT. For the committer, CommT is "DataFile". Since a
> >> single
> >> committer can collect thousands (or more) data files in one checkpoint
> >> cycle, as an optimization we checkpoint a single "ManifestFile" (for the
> >> collected thousands data files) as StateT. This allows us to absorb
> >> extended commit outages without losing written/uploaded data files, as
> >> operator state size is as small as one manifest file per checkpoint
> >> cycle
> >
> > You could have a point here. Is the code for this available in
> > open-source? I was checking out
> >
> https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
> > and didn't find the ManifestFile optimization there.
> >
> > Best,
> > Aljoscha
> >
>
>

Re: [DISCUSS] FLIP-143: Unified Sink API

Aljoscha Krettek-2
In reply to this post by Steven Wu
On 15.09.20 01:33, Steven Wu wrote:
> ## concurrent checkpoints
>
> @Aljoscha Krettek <[hidden email]> regarding the concurrent
> checkpoints, let me illustrate with a simple DAG below.
> [image: image.png]

Hi Steven,

images don't make it through to the mailing lists. You would need to
host the file somewhere and send a link.

Best,
Aljoscha

Re: [DISCUSS] FLIP-143: Unified Sink API

Aljoscha Krettek-2
In reply to this post by Guowei Ma
On 15.09.20 06:05, Guowei Ma wrote:
> ## Using checkpointId
> In the batch execution mode there would be no normal checkpoint any more.
> That is why we do not introduce the checkpoint id in the API. So it is a
> great thing that sink decouples its implementation from checkpointid. :)

Yes, this is a very important point of the design!

On 15.09.20 06:28, Guowei Ma wrote:
> I am open to alternative 1. Maybe I miss something but I do not get why the
> second alternative would depend on `flink-runtime` or
> `flink-streaming-java`. The all the state api currently is in the
> flink-core. Could you give some further explanation?  thanks :)

You're right, yes. It seems I was thinking about other things. I also
prefer alternative 1 because it's more "declarative", i.e. the sink has
a fixed method where it can return something, instead of the framework
giving the sink an interface that it can then use in arbitrary ways.
Alternative 1 leaves more freedom to the framework but limits the sink.
Alternative 2 leaves more freedom to the sink but potentially limits the
framework.
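
For readers following along, a rough sketch of the difference (the exact signatures in the FLIP may differ, and the shape of alternative 2 below is my assumption):

import java.util.List;

// Alternative 1 (declarative): the writer returns its state from a fixed
// method, and the framework decides how and where to store it.
interface WriterAlternative1<StateT> {
    List<StateT> snapshotState();
}

// Alternative 2 (assumed shape): the framework hands the writer a state
// handle that the writer may use in arbitrary ways.
interface SinkStateStore<StateT> {
    void add(StateT state);
    List<StateT> get();
}

interface WriterAlternative2<StateT> {
    void snapshotState(SinkStateStore<StateT> stateStore);
}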

Best,
Aljoscha




Re: [DISCUSS] FLIP-143: Unified Sink API

dwysakowicz
In reply to this post by Guowei Ma

What I understand is that HiveSink's implementation might need the local
committer(FileCommitter) because the file rename is needed.
But the iceberg only needs to write the manifest file.  Would you like to
enlighten me why the Iceberg needs the local committer?
Thanks
Sorry if I caused any confusion here. I am not saying the Iceberg sink needs a local committer. What I had in mind is that prior to the Iceberg example I did not see a need for a "GlobalCommitter" in the streaming case. I thought it was always enough to have the "normal" committer in that case. Now I understand that this differentiation is not really about logical separation. It is not really about the granularity with which we commit, i.e. answering the "WHAT" question. It is really about the performance and the fact that in the end we will have a single "transaction", so it is about answering the question "HOW".

  • Commit a directory with merged files (some users want to merge the files in a directory before committing the directory to the Hive metastore)

    FileWriter -> SingleFileCommit -> FileMergeWriter -> GlobalCommitter

    I still find the merging case the most confusing. I don't necessarily understand why you need the "SingleFileCommit" step in this scenario. The way I understand the "commit" operation is that it makes some data/artifacts visible to the external system, thus it should be immutable from the point of view of a single process. Having an additional step in the same process that works on committed data contradicts those assumptions. I might be missing something though. Could you elaborate on why it can't be something like FileWriter -> FileMergeWriter -> Committer (either global or non-global)? Again it might be just me not getting the example.

    I've just briefly skimmed over the proposed interfaces. I would suggest one
    addition to the Writer interface (as I understand this is the runtime
    interface in this proposal?): add some availability method, to avoid, if
    possible, blocking calls on the sink. We already have similar
    availability methods in the new sources [1] and in various places in the
    network stack [2].
    BTW Let's not forget about Piotr's comment. I think we could add the isAvailable or similar method to the Writer interface in the FLIP.
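
    A possible shape for such an availability method, loosely modeled on the availability pattern used elsewhere in Flink (a sketch only; the names below are not from the FLIP):

    import java.util.concurrent.CompletableFuture;

    // Sketch: a non-blocking availability signal for the Writer, so the runtime
    // can back off instead of blocking inside write().
    interface AvailabilityAwareWriter<IN> {

        // Completes when the writer can accept the next element without blocking.
        CompletableFuture<?> available();

        void write(IN element);
    }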

    Best,

    Dawid

    On 15/09/2020 08:06, Guowei Ma wrote:
    I would think that we only need flush() and the semantics are that it
    prepares for a commit, so on a physical level it would be called from
    "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
    think flush() should be renamed to something like "prepareCommit()".
    
    
          
    Generally speaking it is a good point that emitting the committables
    should happen before emitting the checkpoint barrier downstream.
    However, if I remember offline discussions well, the idea behind
    Writer#flush and Writer#snapshotState was to differentiate commit on
    checkpoint vs final checkpoint at the end of the job. Both of these
    methods could emit committables, but the flush should not leave any in
    progress state (e.g. in case of file sink in STREAM mode, in
    snapshotState it could leave some open files that would be committed in
    a subsequent cycle, however flush should close all files). The
    snapshotState as it is now can not be called in
    prepareSnapshotPreBarrier as it can store some state, which should
    happen in Operator#snapshotState as otherwise it would always be
    synchronous. Therefore I think we would need sth like:
    
    
          
    void prepareCommit(boolean flush, WriterOutput<CommT> output);
    
    
          
    ver 1:
    
    
          
    List<StateT> snapshotState();
    
    
          
    ver 2:
    
    
          
    void snapshotState(); // not sure if we need that method at all in option
    
    2
    
    I second Dawid's proposal. This is a valid scenario. And version2 does not
    need the snapshotState() any more.
    
    
    The Committer is as described in the FLIP, it's basically a function
    "void commit(Committable)". The GobalCommitter would be a function "void
    commit(List<Committable>)". The former would be used by an S3 sink where
    we can individually commit files to S3, a committable would be the list
    of part uploads that will form the final file and the commit operation
    creates the metadata in S3. The latter would be used by something like
    Iceberg where the Committer needs a global view of all the commits to be
    efficient and not overwhelm the system.
    
    I don't know yet if sinks would only implement on type of commit
    function or potentially both at the same time, and maybe Commit can
    return some CommitResult that gets shipped to the GlobalCommit function.
    I must admit it I did not get the need for Local/Normal + Global
    committer at first. The Iceberg example helped a lot. I think it makes a
    lot of sense.
    
    @Dawid
    What I understand is that HiveSink's implementation might need the local
    committer(FileCommitter) because the file rename is needed.
    But the iceberg only needs to write the manifest file.  Would you like to
    enlighten me why the Iceberg needs the local committer?
    Thanks
    
    Best,
    Guowei
    
    
    On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz [hidden email]
    wrote:
    
    
    Hi all,
    
    
    I would think that we only need flush() and the semantics are that it
    prepares for a commit, so on a physical level it would be called from
    "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
    think flush() should be renamed to something like "prepareCommit()".
    
    Generally speaking it is a good point that emitting the committables
    should happen before emitting the checkpoint barrier downstream.
    However, if I remember offline discussions well, the idea behind
    Writer#flush and Writer#snapshotState was to differentiate commit on
    checkpoint vs final checkpoint at the end of the job. Both of these
    methods could emit committables, but the flush should not leave any in
    progress state (e.g. in case of file sink in STREAM mode, in
    snapshotState it could leave some open files that would be committed in
    a subsequent cycle, however flush should close all files). The
    snapshotState as it is now can not be called in
    prepareSnapshotPreBarrier as it can store some state, which should
    happen in Operator#snapshotState as otherwise it would always be
    synchronous. Therefore I think we would need sth like:
    
    void prepareCommit(boolean flush, WriterOutput<CommT> output);
    
    ver 1:
    
    List<StateT> snapshotState();
    
    ver 2:
    
    void snapshotState(); // not sure if we need that method at all in option 2
    
    
    The Committer is as described in the FLIP, it's basically a function
    "void commit(Committable)". The GobalCommitter would be a function "void
    commit(List<Committable>)". The former would be used by an S3 sink where
    we can individually commit files to S3, a committable would be the list
    of part uploads that will form the final file and the commit operation
    creates the metadata in S3. The latter would be used by something like
    Iceberg where the Committer needs a global view of all the commits to be
    efficient and not overwhelm the system.
    
    I don't know yet if sinks would only implement on type of commit
    function or potentially both at the same time, and maybe Commit can
    return some CommitResult that gets shipped to the GlobalCommit function.
    
    I must admit it I did not get the need for Local/Normal + Global
    committer at first. The Iceberg example helped a lot. I think it makes a
    lot of sense.
    
    
    For Iceberg, writers don't need any state. But the GlobalCommitter
    needs to
    checkpoint StateT. For the committer, CommT is "DataFile". Since a single
    committer can collect thousands (or more) data files in one checkpoint
    cycle, as an optimization we checkpoint a single "ManifestFile" (for the
    collected thousands data files) as StateT. This allows us to absorb
    extended commit outages without losing written/uploaded data files, as
    operator state size is as small as one manifest file per checkpoint cycle
    [2].
    ------------------
    StateT snapshotState(SnapshotContext context) throws Exception;
    
    That means we also need the restoreCommitter API in the Sink interface
    ---------------
    Committer<CommT, StateT> restoreCommitter(InitContext context, StateT
    state);
    
    I think this might be a valid case. Not sure though if I would go with a
    "state" there. Having a state in a committer would imply we need a
    collect method as well. So far we needed a single method commit(...) and
    the bookkeeping of the committables could be handled by the framework. I
    think something like an optional combiner in the GlobalCommitter would
    be enough. What do you think?
    
    GlobalCommitter<CommT, GlobalCommT> {
    
        void commit(GlobalCommT globalCommittables);
    
        GlobalCommT combine(List<CommT> committables);
    
    }
    
    A different problem that I see here is how do we handle commit failures.
    Should the committables (both normal and global be included in the next
    cycle, shall we retry it, ...) I think it would be worth laying it out
    in the FLIP.
    
    @Aljoscha I think you can find the code Steven was referring in here:
    
    https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
    
    Best,
    
    Dawid
    
    On 14/09/2020 15:19, Aljoscha Krettek wrote:
    
    On 14.09.20 01:23, Steven Wu wrote:
    
    ## Writer interface
    
    For the Writer interface, should we add "*prepareSnapshot"* before the
    checkpoint barrier emitted downstream?  IcebergWriter would need it. Or
    would the framework call "*flush*" before the barrier emitted
    downstream?
    that guarantee would achieve the same goal.
    
    I would think that we only need flush() and the semantics are that it
    prepares for a commit, so on a physical level it would be called from
    "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
    think flush() should be renamed to something like "prepareCommit()".
    
    @Guowei, what do you think about this?
    
    
    In [1], we discussed the reason for Writer to emit (checkpointId, CommT)
    tuple to the committer. The committer needs checkpointId to separate out
    data files for different checkpoints if concurrent checkpoints are
    enabled.
    
    When can this happen? Even with concurrent checkpoints the snapshot
    barriers would still cleanly segregate the input stream of an operator
    into tranches that should manifest in only one checkpoint. With
    concurrent checkpoints, all that can happen is that we start a
    checkpoint before a last one is confirmed completed.
    
    Unless there is some weirdness in the sources and some sources start
    chk1 first and some other ones start chk2 first?
    
    @Piotrek, do you think this is a problem?
    
    
    For the Committer interface, I am wondering if we should split the
    single
    commit method into separate "*collect"* and "*commit"* methods? This
    way,
    it can handle both single and multiple CommT objects.
    
    I think we can't do this. If the sink only needs a regular Commiter,
    we can perform the commits in parallel, possibly on different
    machines. Only when the sink needs a GlobalCommitter would we need to
    ship all commits to a single process and perform the commit there. If
    both methods were unified in one interface we couldn't make the
    decision of were to commit in the framework code.
    
    
    For Iceberg, writers don't need any state. But the GlobalCommitter
    needs to
    checkpoint StateT. For the committer, CommT is "DataFile". Since a
    single
    committer can collect thousands (or more) data files in one checkpoint
    cycle, as an optimization we checkpoint a single "ManifestFile" (for the
    collected thousands data files) as StateT. This allows us to absorb
    extended commit outages without losing written/uploaded data files, as
    operator state size is as small as one manifest file per checkpoint
    cycle
    
    You could have a point here. Is the code for this available in
    open-source? I was checking out
    
    
    https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
    
    and didn't find the ManifestFile optimization there.
    
    Best,
    Aljoscha
    
    
    
    
    
        


    Re: [DISCUSS] FLIP-143: Unified Sink API

    Aljoscha Krettek-2
    On 15.09.20 09:55, Dawid Wysakowicz wrote:
    > BTW Let's not forget about Piotr's comment. I think we could add the
    > isAvailable or similar method to the Writer interface in the FLIP.

    I'm not so sure about this; the sinks I'm aware of would not be able to
    implement this method: Kafka doesn't have this, I didn't see it in the
    Iceberg interfaces, and HDFS/S3 also don't have it.

    I can see why from a systems perspective it would be useful to have, of
    course.

    Best,
    Aljoscha

    Re: [DISCUSS] FLIP-143: Unified Sink API

    Guowei Ma
    In reply to this post by dwysakowicz
    Hi, Dawid

    >> I still find the merging case the most confusing. I don't necessarily
    >> understand why do you need the "SingleFileCommit" step in this scenario.
    >> The way I understand "commit" operation is that it makes some data/artifacts
    >> visible to the external system, thus it should be immutable from a point of
    >> view of a single process. Having an additional step in the same process
    >> that works on committed data contradicts with those assumptions. I might be
    >> missing something though. Could you elaborate why can't it be something
    >> like FileWriter -> FileMergeWriter -> Committer (either global or
    >> non-global)? Again it might be just me not getting the example.

    I think you are right. The topology
    "FileWriter -> FileMergeWriter -> Committer" could meet the merge requirement.
    The topology "FileWriter -> SingleFileCommitter -> FileMergeWriter ->
    GlobalCommitter" reuses some code of the StreamingFileSink (for example, the
    rolling policy), so it has the "SingleFileCommitter" in the topology. In
    general I want to use this case to show that there are different topologies
    according to the requirements.

    BTW: IIRC, @Jingsong Lee <[hidden email]> told me that the actual
    topology of the merge-supporting HiveSink is more complicated than that.


    >> I've just briefly skimmed over the proposed interfaces. I would suggest
    one
    >> addition to the Writer interface (as I understand this is the runtime
    >> interface in this proposal?): add some availability method, to avoid, if
    >> possible, blocking calls on the sink. We already have similar
    >> availability methods in the new sources [1] and in various places in the
    >> network stack [2].
    >> BTW Let's not forget about Piotr's comment. I think we could add the
    isAvailable or similar method to the Writer interface in the FLIP.

    Thanks @Dawid Wysakowicz <[hidden email]> for your reminder. There
    are too many issues at the same time.

    In addition to what Aljoscha said: there is very little system support for
    it. Another thing I worry about is: does the sink's snapshot return
    immediately when the sink's status is unavailable? Maybe we could do it by
    deduplicating some elements in the state, but I think it might be too
    complicated. What I want to know is which specific sinks would benefit from
    this feature. @piotr <[hidden email]> Please correct me if I
    misunderstand you. Thanks.

    Best,
    Guowei


    On Tue, Sep 15, 2020 at 3:55 PM Dawid Wysakowicz <[hidden email]>
    wrote:

    > What I understand is that HiveSink's implementation might need the local
    > committer(FileCommitter) because the file rename is needed.
    > But the iceberg only needs to write the manifest file.  Would you like to
    > enlighten me why the Iceberg needs the local committer?
    > Thanks
    >
    > Sorry if I caused a confusion here. I am not saying the Iceberg sink needs
    > a local committer. What I had in mind is that prior to the Iceberg example
    > I did not see a need for a "GlobalCommitter" in the streaming case. I
    > thought it is always enough to have the "normal" committer in that case.
    > Now I understand that this differentiation is not really about logical
    > separation. It is not really about the granularity with which we commit,
    > i.e. answering the "WHAT" question. It is really about the performance and
    > that in the end we will have a single "transaction", so it is about
    > answering the question "HOW".
    >
    >
    >    -
    >
    >    Commit a directory with merged files(Some user want to merge the files
    >    in a directory before committing the directory to Hive meta store)
    >
    >
    >    1.
    >
    >    FileWriter -> SingleFileCommit -> FileMergeWriter  -> GlobalCommitter
    >
    > I still find the merging case the most confusing. I don't necessarily
    > understand why do you need the "SingleFileCommit" step in this scenario.
    > The way I understand "commit" operation is that it makes some
    > data/artifacts visible to the external system, thus it should be immutable
    > from a point of view of a single process. Having an additional step in the
    > same process that works on committed data contradicts with those
    > assumptions. I might be missing something though. Could you elaborate why
    > can't it be something like FileWriter -> FileMergeWriter -> Committer
    > (either global or non-global)? Again it might be just me not getting the
    > example.
    >
    > I've just briefly skimmed over the proposed interfaces. I would suggest one
    > addition to the Writer interface (as I understand this is the runtime
    > interface in this proposal?): add some availability method, to avoid, if
    > possible, blocking calls on the sink. We already have similar
    > availability methods in the new sources [1] and in various places in the
    > network stack [2].
    >
    > BTW Let's not forget about Piotr's comment. I think we could add the
    > isAvailable or similar method to the Writer interface in the FLIP.
    >
    > Best,
    >
    > Dawid
    > On 15/09/2020 08:06, Guowei Ma wrote:
    >
    > I would think that we only need flush() and the semantics are that it
    > prepares for a commit, so on a physical level it would be called from
    > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
    > think flush() should be renamed to something like "prepareCommit()".
    >
    > Generally speaking it is a good point that emitting the committables
    > should happen before emitting the checkpoint barrier downstream.
    > However, if I remember offline discussions well, the idea behind
    > Writer#flush and Writer#snapshotState was to differentiate commit on
    > checkpoint vs final checkpoint at the end of the job. Both of these
    > methods could emit committables, but the flush should not leave any in
    > progress state (e.g. in case of file sink in STREAM mode, in
    > snapshotState it could leave some open files that would be committed in
    > a subsequent cycle, however flush should close all files). The
    > snapshotState as it is now can not be called in
    > prepareSnapshotPreBarrier as it can store some state, which should
    > happen in Operator#snapshotState as otherwise it would always be
    > synchronous. Therefore I think we would need sth like:
    >
    > void prepareCommit(boolean flush, WriterOutput<CommT> output);
    >
    > ver 1:
    >
    > List<StateT> snapshotState();
    >
    > ver 2:
    >
    > void snapshotState(); // not sure if we need that method at all in option
    >
    > 2
    >
    > I second Dawid's proposal. This is a valid scenario. And version2 does not
    > need the snapshotState() any more.
    >
    >
    > The Committer is as described in the FLIP, it's basically a function
    > "void commit(Committable)". The GobalCommitter would be a function "void
    > commit(List<Committable>)". The former would be used by an S3 sink where
    > we can individually commit files to S3, a committable would be the list
    > of part uploads that will form the final file and the commit operation
    > creates the metadata in S3. The latter would be used by something like
    > Iceberg where the Committer needs a global view of all the commits to be
    > efficient and not overwhelm the system.
    >
    > I don't know yet if sinks would only implement on type of commit
    > function or potentially both at the same time, and maybe Commit can
    > return some CommitResult that gets shipped to the GlobalCommit function.
    > I must admit it I did not get the need for Local/Normal + Global
    > committer at first. The Iceberg example helped a lot. I think it makes a
    > lot of sense.
    >
    > @Dawid
    > What I understand is that HiveSink's implementation might need the local
    > committer(FileCommitter) because the file rename is needed.
    > But the iceberg only needs to write the manifest file.  Would you like to
    > enlighten me why the Iceberg needs the local committer?
    > Thanks
    >
    > Best,
    > Guowei
    >
    >
    > On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz <[hidden email]> <[hidden email]>
    > wrote:
    >
    >
    > Hi all,
    >
    >
    > I would think that we only need flush() and the semantics are that it
    > prepares for a commit, so on a physical level it would be called from
    > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
    > think flush() should be renamed to something like "prepareCommit()".
    >
    > Generally speaking it is a good point that emitting the committables
    > should happen before emitting the checkpoint barrier downstream.
    > However, if I remember offline discussions well, the idea behind
    > Writer#flush and Writer#snapshotState was to differentiate commit on
    > checkpoint vs final checkpoint at the end of the job. Both of these
    > methods could emit committables, but the flush should not leave any in
    > progress state (e.g. in case of file sink in STREAM mode, in
    > snapshotState it could leave some open files that would be committed in
    > a subsequent cycle, however flush should close all files). The
    > snapshotState as it is now can not be called in
    > prepareSnapshotPreBarrier as it can store some state, which should
    > happen in Operator#snapshotState as otherwise it would always be
    > synchronous. Therefore I think we would need sth like:
    >
    > void prepareCommit(boolean flush, WriterOutput<CommT> output);
    >
    > ver 1:
    >
    > List<StateT> snapshotState();
    >
    > ver 2:
    >
    > void snapshotState(); // not sure if we need that method at all in option 2
    >
    >
    > The Committer is as described in the FLIP, it's basically a function
    > "void commit(Committable)". The GobalCommitter would be a function "void
    > commit(List<Committable>)". The former would be used by an S3 sink where
    > we can individually commit files to S3, a committable would be the list
    > of part uploads that will form the final file and the commit operation
    > creates the metadata in S3. The latter would be used by something like
    > Iceberg where the Committer needs a global view of all the commits to be
    > efficient and not overwhelm the system.
    >
    > I don't know yet if sinks would only implement on type of commit
    > function or potentially both at the same time, and maybe Commit can
    > return some CommitResult that gets shipped to the GlobalCommit function.
    >
    > I must admit it I did not get the need for Local/Normal + Global
    > committer at first. The Iceberg example helped a lot. I think it makes a
    > lot of sense.
    >
    >
    > For Iceberg, writers don't need any state. But the GlobalCommitter
    > needs to
    > checkpoint StateT. For the committer, CommT is "DataFile". Since a single
    > committer can collect thousands (or more) data files in one checkpoint
    > cycle, as an optimization we checkpoint a single "ManifestFile" (for the
    > collected thousands data files) as StateT. This allows us to absorb
    > extended commit outages without losing written/uploaded data files, as
    > operator state size is as small as one manifest file per checkpoint cycle
    > [2].
    > ------------------
    > StateT snapshotState(SnapshotContext context) throws Exception;
    >
    > That means we also need the restoreCommitter API in the Sink interface
    > ---------------
    > Committer<CommT, StateT> restoreCommitter(InitContext context, StateT
    > state);
    >
    > I think this might be a valid case. Not sure though if I would go with a
    > "state" there. Having a state in a committer would imply we need a
    > collect method as well. So far we needed a single method commit(...) and
    > the bookkeeping of the committables could be handled by the framework. I
    > think something like an optional combiner in the GlobalCommitter would
    > be enough. What do you think?
    >
    > GlobalCommitter<CommT, GlobalCommT> {
    >
    >     void commit(GlobalCommT globalCommittables);
    >
    >     GlobalCommT combine(List<CommT> committables);
    >
    > }
    >
    > A different problem that I see here is how do we handle commit failures.
    > Should the committables (both normal and global be included in the next
    > cycle, shall we retry it, ...) I think it would be worth laying it out
    > in the FLIP.
    >
    > @Aljoscha I think you can find the code Steven was referring in here:
    > https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
    >
    > Best,
    >
    > Dawid
    >
    > On 14/09/2020 15:19, Aljoscha Krettek wrote:
    >
    > On 14.09.20 01:23, Steven Wu wrote:
    >
    > ## Writer interface
    >
    > For the Writer interface, should we add "*prepareSnapshot"* before the
    > checkpoint barrier emitted downstream?  IcebergWriter would need it. Or
    > would the framework call "*flush*" before the barrier emitted
    > downstream?
    > that guarantee would achieve the same goal.
    >
    > I would think that we only need flush() and the semantics are that it
    > prepares for a commit, so on a physical level it would be called from
    > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
    > think flush() should be renamed to something like "prepareCommit()".
    >
    > @Guowei, what do you think about this?
    >
    >
    > In [1], we discussed the reason for Writer to emit (checkpointId, CommT)
    > tuple to the committer. The committer needs checkpointId to separate out
    > data files for different checkpoints if concurrent checkpoints are
    > enabled.
    >
    > When can this happen? Even with concurrent checkpoints the snapshot
    > barriers would still cleanly segregate the input stream of an operator
    > into tranches that should manifest in only one checkpoint. With
    > concurrent checkpoints, all that can happen is that we start a
    > checkpoint before a last one is confirmed completed.
    >
    > Unless there is some weirdness in the sources and some sources start
    > chk1 first and some other ones start chk2 first?
    >
    > @Piotrek, do you think this is a problem?
    >
    >
    > For the Committer interface, I am wondering if we should split the
    > single
    > commit method into separate "*collect"* and "*commit"* methods? This
    > way,
    > it can handle both single and multiple CommT objects.
    >
    > I think we can't do this. If the sink only needs a regular Commiter,
    > we can perform the commits in parallel, possibly on different
    > machines. Only when the sink needs a GlobalCommitter would we need to
    > ship all commits to a single process and perform the commit there. If
    > both methods were unified in one interface we couldn't make the
    > decision of were to commit in the framework code.
    >
    >
    > For Iceberg, writers don't need any state. But the GlobalCommitter
    > needs to
    > checkpoint StateT. For the committer, CommT is "DataFile". Since a
    > single
    > committer can collect thousands (or more) data files in one checkpoint
    > cycle, as an optimization we checkpoint a single "ManifestFile" (for the
    > collected thousands data files) as StateT. This allows us to absorb
    > extended commit outages without losing written/uploaded data files, as
    > operator state size is as small as one manifest file per checkpoint
    > cycle
    >
    > You could have a point here. Is the code for this available in
    > open-source? I was checking out
    >
    >
    > https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
    >
    > and didn't find the ManifestFile optimization there.
    >
    > Best,
    > Aljoscha
    >
    >
    >

    Re: [DISCUSS] FLIP-143: Unified Sink API

    Steven Wu
    > images don't make it through to the mailing lists. You would need to host
    the file somewhere and send a link.

    Sorry about that. Here is the sample DAG in Google Drawings:
    https://docs.google.com/drawings/d/1-P8F2jF9RG9HHTtAfWEBRuU_2uV9aDTdqEt5dLs2JPk/edit?usp=sharing


    On Tue, Sep 15, 2020 at 4:58 AM Guowei Ma <[hidden email]> wrote:

    > Hi, Dawid
    >
    > >>I still find the merging case the most confusing. I don't necessarily
    > understand why do you need the "SingleFileCommit" step in this scenario.
    > The way I
    > >> understand "commit" operation is that it makes some data/artifacts
    > visible to the external system, thus it should be immutable from a point of
    > view of a single >>process. Having an additional step in the same process
    > that works on committed data contradicts with those assumptions. I might be
    > missing something though. >> Could you elaborate >why can't it be something
    > like FileWriter -> FileMergeWriter -> Committer (either global or
    > non-global)? Again it might be just me not getting the example.
    >
    > I think you are right. The topology
    > "FileWriter->FileMergeWriter->Committer" could meet the merge requirement.
    > The topology "FileWriter-> SingleFileCommitter -> FileMergeWriter ->
    > GlobalCommitter" reuses some code of the StreamingFileSink(For example
    > rolling policy) so it has the "SingleFileCommitter" in the topology. In
    > general I want to use the case to show that there are different topologies
    > according to the requirements.
    >
    > BTW: IIRC, @Jingsong Lee <[hidden email]> telled me that the
    > actual topology of merged supported HiveSink is more complicated than that.
    >
    >
    > >> I've just briefly skimmed over the proposed interfaces. I would suggest
    > one
    > >> addition to the Writer interface (as I understand this is the runtime
    > >> interface in this proposal?): add some availability method, to avoid, if
    > >> possible, blocking calls on the sink. We already have similar
    > >> availability methods in the new sources [1] and in various places in the
    > >> network stack [2].
    > >> BTW Let's not forget about Piotr's comment. I think we could add the
    > isAvailable or similar method to the Writer interface in the FLIP.
    >
    > Thanks @Dawid Wysakowicz <[hidden email]>  for your reminder.
    > There
    > are two many issues at the same time.
    >
    > In addition to what Ajjoscha said : there is very little system support
    > it.   Another thing I worry about is that: Does the sink's snapshot return
    > immediately when the sink's status is unavailable? Maybe we could do it by
    > dedupe some element in the state but I think it might be too complicated.
    > For me I want to know is what specific sink will benefit from this
    > feature.  @piotr <[hidden email]>  Please correct me if  I
    > misunderstand you. thanks.
    >
    > Best,
    > Guowei
    >
    >
    > On Tue, Sep 15, 2020 at 3:55 PM Dawid Wysakowicz <[hidden email]>
    > wrote:
    >
    > > What I understand is that HiveSink's implementation might need the local
    > > committer(FileCommitter) because the file rename is needed.
    > > But the iceberg only needs to write the manifest file.  Would you like to
    > > enlighten me why the Iceberg needs the local committer?
    > > Thanks
    > >
    > > Sorry if I caused a confusion here. I am not saying the Iceberg sink
    > needs
    > > a local committer. What I had in mind is that prior to the Iceberg
    > example
    > > I did not see a need for a "GlobalCommitter" in the streaming case. I
    > > thought it is always enough to have the "normal" committer in that case.
    > > Now I understand that this differentiation is not really about logical
    > > separation. It is not really about the granularity with which we commit,
    > > i.e. answering the "WHAT" question. It is really about the performance
    > and
    > > that in the end we will have a single "transaction", so it is about
    > > answering the question "HOW".
    > >
    > >
    > >    -
    > >
    > >    Commit a directory with merged files(Some user want to merge the files
    > >    in a directory before committing the directory to Hive meta store)
    > >
    > >
    > >    1.
    > >
    > >    FileWriter -> SingleFileCommit -> FileMergeWriter  -> GlobalCommitter
    > >
    > > I still find the merging case the most confusing. I don't necessarily
    > > understand why do you need the "SingleFileCommit" step in this scenario.
    > > The way I understand "commit" operation is that it makes some
    > > data/artifacts visible to the external system, thus it should be
    > immutable
    > > from a point of view of a single process. Having an additional step in
    > the
    > > same process that works on committed data contradicts with those
    > > assumptions. I might be missing something though. Could you elaborate why
    > > can't it be something like FileWriter -> FileMergeWriter -> Committer
    > > (either global or non-global)? Again it might be just me not getting the
    > > example.
    > >
    > > I've just briefly skimmed over the proposed interfaces. I would suggest
    > one
    > > addition to the Writer interface (as I understand this is the runtime
    > > interface in this proposal?): add some availability method, to avoid, if
    > > possible, blocking calls on the sink. We already have similar
    > > availability methods in the new sources [1] and in various places in the
    > > network stack [2].
    > >
    > > BTW Let's not forget about Piotr's comment. I think we could add the
    > > isAvailable or similar method to the Writer interface in the FLIP.
    > >
    > > Best,
    > >
    > > Dawid
    > > On 15/09/2020 08:06, Guowei Ma wrote:
    > >
    > > I would think that we only need flush() and the semantics are that it
    > > prepares for a commit, so on a physical level it would be called from
    > > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
    > > think flush() should be renamed to something like "prepareCommit()".
    > >
    > > Generally speaking it is a good point that emitting the committables
    > > should happen before emitting the checkpoint barrier downstream.
    > > However, if I remember offline discussions well, the idea behind
    > > Writer#flush and Writer#snapshotState was to differentiate commit on
    > > checkpoint vs final checkpoint at the end of the job. Both of these
    > > methods could emit committables, but the flush should not leave any in
    > > progress state (e.g. in case of file sink in STREAM mode, in
    > > snapshotState it could leave some open files that would be committed in
    > > a subsequent cycle, however flush should close all files). The
    > > snapshotState as it is now cannot be called in
    > > prepareSnapshotPreBarrier, because it may store some state, which should
    > > happen in Operator#snapshotState, as otherwise it would always be
    > > synchronous. Therefore I think we would need something like:
    > >
    > > void prepareCommit(boolean flush, WriterOutput<CommT> output);
    > >
    > > ver 1:
    > >
    > > List<StateT> snapshotState();
    > >
    > > ver 2:
    > >
    > > void snapshotState(); // not sure if we need that method at all in option 2
    > >
    > > I second Dawid's proposal. This is a valid scenario. And version 2 does
    > > not need the snapshotState() any more.
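    > >
    > > To make the difference between the two versions a bit more tangible, a
    > > rough sketch of a version-2 Writer could look like this (prepareCommit and
    > > WriterOutput are taken from the snippet above; the rest, including the
    > > write signature, is just my assumption):
    > >
    > > // ver 2: the state is kept internally / as managed operator state, so
    > > // nothing has to be returned from a snapshotState() method.
    > > interface Writer<InputT, CommT> {
    > >
    > >     void write(InputT element) throws IOException;
    > >
    > >     // flush == true on the final checkpoint / end of input: close all
    > >     // in-progress artifacts and hand every remaining committable to output.
    > >     void prepareCommit(boolean flush, WriterOutput<CommT> output) throws IOException;
    > > }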
    > >
    > >
    > > The Committer is as described in the FLIP; it's basically a function
    > > "void commit(Committable)". The GlobalCommitter would be a function "void
    > > commit(List<Committable>)". The former would be used by an S3 sink where
    > > we can individually commit files to S3; a committable would be the list
    > > of part uploads that will form the final file, and the commit operation
    > > creates the metadata in S3. The latter would be used by something like
    > > Iceberg where the Committer needs a global view of all the commits to be
    > > efficient and not overwhelm the system.
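    > >
    > > Spelled out as interfaces, the distinction would look roughly like this
    > > (only a sketch of the two shapes, not the final FLIP signatures):
    > >
    > > // Regular committer: instances can run in parallel, e.g. one per subtask,
    > > // because each committable (an S3 file, say) can be committed on its own.
    > > interface Committer<CommT> {
    > >     void commit(CommT committable) throws IOException;
    > > }
    > >
    > > // Global committer: all committables of a checkpoint are shipped to a
    > > // single instance, which commits them in one shot (the Iceberg case).
    > > interface GlobalCommitter<CommT> {
    > >     void commit(List<CommT> committables) throws IOException;
    > > }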
    > >
    > > I don't know yet if sinks would only implement one type of commit
    > > function or potentially both at the same time, and maybe Commit can
    > > return some CommitResult that gets shipped to the GlobalCommit function.
    > > I must admit I did not get the need for Local/Normal + Global
    > > committer at first. The Iceberg example helped a lot. I think it makes a
    > > lot of sense.
    > >
    > > @Dawid
    > > What I understand is that HiveSink's implementation might need the local
    > > committer (FileCommitter) because the file rename is needed.
    > > But Iceberg only needs to write the manifest file. Would you like to
    > > enlighten me on why Iceberg needs the local committer?
    > > Thanks
    > >
    > > Best,
    > > Guowei
    > >
    > >
    > > On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz <[hidden email]>
    > > wrote:
    > >
    > >
    > > Hi all,
    > >
    > >
    > > I would think that we only need flush() and the semantics are that it
    > > prepares for a commit, so on a physical level it would be called from
    > > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
    > > think flush() should be renamed to something like "prepareCommit()".
    > >
    > > Generally speaking it is a good point that emitting the committables
    > > should happen before emitting the checkpoint barrier downstream.
    > > However, if I remember offline discussions well, the idea behind
    > > Writer#flush and Writer#snapshotState was to differentiate commit on
    > > checkpoint vs final checkpoint at the end of the job. Both of these
    > > methods could emit committables, but the flush should not leave any in
    > > progress state (e.g. in case of file sink in STREAM mode, in
    > > snapshotState it could leave some open files that would be committed in
    > > a subsequent cycle, however flush should close all files). The
    > > snapshotState as it is now cannot be called in
    > > prepareSnapshotPreBarrier, because it may store some state, which should
    > > happen in Operator#snapshotState, as otherwise it would always be
    > > synchronous. Therefore I think we would need something like:
    > >
    > > void prepareCommit(boolean flush, WriterOutput<CommT> output);
    > >
    > > ver 1:
    > >
    > > List<StateT> snapshotState();
    > >
    > > ver 2:
    > >
    > > void snapshotState(); // not sure if we need that method at all in option 2
    > >
    > >
    > > The Committer is as described in the FLIP; it's basically a function
    > > "void commit(Committable)". The GlobalCommitter would be a function "void
    > > commit(List<Committable>)". The former would be used by an S3 sink where
    > > we can individually commit files to S3; a committable would be the list
    > > of part uploads that will form the final file, and the commit operation
    > > creates the metadata in S3. The latter would be used by something like
    > > Iceberg where the Committer needs a global view of all the commits to be
    > > efficient and not overwhelm the system.
    > >
    > > I don't know yet if sinks would only implement one type of commit
    > > function or potentially both at the same time, and maybe Commit can
    > > return some CommitResult that gets shipped to the GlobalCommit function.
    > >
    > > I must admit I did not get the need for Local/Normal + Global
    > > committer at first. The Iceberg example helped a lot. I think it makes a
    > > lot of sense.
    > >
    > >
    > > For Iceberg, writers don't need any state. But the GlobalCommitter
    > > needs to
    > > checkpoint StateT. For the committer, CommT is "DataFile". Since a single
    > > committer can collect thousands (or more) data files in one checkpoint
    > > cycle, as an optimization we checkpoint a single "ManifestFile" (for the
    > > thousands of collected data files) as StateT. This allows us to absorb
    > > extended commit outages without losing written/uploaded data files, as
    > > operator state size is as small as one manifest file per checkpoint cycle
    > > [2].
    > > ------------------
    > > StateT snapshotState(SnapshotContext context) throws Exception;
    > >
    > > That means we also need the restoreCommitter API in the Sink interface
    > > ---------------
    > > Committer<CommT, StateT> restoreCommitter(InitContext context, StateT
    > > state);
    > >
    > > I think this might be a valid case. Not sure though if I would go with a
    > > "state" there. Having a state in a committer would imply we need a
    > > collect method as well. So far we needed a single method commit(...) and
    > > the bookkeeping of the committables could be handled by the framework. I
    > > think something like an optional combiner in the GlobalCommitter would
    > > be enough. What do you think?
    > >
    > > GlobalCommitter<CommT, GlobalCommT> {
    > >
    > >     void commit(GlobalCommT globalCommittables);
    > >
    > >     GlobalCommT combine(List<CommT> committables);
    > >
    > > }
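    > >
    > > For the Iceberg case that could be filled in roughly like this (DataFile
    > > and ManifestFile are the types from the discussion; writeManifest and
    > > appendToTable are hypothetical helpers, so this is only a sketch of the
    > > idea, not working code):
    > >
    > > class IcebergGlobalCommitter implements GlobalCommitter<DataFile, ManifestFile> {
    > >
    > >     @Override
    > >     public ManifestFile combine(List<DataFile> committables) {
    > >         // Fold the (possibly thousands of) data files of one checkpoint
    > >         // into a single manifest file, so the checkpointed state stays
    > >         // small even when commits are delayed for a long time.
    > >         return writeManifest(committables);   // hypothetical helper
    > >     }
    > >
    > >     @Override
    > >     public void commit(ManifestFile globalCommittable) {
    > >         // Append the manifest to the table in one Iceberg transaction.
    > >         appendToTable(globalCommittable);      // hypothetical helper
    > >     }
    > > }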
    > >
    > > A different problem that I see here is how we handle commit failures.
    > > Should the committables (both normal and global) be included in the next
    > > cycle, should we retry them, ...? I think it would be worth laying this
    > > out in the FLIP.
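    > >
    > > Just to make that question concrete, one possible (purely hypothetical)
    > > shape would be to let commit report what it could not finish, so the
    > > framework can decide whether to retry or to roll the leftovers into the
    > > next cycle:
    > >
    > > // Hypothetical variant for discussion: whatever commit() returns would be
    > > // handed back to the committer later, either retried with some backoff or
    > > // re-offered together with the committables of the next checkpoint.
    > > List<CommT> commit(List<CommT> committables) throws IOException;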
    > >
    > > @Aljoscha I think you can find the code Steven was referring to here:
    > >
    > https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
    > >
    > > Best,
    > >
    > > Dawid
    > >
    > > On 14/09/2020 15:19, Aljoscha Krettek wrote:
    > >
    > > On 14.09.20 01:23, Steven Wu wrote:
    > >
    > > ## Writer interface
    > >
    > > For the Writer interface, should we add "*prepareSnapshot"* before the
    > > checkpoint barrier is emitted downstream?  IcebergWriter would need it. Or
    > > would the framework call "*flush*" before the barrier is emitted
    > > downstream? That guarantee would achieve the same goal.
    > >
    > > I would think that we only need flush() and the semantics are that it
    > > prepares for a commit, so on a physical level it would be called from
    > > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
    > > think flush() should be renamed to something like "prepareCommit()".
    > >
    > > @Guowei, what do you think about this?
    > >
    > >
    > > In [1], we discussed the reason for the Writer to emit a (checkpointId,
    > > CommT) tuple to the committer. The committer needs the checkpointId to
    > > separate out data files for different checkpoints if concurrent
    > > checkpoints are enabled.
    > >
    > > When can this happen? Even with concurrent checkpoints the snapshot
    > > barriers would still cleanly segregate the input stream of an operator
    > > into tranches that should manifest in only one checkpoint. With
    > > concurrent checkpoints, all that can happen is that we start a
    > > checkpoint before a last one is confirmed completed.
    > >
    > > Unless there is some weirdness in the sources and some sources start
    > > chk1 first and some other ones start chk2 first?
    > >
    > > @Piotrek, do you think this is a problem?
    > >
    > >
    > > For the Committer interface, I am wondering if we should split the
    > > single
    > > commit method into separate "*collect"* and "*commit"* methods? This
    > > way,
    > > it can handle both single and multiple CommT objects.
    > >
    > > I think we can't do this. If the sink only needs a regular Committer,
    > > we can perform the commits in parallel, possibly on different
    > > machines. Only when the sink needs a GlobalCommitter would we need to
    > > ship all commits to a single process and perform the commit there. If
    > > both methods were unified in one interface we couldn't make the
    > > decision of where to commit in the framework code.
    > >
    > >
    > > For Iceberg, writers don't need any state. But the GlobalCommitter
    > > needs to
    > > checkpoint StateT. For the committer, CommT is "DataFile". Since a
    > > single
    > > committer can collect thousands (or more) data files in one checkpoint
    > > cycle, as an optimization we checkpoint a single "ManifestFile" (for the
    > > thousands of collected data files) as StateT. This allows us to absorb
    > > extended commit outages without losing written/uploaded data files, as
    > > operator state size is as small as one manifest file per checkpoint
    > > cycle
    > >
    > > You could have a point here. Is the code for this available in
    > > open-source? I was checking out
    > >
    > >
    > >
    > https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
    > >
    > > and didn't find the ManifestFile optimization there.
    > >
    > > Best,
    > > Aljoscha
    > >
    > >
    > >
    >