Hi, devs & users
As discussed in FLIP-131 [1], Flink will deprecate the DataSet API in favor of the DataStream API and the Table API. Users should be able to use the DataStream API to write jobs that support both bounded and unbounded execution modes. However, Flink does not currently provide a sink API that guarantees exactly-once semantics in both bounded and unbounded scenarios, which blocks the unification.

So we want to introduce a new unified sink API that lets users develop a sink once and run it everywhere. You can find more details in FLIP-143 [2].

The FLIP contains some open questions on which I'd really appreciate input from the community. Some of the open questions include:

1. We provide two alternative Sink APIs in the FLIP. The only difference between the two versions is how the state is exposed to the user. Which one do you prefer?
2. How should the sink API support writing to Hive?
3. Is the sink an operator or a topology?

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

Best,
Guowei
Guowei,
Thanks a lot for the proposal and for starting the discussion thread. Very excited.

For the big question of "Is the sink an operator or a topology?", I have a few related sub-questions:
* Where should we run the committers?
* Is the committer parallel or single parallelism?
* Can a single choice satisfy all sinks?

Trying to envision how some sinks can be implemented with this new unified sink interface.

1. Kafka sink

Kafka supports non-transactional and transactional writes.
* Non-transactional writes don't need a commit action; we can have *parallel writers and no/no-op committers*. This is probably true for other non-transactional message queues.
* Transactional writes can be implemented as *parallel writers and parallel committers*. In this case, I don't know if it makes sense to separate writers and committers into two separate operators, because they probably need to share the same KafkaProducer object.

Either way, both writers and committers probably should *run inside task managers*.

2. ES sink

The ES sink typically buffers data up to a certain size or time threshold and then uploads/commits a batch to ES. Writers buffer data and flush when needed, and the committer does the HTTP bulk upload to commit. To avoid serialization/deserialization cost, we should run *parallel writers and parallel committers*, and they *should be chained or bundled together* while *running inside task managers*.

It can also be implemented as *parallel writers and no/no-op committers*, where all the logic (batching and upload) is put inside the writers.

3. Iceberg [1] sink

It is currently implemented as two-stage operators with *parallel writers and single-parallelism committers* (a rough sketch of this pattern follows at the end of this message).
* *Parallel writers* write records into data files. Upon checkpoint, writers flush and upload the files and send the metadata/location of the data files to the downstream committer. Writers need to do the flush inside the "prepareSnapshotPreBarrier" method (NOT the "snapshotState" method) before forwarding the checkpoint barrier to the committer.
* The single-parallelism committer operator collects data files from upstream writers. During "snapshotState", it saves the collected data files (or an uber metadata file) into state. When the checkpoint is completed, inside "notifyCheckpointComplete" it commits those data files to the Iceberg table. *The committer has to be single parallelism*, because we don't want hundreds or thousands of parallel committers competing for commit operations with opportunistic concurrency control. It would be very inefficient and probably infeasible if the parallelism is high. Too many tiny commits/transactions can also slow down both the table write and read paths due to too many manifest files.

Right now, both the Iceberg writer and committer operators run inside task managers. This has one major drawback: with the Iceberg sink, embarrassingly parallel jobs won't be embarrassingly parallel anymore. That breaks the benefit of region recovery for an embarrassingly parallel DAG. Conceptually, the Writer-Committer sink pattern mirrors the FLIP-27 Enumerator-Reader source pattern. It would be better *if the committer could run inside the job manager*, like the SplitEnumerator for the FLIP-27 source.

-----------------------
Additional questions regarding the doc/API:
* Any example for the writer shared state (Writer#snapshotSharedState)?
* We allow the case where the writer has no state, right? Meaning WriterS can be Void.
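For readers less familiar with the current Iceberg connector, here is a highly simplified sketch of the two-operator pattern described above. The class names and method bodies are illustrative only and do not reflect the actual Iceberg sink code:
------------------
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Iceberg's per-file metadata.
class DataFile {}

// Parallel writer: flushes and uploads open files before forwarding the checkpoint barrier.
class IcebergWriterOperator {
    private final List<DataFile> openFiles = new ArrayList<>();

    // Invoked by the runtime before the barrier is emitted downstream.
    List<DataFile> prepareSnapshotPreBarrier(long checkpointId) {
        List<DataFile> completed = new ArrayList<>(openFiles); // close + upload files here
        openFiles.clear();
        return completed; // the metadata is sent downstream to the single committer
    }
}

// Single-parallelism committer: stores file metadata in state, commits on checkpoint completion.
class IcebergCommitterOperator {
    private final List<DataFile> pending = new ArrayList<>();

    void collect(List<DataFile> files) {
        pending.addAll(files);
    }

    void snapshotState(long checkpointId) {
        // persist 'pending' (or a single uber manifest) into operator state
    }

    void notifyCheckpointComplete(long checkpointId) {
        // commit everything collected up to this checkpoint to the Iceberg table
        pending.clear();
    }
}
------------------
The important property is that flushing happens in prepareSnapshotPreBarrier, while the actual table commit is deferred to notifyCheckpointComplete.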
[1] https://iceberg.apache.org/

Thanks,
Steven
Hi Everyone,
thanks to Guowei for publishing the FLIP, and thanks to Steven for the very thoughtful email!

We thought a lot internally about some of the questions you posted but left a lot (almost all) of the implementation details out of the FLIP for now because we wanted to focus on semantics and API. I'll try and address the points below.

## Initial Scope of the new Sink API

We need to agree on some initial scope that we want to achieve for Flink 1.12. I don't think we can try and find the solution that will work for all current and future external systems. For me, the initial goal would be to produce a Sink API and implementations for systems where you can prepare "committables" in one process and commit those from another process. Those are systems that support "real" transactions as you need them in a two-phase commit protocol. This includes:

- File Sink, including HDFS, S3 via special part-file uploads
- Iceberg
- HDFS

The work should include runtime support for both BATCH and STREAMING execution as outlined in https://s.apache.org/FLIP-134.

Supporting Kafka already becomes difficult, but I'll go into that below.

## Where to run the Committer

Guowei hinted at this in the FLIP: the idea is that the framework decides where to run the committer based on requirements and based on the execution mode (STREAMING or BATCH).

Something that is not in the FLIP but which we thought about is that we need to allow different types of committers. I'm currently thinking we need at least a normal "Committer" and a "GlobalCommitter" (name TBD).

The Committer is as described in the FLIP; it's basically a function "void commit(Committable)". The GlobalCommitter would be a function "void commit(List<Committable>)". The former would be used by an S3 sink where we can individually commit files to S3: a committable would be the list of part uploads that will form the final file, and the commit operation creates the metadata in S3. The latter would be used by something like Iceberg, where the Committer needs a global view of all the commits to be efficient and not overwhelm the system. (A sketch of the two interfaces follows at the end of this section.)

I don't know yet if sinks would only implement one type of commit function or potentially both at the same time, and maybe Commit can return some CommitResult that gets shipped to the GlobalCommit function.

An interesting read on this topic is the discussion on https://issues.apache.org/jira/browse/MAPREDUCE-4815 about the Hadoop FileOutputCommitter and the two different available algorithms for committing final job/task results.

These interfaces untie the sink implementation from the runtime, and we could, for example, have a runtime like this:

### BATCH

- Collect all committables and store them in a fault-tolerant way until the job finishes
- For a normal Commit function, call it on the individual commits. We can potentially distribute this if it becomes a bottleneck
- For the GlobalCommit function, call it with all the commits. This cannot be distributed

We can collect the committables in an OperatorCoordinator or potentially somehow in a task, though I prefer an OperatorCoordinator right now. The operator coordinator needs to keep the commits in a fault-tolerant way.

### STREAMING

- For a normal Commit, keep the committables in state on the individual tasks and commit them when a checkpoint completes
- For the global CommitFunction we have options: collect them in a DOP-1 operator in the topology or send them to an OperatorCoordinator to do the commit there. This is where the source/sink duality that Steven mentions becomes visible.
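To make the two committer flavours concrete, here is a rough sketch of what the interfaces could look like (names and exact signatures are tentative, not the final FLIP-143 API):
------------------
import java.util.List;

// Tentative sketch of the two committer flavours; not the final FLIP-143 API.

// Per-committable commits, e.g. finishing one S3 multi-part upload.
// Instances of this can run in parallel, possibly on different task managers.
interface Committer<CommT> {
    void commit(CommT committable) throws Exception;
}

// Global commits that need to see all committables of a checkpoint at once,
// e.g. a single Iceberg table commit. Would run with parallelism 1, either in a
// DOP-1 operator or an OperatorCoordinator, depending on the execution mode.
interface GlobalCommitter<CommT> {
    void commit(List<CommT> committables) throws Exception;
}
------------------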
## Kafka

Kafka is a problematic case because it doesn't really support transactions as outlined above. Our current Sink implementation works around that with hacks, but that only gets us so far.

The problem with Kafka is that we need to aggressively clean up pending transactions in case a failure happens. Otherwise, stale transactions would block downstream consumers. See here for details: http://kafka.apache.org/documentation/#isolation.level.

The way we solve this in the current Kafka sink is by using a fixed pool of transactional IDs and then cancelling all outstanding transactions for those IDs when we restore from a savepoint. In order for this to work we need to recycle the IDs, so there needs to be a back-channel from the Committer to the Writer, or they need to share internal state. (A rough illustration of the ID-pool cleanup follows below.)

I don't see a satisfying solution for this, so I think we should exclude this from the initial version.

## On Write-Ahead-Log Sinks

Some sinks, like ES or Cassandra, would require that we keep a WAL in Flink and then ship the contents to the external system on checkpoint. The reason is that these systems don't support real transactions where you can prepare them in one process and commit them from another process.

Best,
Aljoscha
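For illustration only, the "fixed pool of transactional IDs" cleanup can be expressed with the plain Kafka producer API roughly as follows. This is a simplified sketch, not the actual FlinkKafkaProducer implementation:
------------------
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// Simplified sketch: abort whatever transactions previous executions may have left open
// for a fixed, known pool of transactional ids. Re-initializing a producer with a given
// transactional.id fences the previous producer and aborts its unfinished transaction.
public class TransactionalIdCleanup {
    public static void abortLingeringTransactions(String bootstrapServers, List<String> idPool) {
        for (String transactionalId : idPool) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                producer.initTransactions(); // fences and aborts any pending transaction for this id
            }
        }
    }
}
------------------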
During "snapshotState", it saves collected data files (or > an uber metadata file) into state. When the checkpoint is completed, inside > "notifyCheckpointComplete" it commits those data files to Iceberg tables. *The > committer has to be single parallelism*, because we don't want hundreds or > thousands of parallel committers to compete for commit operations with > opportunistic concurrency control. It will be very inefficient and probably > infeasible if the parallelism is high. Too many tiny commits/transactions > can also slow down both the table write and read paths due to too many > manifest files. > > Right now, both Iceberg writer and committer operators run inside task > managers. It has one major drawback. With Iceberg sink, embarrassingly > parallel jobs won't be embarrassingly parallel anymore. That breaks the > benefit of region recovery for embarrassingly parallel DAG. Conceptually, > the Writer-Committer sink pattern is like the mirroring of the FLIP-27 > Enumerator-Reader source pattern. It will be better *if the committer can > run inside the job manager* like the SplitEnumerator for the FLIP-27 > source. > > ----------------------- > Additional questions regarding the doc/API > * Any example for the writer shared state (Writer#snapshotSharedState)? > * We allow the case where the writer has no state, right? Meaning WriterS > can be Void. > > [1] https://iceberg.apache.org/ > > Thanks, > Steven > > On Thu, Sep 10, 2020 at 6:44 AM Guowei Ma <[hidden email]> wrote: > >> Hi, devs & users >> >> As discussed in FLIP-131[1], Flink will deprecate the DataSet API in favor >> of DataStream API and Table API. Users should be able to use DataStream API >> to write jobs that support both bounded and unbounded execution modes. >> However, Flink does not provide a sink API to guarantee the Exactly-once >> semantics in both bounded and unbounded scenarios, which blocks the >> unification. >> >> So we want to introduce a new unified sink API which could let the user >> develop the sink once and run it everywhere. You could find more details in >> FLIP-143[2]. >> >> The FLIP contains some open questions that I'd really appreciate inputs >> from the community. Some of the open questions include: >> >> 1. We provide two alternative Sink API in the FLIP. The only >> difference between the two versions is how to expose the state to the user. >> We want to know which one is your preference? >> 2. How does the sink API support to write to the Hive? >> 3. Is the sink an operator or a topology? >> >> [1] >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741 >> [2] >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API >> >> Best, >> Guowei >> > |
Regarding the FLIP itself, I like the motivation section and the
What/How/When/Where section a lot!

I don't understand why we need the "Drain and Snapshot" section. It seems to be some details about stop-with-savepoint and drain and their relation to BATCH execution, but I don't know if it is needed to understand the rest of the document. I'm happy to be wrong here, though, if there are good reasons for the section.

On the question of Alternative 1 and 2, I have a strong preference for Alternative 1 because we could avoid strong coupling to other modules. With Alternative 2 we would depend on `flink-streaming-java` and even `flink-runtime`. For the new source API (FLIP-27) we managed to keep the dependencies slim, and the code is in flink-core. I'd be very happy if we can manage the same for the new sink API.

Best,
Aljoscha
Aljoscha, thanks a lot for the detailed response. Now I have a better
understanding of the initial scope.

To me, there are possibly three different committer behaviors. For lack of better names, let's call them:
* No/NoopCommitter
* Committer / LocalCommitter (file sink?)
* GlobalCommitter (Iceberg)

## Writer interface

For the Writer interface, should we add "prepareSnapshot" before the checkpoint barrier is emitted downstream? IcebergWriter would need it. Or would the framework call "flush" before the barrier is emitted downstream? That guarantee would achieve the same goal.
-----------------
// before barrier emitted to downstream
void prepareSnapshot(long checkpointId) throws Exception;

// or will flush be called automatically before the barrier emitted downstream?
// if yes, we need the checkpointId arg for the reason listed in [1]
void flush(WriterOutput<CommT> output) throws IOException;

In [1], we discussed the reason for the Writer to emit a (checkpointId, CommT) tuple to the committer. The committer needs the checkpointId to separate out data files for different checkpoints if concurrent checkpoints are enabled. For that reason, writers need to know the checkpointId where the restore happened. Can we add a RestoreContext interface to the restoreWriter method?
---------------
Writer<IN, CommT, WriterS, SharedS> restoreWriter(InitContext context, RestoreContext restoreContext, List<WriterS> state, List<SharedS> share);

interface RestoreContext {
  long getCheckpointId();
}

## Committer interface

For the Committer interface, I am wondering if we should split the single commit method into separate "collect" and "commit" methods? This way, it can handle both single and multiple CommT objects.
------------------
void commit(CommT committable) throws Exception;
==>
void collect(CommT committable) throws Exception;
void commit() throws Exception;

As discussed in [1] and mentioned above, the Iceberg committer needs to know which checkpointId the commit is for. So can we add a checkpointId arg to the commit API? However, I don't know how this would affect batch execution, where checkpoints are disabled.
------------------
void commit(long checkpointId) throws Exception;

For Iceberg, writers don't need any state. But the GlobalCommitter needs to checkpoint StateT. For the committer, CommT is "DataFile". Since a single committer can collect thousands (or more) data files in one checkpoint cycle, as an optimization we checkpoint a single "ManifestFile" (for the collected thousands of data files) as StateT. This allows us to absorb extended commit outages without losing written/uploaded data files, as the operator state size is as small as one manifest file per checkpoint cycle [2] (a rough sketch of this follows at the end of this message).
------------------
StateT snapshotState(SnapshotContext context) throws Exception;

That means we also need a restoreCommitter API in the Sink interface:
---------------
Committer<CommT, StateT> restoreCommitter(InitContext context, StateT state);

Thanks,
Steven

[1] https://github.com/apache/iceberg/pull/1185#discussion_r479589663
[2] https://github.com/apache/iceberg/pull/1185#discussion_r479457104
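To illustrate the manifest-file optimization described above, here is a rough sketch of a global committer whose checkpointed state is one manifest per checkpoint. All class and method names are hypothetical and do not correspond to the actual Iceberg or FLIP-143 code:
------------------
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Iceberg's file and manifest metadata.
class DataFile {}
class ManifestFile {}

// Sketch: CommT = DataFile, StateT = ManifestFile. Instead of checkpointing thousands
// of DataFile entries, the committer rolls them into one manifest file per checkpoint
// and only keeps those small handles in operator state until they are committed.
class IcebergGlobalCommitterSketch {
    private final List<DataFile> collected = new ArrayList<>();
    private final List<ManifestFile> uncommitted = new ArrayList<>();

    void collect(DataFile file) {
        collected.add(file);
    }

    // Corresponds to snapshotState(...): write one manifest for everything collected in
    // this checkpoint cycle and return the (small) state to be checkpointed.
    List<ManifestFile> snapshotState(long checkpointId) {
        ManifestFile manifest = writeManifest(collected); // one small file per checkpoint
        collected.clear();
        uncommitted.add(manifest);
        return new ArrayList<>(uncommitted);
    }

    // Commit all manifests collected so far; keeping them in 'uncommitted' lets the sink
    // absorb extended commit outages without losing uploaded data files.
    void commit(long checkpointId) {
        // append the manifests to the Iceberg table in one transaction, then forget them
        uncommitted.clear();
    }

    private ManifestFile writeManifest(List<DataFile> files) {
        return new ManifestFile(); // placeholder: persist the metadata of 'files'
    }
}
------------------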
Hi,
I've just briefly skimmed over the proposed interfaces. I would suggest one addition to the Writer interface (as I understand this is the runtime interface in this proposal?): add some availability method to avoid, if possible, blocking calls in the sink. We already have similar availability methods in the new sources [1] and in various places in the network stack [2].

I'm aware that many implementations wouldn't be able to implement it, but some may. For example, `FlinkKafkaProducer` could use `FlinkKafkaProducer#pendingRecords` to control the `Writer`'s availability. Also, any sink that implements a records handover to some writer thread could easily provide availability as well. (A minimal sketch follows below.)

Non-blocking calls are important for many things; for example, they are crucial for unaligned checkpoints to complete quickly.

Piotrek

[1] org.apache.flink.api.connector.source.SourceReader#isAvailable
[2] org.apache.flink.runtime.io.AvailabilityProvider
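A minimal sketch of what such an availability method could look like on the Writer, assuming a CompletableFuture-based contract similar to SourceReader#isAvailable (the interface name and shape below are only an assumption):
------------------
import java.util.concurrent.CompletableFuture;

// Tentative sketch of a non-blocking availability contract for the Writer,
// mirroring org.apache.flink.api.connector.source.SourceReader#isAvailable.
interface AvailableWriter<IN> {

    // Completes (or is already complete) when the writer can accept the next element
    // without blocking, e.g. when the number of pending Kafka records drops below a limit.
    CompletableFuture<Void> isAvailable();

    void write(IN element) throws Exception;
}
------------------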
On 14.09.20 01:23, Steven Wu wrote:
> ## Writer interface > > For the Writer interface, should we add "*prepareSnapshot"* before the > checkpoint barrier emitted downstream? IcebergWriter would need it. Or > would the framework call "*flush*" before the barrier emitted downstream? > that guarantee would achieve the same goal. I would think that we only need flush() and the semantics are that it prepares for a commit, so on a physical level it would be called from "prepareSnapshotPreBarrier". Now that I'm thinking about it more I think flush() should be renamed to something like "prepareCommit()". @Guowei, what do you think about this? > In [1], we discussed the reason for Writer to emit (checkpointId, CommT) > tuple to the committer. The committer needs checkpointId to separate out > data files for different checkpoints if concurrent checkpoints are enabled. When can this happen? Even with concurrent checkpoints the snapshot barriers would still cleanly segregate the input stream of an operator into tranches that should manifest in only one checkpoint. With concurrent checkpoints, all that can happen is that we start a checkpoint before a last one is confirmed completed. Unless there is some weirdness in the sources and some sources start chk1 first and some other ones start chk2 first? @Piotrek, do you think this is a problem? > For the Committer interface, I am wondering if we should split the single > commit method into separate "*collect"* and "*commit"* methods? This way, > it can handle both single and multiple CommT objects. I think we can't do this. If the sink only needs a regular Commiter, we can perform the commits in parallel, possibly on different machines. Only when the sink needs a GlobalCommitter would we need to ship all commits to a single process and perform the commit there. If both methods were unified in one interface we couldn't make the decision of were to commit in the framework code. > For Iceberg, writers don't need any state. But the GlobalCommitter needs to > checkpoint StateT. For the committer, CommT is "DataFile". Since a single > committer can collect thousands (or more) data files in one checkpoint > cycle, as an optimization we checkpoint a single "ManifestFile" (for the > collected thousands data files) as StateT. This allows us to absorb > extended commit outages without losing written/uploaded data files, as > operator state size is as small as one manifest file per checkpoint cycle You could have a point here. Is the code for this available in open-source? I was checking out https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java and didn't find the ManifestFile optimization there. Best, Aljoscha |
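For readers following along, the distinction drawn above between the regular committer and the global committer could be sketched roughly as two separate interfaces. This is only one reading of the discussion, not the FLIP's final API, and the generic parameter names are assumptions:

import java.util.List;

// Regular committer: one committable at a time; commits can run in parallel
// on the task managers (e.g. completing one S3 multi-part upload per file).
interface Committer<CommT> {
    void commit(CommT committable) throws Exception;
}

// Global committer: sees all committables of a checkpoint at once, so the
// framework has to ship them to a single place (e.g. one Iceberg table commit).
interface GlobalCommitter<CommT> {
    void commit(List<CommT> committables) throws Exception;
}

Keeping the two shapes separate is what lets the framework decide where to run the commit: parallel on the task managers for the former, single-parallelism for the latter.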
I thought about this some more. One of the important parts of the
Iceberg sink is to know whether we have already committed some DataFiles. Currently, this is implemented by writing a (JobId, MaxCheckpointId) tuple to the Iceberg table when committing. When restoring from a failure we check this and discard committables (DataFiles) that we know to already be committed.

I think this can have some problems, for example when checkpoint ids are not strictly sequential, when we wrap around, or when the JobID changes. This will happen when doing a stop/start-from-savepoint cycle, for example.

I think we could fix this by having Flink provide a nonce to the GlobalCommitter, where Flink guarantees that this nonce is unique and will not change for repeated invocations of the GlobalCommitter with the same set of committables. The GlobalCommitter could use this to determine whether a set of committables has already been committed to the Iceberg table.

It seems very tailor-made for Iceberg for now, but other systems probably suffer from the same problem.

Best,
Aljoscha
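A rough sketch of how a GlobalCommitter could use such a nonce to make commits idempotent. The table/catalog calls below are hypothetical stand-ins, not the actual Iceberg API; the only assumption taken from the mail above is that the framework re-delivers the same nonce with the same set of committables after a failover.

import java.util.List;

// Hypothetical table handle; the committed nonces would live in the table's own
// metadata (e.g. a snapshot summary), not in Flink state.
interface TableHandle {
    boolean containsNonce(String nonce);
    void commitFiles(List<String> dataFiles, String nonce);
}

final class NonceBasedGlobalCommitter {
    private final TableHandle table;
    NonceBasedGlobalCommitter(TableHandle table) { this.table = table; }

    void commit(List<String> dataFiles, String nonce) {
        if (table.containsNonce(nonce)) {
            return; // this set of committables was already committed before the failure
        }
        table.commitFiles(dataFiles, nonce);
    }
}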
In reply to this post by Aljoscha Krettek-2
Hi all,
> I would think that we only need flush() and the semantics are that it
> prepares for a commit, so on a physical level it would be called from
> "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> think flush() should be renamed to something like "prepareCommit()".

Generally speaking it is a good point that emitting the committables should happen before emitting the checkpoint barrier downstream. However, if I remember offline discussions well, the idea behind Writer#flush and Writer#snapshotState was to differentiate commit on checkpoint vs final checkpoint at the end of the job. Both of these methods could emit committables, but the flush should not leave any in-progress state (e.g. in case of file sink in STREAM mode, in snapshotState it could leave some open files that would be committed in a subsequent cycle, however flush should close all files). The snapshotState as it is now can not be called in prepareSnapshotPreBarrier as it can store some state, which should happen in Operator#snapshotState as otherwise it would always be synchronous. Therefore I think we would need sth like:

void prepareCommit(boolean flush, WriterOutput<CommT> output);

ver 1:

List<StateT> snapshotState();

ver 2:

void snapshotState(); // not sure if we need that method at all in option 2

> The Committer is as described in the FLIP, it's basically a function
> "void commit(Committable)". The GlobalCommitter would be a function "void
> commit(List<Committable>)". The former would be used by an S3 sink where
> we can individually commit files to S3, a committable would be the list
> of part uploads that will form the final file and the commit operation
> creates the metadata in S3. The latter would be used by something like
> Iceberg where the Committer needs a global view of all the commits to be
> efficient and not overwhelm the system.
>
> I don't know yet if sinks would only implement one type of commit
> function or potentially both at the same time, and maybe Commit can
> return some CommitResult that gets shipped to the GlobalCommit function.

I must admit I did not get the need for Local/Normal + Global committer at first. The Iceberg example helped a lot. I think it makes a lot of sense.

> For Iceberg, writers don't need any state. But the GlobalCommitter needs to
> checkpoint StateT. For the committer, CommT is "DataFile". Since a single
> committer can collect thousands (or more) data files in one checkpoint
> cycle, as an optimization we checkpoint a single "ManifestFile" (for the
> collected thousands data files) as StateT. This allows us to absorb
> extended commit outages without losing written/uploaded data files, as
> operator state size is as small as one manifest file per checkpoint cycle [2].
> ------------------
> StateT snapshotState(SnapshotContext context) throws Exception;
>
> That means we also need the restoreCommitter API in the Sink interface
> ---------------
> Committer<CommT, StateT> restoreCommitter(InitContext context, StateT state);

I think this might be a valid case. Not sure though if I would go with a "state" there. Having a state in a committer would imply we need a collect method as well. So far we needed a single method commit(...) and the bookkeeping of the committables could be handled by the framework. I think something like an optional combiner in the GlobalCommitter would be enough. What do you think?

GlobalCommitter<CommT, GlobalCommT> {

    void commit(GlobalCommT globalCommittables);

    GlobalCommT combine(List<CommT> committables);

}

A different problem that I see here is how do we handle commit failures. Should the committables (both normal and global) be included in the next cycle, shall we retry them, ...? I think it would be worth laying it out in the FLIP.
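To make the combine() idea above a bit more tangible, an Iceberg-flavoured implementation could look roughly like the sketch below. The manifest writing is reduced to a hypothetical ManifestStore helper; the real Iceberg calls and types differ, so treat this only as an illustration of the proposed shape.

import java.util.List;

// CommT = a data file produced by a writer; GlobalCommT = one manifest that
// references many data files and is what actually gets committed per checkpoint.
final class DataFile {
    final String path;
    DataFile(String path) { this.path = path; }
}

final class ManifestCommittable {
    final String manifestPath;
    ManifestCommittable(String manifestPath) { this.manifestPath = manifestPath; }
}

interface ManifestStore {                      // hypothetical helper, not Iceberg's API
    String writeManifest(List<DataFile> files); // persists one manifest, returns its location
    void commitManifest(String manifestPath);   // makes the referenced files visible in the table
}

final class IcebergLikeGlobalCommitter {
    private final ManifestStore store;
    IcebergLikeGlobalCommitter(ManifestStore store) { this.store = store; }

    // Thousands of DataFiles collapse into one small committable per checkpoint,
    // which keeps the operator state tiny even during long commit outages.
    ManifestCommittable combine(List<DataFile> committables) {
        return new ManifestCommittable(store.writeManifest(committables));
    }

    void commit(ManifestCommittable globalCommittable) {
        store.commitManifest(globalCommittable.manifestPath);
    }
}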
@Aljoscha I think you can find the code Steven was referring in here: https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java

Best,
Dawid
Hi all,
Many thanks for the discussion and the valuable opinions! Currently there are several ongoing issues and we would like to show what we are thinking in the next few mails.

It seems that the biggest issue now is the topology of the sinks. Before deciding what the sink API would look like, I would like to first summarize the different topologies we have mentioned so that we could sync on the same page and gain more insights about this issue. There are four types of topology I could see. Please correct me if I misunderstand what you mean:

1. Commit individual files. (StreamingFileSink)
   FileWriter -> FileCommitter
2. Commit a directory. (HiveSink)
   FileWriter -> FileCommitter -> GlobalCommitter
3. Commit a bundle of files. (Iceberg)
   DataFileWriter -> GlobalCommitter
4. Commit a directory with merged files. (Some users want to merge the files in a directory before committing the directory to the Hive meta store)
   FileWriter -> SingleFileCommit -> FileMergeWriter -> GlobalCommitter

It can be seen from the above that the topologies differ according to the different requirements. Moreover, there may be other options for the second and third categories. E.g. an alternative topology option for the IcebergSink might be: DataFileWriter -> Agg -> GlobalCommitter. One pro of this method is that we can let Agg take care of the cleanup instead of coupling the cleanup logic to the committer.

In the long run I think we might provide the sink developer the ability to build arbitrary topologies. Maybe Flink could only provide a basic commit transformation and let the user build other parts of the topology. In 1.12 we might first provide different patterns for these different scenarios, and I think these components could be reused in the future.

Best,
Guowei
## concurrent checkpoints

@Aljoscha Krettek <[hidden email]> regarding the concurrent checkpoints, let me illustrate with a simple DAG below. [image: image.png]

Let's assume each writer emits one file per checkpoint cycle and writer-2 is slow. Now let's look at what the global committer receives:

timeline:       ------------------------------------------------> Now
from Writer-1:  file-1-1, ck-1, file-1-2, ck-2
from Writer-2:  file-2-1, ck-1

In this case, the committer shouldn't include "file-1-2" in the commit for ck-1.

## Committable bookkeeping and combining

I like Dawid's proposal where the framework takes care of the bookkeeping of committables and provides a combiner API (CommT -> GlobalCommT) for the GlobalCommitter. The only requirement is to tie the commit/CommT/GlobalCommT to a checkpoint. When a commit is successful for checkpoint-N, the framework needs to remove the GlobalCommT from the state corresponding to checkpoints <= N. If a commit fails, the GlobalCommT accumulates and will be included in the next cycle. That is how the Iceberg sink works. I think it is good to piggyback retries on Flink's periodic checkpoints for the Iceberg sink. Otherwise, it can get complicated to implement retry logic that won't interfere with Flink checkpoints. The main question is whether this pattern is generic enough to be put into the sink framework or not.

> An alternative topology option for the IcebergSink might be:
> DataFileWriter -> Agg -> GlobalCommitter. One pro of this method is that
> we can let Agg take care of the cleanup instead of coupling the cleanup
> logic to the committer.

[hidden email] I would favor Dawid's suggestion of a "combine" API rather than a separate "Agg" operator.

## Using checkpointId

> I think this can have some problems, for example when checkpoint ids are
> not strictly sequential, when we wrap around, or when the JobID changes.
> This will happen when doing a stop/start-from-savepoint cycle, for example.

checkpointId can work if it is monotonically increasing, which I believe is the case for Flink today. Restoring from a checkpoint or savepoint will resume the checkpointIds. We can deal with a JobID change by saving it into the state and the Iceberg snapshot metadata. There is already a PR [1] for that.

## Nonce

> Flink provide a nonce to the GlobalCommitter where Flink guarantees that
> this nonce is unique

That is actually how we implemented it internally. The Flink Iceberg sink basically hashes the Manifest file location as the nonce. Since the Flink-generated Manifest file location is unique, it guarantees the nonce is unique. IMO, checkpointId is also one way of implementing a nonce based on today's Flink behavior.

> and will not change for repeated invocations of the GlobalCommitter with
> the same set of committables

If the same set of committables is combined into one GlobalCommT (like the ManifestFile in Iceberg), then the nonce could be part of the GlobalCommT interface.

BTW, as Dawid pointed out, the ManifestFile optimization is only in our internal implementation [2] right now. For the open source version, there is a github issue [3] to track follow-up improvements.

Thanks,
Steven
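The bookkeeping described above could live in the framework roughly as in the sketch below: pending global committables keyed by checkpoint id, committed on notifyCheckpointComplete, and simply retried together with the next checkpoint if a commit fails. All class and method names are assumed for illustration, not a proposal for concrete framework classes.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PendingCommits<GlobalCommT> {

    // Ordered by checkpoint id so that "everything <= N" is easy to address.
    private final TreeMap<Long, GlobalCommT> pending = new TreeMap<>();

    void add(long checkpointId, GlobalCommT committable) {
        pending.put(checkpointId, committable);
    }

    // Called from notifyCheckpointComplete(checkpointId).
    void commitUpTo(long checkpointId, CommitFunction<GlobalCommT> commitFn) {
        Iterator<Map.Entry<Long, GlobalCommT>> it =
                pending.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, GlobalCommT> entry = it.next();
            try {
                commitFn.commit(entry.getValue());
                it.remove(); // success: this committable is done and leaves the state
            } catch (Exception e) {
                break;       // failure: keep it pending; it is retried on the next checkpoint
            }
        }
    }

    // What would go into operator state on snapshotState.
    List<GlobalCommT> snapshot() {
        return new ArrayList<>(pending.values());
    }

    interface CommitFunction<T> {
        void commit(T committable) throws Exception;
    }
}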
## Concurrent checkpoints

AFAIK the committer would not see file-1-2 when ck-1 happens in the ExactlyOnce mode.

## Committable bookkeeping and combining

I agree with you that the "CombineGlobalCommitter" would work. But it would put more optimization logic into the committer, which will make the committer more and more complicated, until it eventually becomes the same as the Writer. For example, the committer would need to clean up some unused manifest files when restoring from a failure if we introduce the optimizations to the committer. In this case another alternative might be to put this "merging" optimization into a separate agg operator (maybe just like another `Writer`?). The agg could produce an aggregated committable for the committer. The agg operator could manage the whole life cycle of the manifest file it created. It would give the committer a single responsibility.

>> The main question is if this pattern is generic to be put into the sink framework or not.

Maybe I am wrong. But what I can feel from the current discussion is that different requirements lead to different topology requirements.

## Using checkpointId

In the batch execution mode there would be no normal checkpoints any more. That is why we do not introduce the checkpoint id in the API. So it is a great thing that the sink decouples its implementation from the checkpoint id. :)

Best,
Guowei
Hi Aljoscha,

> I don't understand why we need the "Drain and Snapshot" section. It
> seems to be some details about stop-with-savepoint and drain, and the
> relation to BATCH execution but I don't know if it is needed to
> understand the rest of the document. I'm happy to be wrong here, though,
> if there's good reasons for the section.

The new unified sink API should provide a way for the sink developer to deal with EOI (Drain) in order to guarantee the Exactly-once semantics. This is what I mostly want to say in this section. The current streaming-style sink API does not provide a good way to deal with it. That is why the `StreamingFileSink` does not commit the last part of data in the bounded scenario. Since our theme is unification, I am afraid users might misunderstand that adding this requirement to the new sink API is only for bounded scenarios, so I explained in this paragraph that stop-with-savepoint might have a similar requirement. For the snapshot I also want to prevent users from misunderstanding that it is specially prepared for the unbounded scenario. Actually it might also be possible with bounded + batch execution mode in the future. I could reorganize the section if it makes the reader confused, but I think we need to keep the drain part at least. WDYT?

> On the question of Alternative 1 and 2, I have a strong preference for
> Alternative 1 because we could avoid strong coupling to other modules.
> With Alternative 2 we would depend on `flink-streaming-java` and even
> `flink-runtime`. For the new source API (FLIP-27) we managed to keep the
> dependencies slim and the code is in flink-core. I'd be very happy if we
> can manage the same for the new sink API.

I am open to alternative 1. Maybe I miss something, but I do not get why the second alternative would depend on `flink-runtime` or `flink-streaming-java`. All the state APIs are currently in flink-core. Could you give some further explanation? Thanks :)

Best,
Guowei
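To illustrate the drain requirement in terms of the interfaces floated earlier in the thread (Dawid's prepareCommit(boolean flush, ...)), the framework-side driving logic could look roughly like the sketch below. It is simplified to return a List instead of using a WriterOutput, and all names besides prepareCommit are assumptions:

import java.util.List;

// Minimal shape of the writer as discussed: flush == true means "end of input /
// final checkpoint", so no in-progress state (e.g. open files) may remain afterwards.
interface SinkWriterSketch<InputT, CommT> {
    void write(InputT element) throws Exception;
    List<CommT> prepareCommit(boolean flush) throws Exception;
}

final class SinkDriverSketch<InputT, CommT> {
    private final SinkWriterSketch<InputT, CommT> writer;
    SinkDriverSketch(SinkWriterSketch<InputT, CommT> writer) { this.writer = writer; }

    // Regular checkpoint: open files may stay open and roll over to the next cycle.
    List<CommT> onCheckpoint() throws Exception {
        return writer.prepareCommit(false);
    }

    // End of input in bounded execution (or stop-with-savepoint --drain):
    // everything must be closed and handed over as committables, so the
    // last part of the data is actually committed.
    List<CommT> onEndOfInput() throws Exception {
        return writer.prepareCommit(true);
    }
}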
In reply to this post by dwysakowicz
>> I would think that we only need flush() and the semantics are that it
>> prepares for a commit, so on a physical level it would be called from
>> "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
>> think flush() should be renamed to something like "prepareCommit()".

> Generally speaking it is a good point that emitting the committables
> should happen before emitting the checkpoint barrier downstream.
> However, if I remember offline discussions well, the idea behind
> Writer#flush and Writer#snapshotState was to differentiate commit on
> checkpoint vs final checkpoint at the end of the job. Both of these
> methods could emit committables, but the flush should not leave any in
> progress state (e.g. in case of file sink in STREAM mode, in
> snapshotState it could leave some open files that would be committed in
> a subsequent cycle, however flush should close all files). The
> snapshotState as it is now can not be called in
> prepareSnapshotPreBarrier as it can store some state, which should
> happen in Operator#snapshotState as otherwise it would always be
> synchronous. Therefore I think we would need sth like:
> void prepareCommit(boolean flush, WriterOutput<CommT> output);
> ver 1:
> List<StateT> snapshotState();
> ver 2:
> void snapshotState(); // not sure if we need that method at all in option 2

I second Dawid's proposal. This is a valid scenario. And version 2 does not need the snapshotState() any more.

>> The Committer is as described in the FLIP, it's basically a function
>> "void commit(Committable)". The GlobalCommitter would be a function "void
>> commit(List<Committable>)". The former would be used by an S3 sink where
>> we can individually commit files to S3, a committable would be the list
>> of part uploads that will form the final file and the commit operation
>> creates the metadata in S3. The latter would be used by something like
>> Iceberg where the Committer needs a global view of all the commits to be
>> efficient and not overwhelm the system.
>>
>> I don't know yet if sinks would only implement one type of commit
>> function or potentially both at the same time, and maybe Commit can
>> return some CommitResult that gets shipped to the GlobalCommit function.
>> I must admit I did not get the need for Local/Normal + Global
>> committer at first. The Iceberg example helped a lot. I think it makes a
>> lot of sense.

@Dawid What I understand is that HiveSink's implementation might need the local committer (FileCommitter) because the file rename is needed. But Iceberg only needs to write the manifest file. Would you like to enlighten me why Iceberg needs the local committer?
Thanks

Best,
Guowei
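As one concrete reading of the "file rename" point above: the per-file (local) committer of a Hive/file sink could be as small as the sketch below. Paths and the committable layout are made up for illustration only.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Committable: a file that was fully written to a temporary location and now
// only needs to be moved (renamed) into the directory that readers see.
final class FileCommittable {
    final String tempPath;
    final String finalPath;
    FileCommittable(String tempPath, String finalPath) {
        this.tempPath = tempPath;
        this.finalPath = finalPath;
    }
}

final class FileRenameCommitter {
    // The rename is (ideally) atomic on the target filesystem, so replaying the
    // commit after a failover is harmless if the target already exists.
    void commit(FileCommittable committable) throws IOException {
        Path source = Paths.get(committable.tempPath);
        Path target = Paths.get(committable.finalPath);
        if (Files.exists(target)) {
            return; // already committed in a previous attempt
        }
        Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
    }
}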
On 15.09.20 01:33, Steven Wu wrote:
> ## concurrent checkpoints > > @Aljoscha Krettek <[hidden email]> regarding the concurrent > checkpoints, let me illustrate with a simple DAG below. > [image: image.png] Hi Steven, images don't make it through to the mailing lists. You would need to host the file somewhere and send a link. Best, Aljoscha |
On 15.09.20 06:05, Guowei Ma wrote:
> ## Using checkpointId
> In the batch execution mode there would be no normal checkpoints any more.
> That is why we do not introduce the checkpoint id in the API. So it is a
> great thing that the sink decouples its implementation from the checkpoint id. :)

Yes, this is a very important point of the design!

On 15.09.20 06:28, Guowei Ma wrote:
> I am open to alternative 1. Maybe I miss something, but I do not get why the
> second alternative would depend on `flink-runtime` or
> `flink-streaming-java`. All the state APIs are currently in flink-core.
> Could you give some further explanation? Thanks :)

You're right, yes. It seems I was thinking about other things.

I also prefer alternative 1 because it's more "declarative", i.e. the sink has a fixed method where it can return something, instead of the framework giving the sink an interface that it can then use in arbitrary ways. Alternative 1 leaves more freedom to the framework but limits the sink. Alternative 2 leaves more freedom to the sink but limits the framework, potentially.

Best,
Aljoscha
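For readers who don't have the FLIP open, the difference between the two alternatives, as it comes across in this thread, could be sketched roughly like this (only the state-related part; the interface names and the shape of the state handle are illustrative assumptions):

import java.util.List;

// Alternative 1 ("declarative"): the writer returns its state and the framework
// decides how and where to store it. Only flink-core level types are needed.
interface WriterAlternative1<InputT, WriterStateT> {
    void write(InputT element) throws Exception;
    List<WriterStateT> snapshotState() throws Exception;
}

// Alternative 2: the framework hands the writer a state handle that it can use
// in arbitrary ways, giving the sink more freedom but the framework less.
interface WriterStateHandle<WriterStateT> {
    void add(WriterStateT state);
    List<WriterStateT> get();
}

interface WriterAlternative2<InputT, WriterStateT> {
    void write(InputT element) throws Exception;
    void snapshotState(WriterStateHandle<WriterStateT> stateHandle) throws Exception;
}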
In reply to this post by Guowei Ma
> What I understand is that HiveSink's implementation might need the local
> committer(FileCommitter) because the file rename is needed.
> But the iceberg only needs to write the manifest file. Would you like to
> enlighten me why the Iceberg needs the local committer?
> Thanks

Sorry if I caused a confusion here. I am not saying the Iceberg sink needs a local committer. What I had in mind is that prior to the Iceberg example I did not see a need for a "GlobalCommitter" in the streaming case. I thought it is always enough to have the "normal" committer in that case. Now I understand that this differentiation is not really about logical separation. It is not really about the granularity with which we commit, i.e. answering the "WHAT" question. It is really about the performance and that in the end we will have a single "transaction", so it is about answering the question "HOW".

I still find the merging case the most confusing. I don't necessarily understand why do you need the "SingleFileCommit" step in this scenario. The way I understand "commit" operation is that it makes some data/artifacts visible to the external system, thus it should be immutable from a point of view of a single process. Having an additional step in the same process that works on committed data contradicts with those assumptions. I might be missing something though. Could you elaborate why can't it be something like FileWriter -> FileMergeWriter -> Committer (either global or non-global)? Again it might be just me not getting the example.

> I've just briefly skimmed over the proposed interfaces. I would suggest one
> addition to the Writer interface (as I understand this is the runtime
> interface in this proposal?): add some availability method, to avoid, if
> possible, blocking calls on the sink. We already have similar
> availability methods in the new sources [1] and in various places in the
> network stack [2].

BTW Let's not forget about Piotr's comment. I think we could add the isAvailable or similar method to the Writer interface in the FLIP.

Best,
Dawid
On 15.09.20 09:55, Dawid Wysakowicz wrote:
> BTW Let's not forget about Piotr's comment. I think we could add the
> isAvailable or similar method to the Writer interface in the FLIP.

I'm not so sure about this; the sinks I'm aware of would not be able to implement this method: Kafka doesn't have this, I didn't see it in the Iceberg interfaces, and HDFS/S3 also don't have it. I can see why from a systems perspective it would be useful to have, of course.

Best,
Aljoscha
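One way the two views above could be reconciled is a default implementation: sinks that cannot report backpressure (Kafka, HDFS/S3, Iceberg) would simply stay "always available". The future-based shape below only mirrors the style of the availability methods in the new sources and is an assumption, not the FLIP's API:

import java.util.concurrent.CompletableFuture;

interface AvailabilityWriterSketch<InputT> {

    void write(InputT element) throws Exception;

    // A completed future means "write() may be called without blocking".
    // The default keeps the method optional: sinks with no way to report
    // availability never exert backpressure here.
    default CompletableFuture<Void> isAvailable() {
        return AVAILABLE;
    }

    CompletableFuture<Void> AVAILABLE = CompletableFuture.completedFuture(null);
}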
In reply to this post by dwysakowicz
Hi, Dawid
>> I still find the merging case the most confusing. I don't necessarily
>> understand why do you need the "SingleFileCommit" step in this scenario.
>> The way I understand "commit" operation is that it makes some
>> data/artifacts visible to the external system, thus it should be immutable
>> from a point of view of a single process. Having an additional step in the
>> same process that works on committed data contradicts with those
>> assumptions. I might be missing something though. Could you elaborate
>> why can't it be something like FileWriter -> FileMergeWriter -> Committer
>> (either global or non-global)? Again it might be just me not getting the
>> example.

I think you are right. The topology "FileWriter -> FileMergeWriter -> Committer" could meet the merge requirement. The topology "FileWriter -> SingleFileCommitter -> FileMergeWriter -> GlobalCommitter" reuses some code of the StreamingFileSink (for example, the rolling policy), so it has the "SingleFileCommitter" in the topology. In general I want to use this case to show that there are different topologies according to the requirements. BTW: IIRC, @Jingsong Lee <[hidden email]> told me that the actual topology of the merge-supported HiveSink is more complicated than that.

>> I've just briefly skimmed over the proposed interfaces. I would suggest one
>> addition to the Writer interface (as I understand this is the runtime
>> interface in this proposal?): add some availability method, to avoid, if
>> possible, blocking calls on the sink. We already have similar
>> availability methods in the new sources [1] and in various places in the
>> network stack [2].
>> BTW Let's not forget about Piotr's comment. I think we could add the
>> isAvailable or similar method to the Writer interface in the FLIP.

Thanks @Dawid Wysakowicz <[hidden email]> for your reminder. There are too many issues at the same time. In addition to what Aljoscha said, very few systems support it. Another thing I worry about is: does the sink's snapshot return immediately when the sink's status is unavailable? Maybe we could handle it by deduplicating some elements in the state, but I think it might be too complicated. What I want to know is which specific sinks would benefit from this feature. @piotr <[hidden email]> Please correct me if I misunderstand you. Thanks.

Best,
Guowei
> > FileWriter -> SingleFileCommit -> FileMergeWriter -> GlobalCommitter > > I still find the merging case the most confusing. I don't necessarily > understand why do you need the "SingleFileCommit" step in this scenario. > The way I understand "commit" operation is that it makes some > data/artifacts visible to the external system, thus it should be immutable > from a point of view of a single process. Having an additional step in the > same process that works on committed data contradicts with those > assumptions. I might be missing something though. Could you elaborate why > can't it be something like FileWriter -> FileMergeWriter -> Committer > (either global or non-global)? Again it might be just me not getting the > example. > > I've just briefly skimmed over the proposed interfaces. I would suggest one > addition to the Writer interface (as I understand this is the runtime > interface in this proposal?): add some availability method, to avoid, if > possible, blocking calls on the sink. We already have similar > availability methods in the new sources [1] and in various places in the > network stack [2]. > > BTW Let's not forget about Piotr's comment. I think we could add the > isAvailable or similar method to the Writer interface in the FLIP. > > Best, > > Dawid > On 15/09/2020 08:06, Guowei Ma wrote: > > I would think that we only need flush() and the semantics are that it > prepares for a commit, so on a physical level it would be called from > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I > think flush() should be renamed to something like "prepareCommit()". > > Generally speaking it is a good point that emitting the committables > should happen before emitting the checkpoint barrier downstream. > However, if I remember offline discussions well, the idea behind > Writer#flush and Writer#snapshotState was to differentiate commit on > checkpoint vs final checkpoint at the end of the job. Both of these > methods could emit committables, but the flush should not leave any in > progress state (e.g. in case of file sink in STREAM mode, in > snapshotState it could leave some open files that would be committed in > a subsequent cycle, however flush should close all files). The > snapshotState as it is now can not be called in > prepareSnapshotPreBarrier as it can store some state, which should > happen in Operator#snapshotState as otherwise it would always be > synchronous. Therefore I think we would need sth like: > > void prepareCommit(boolean flush, WriterOutput<CommT> output); > > ver 1: > > List<StateT> snapshotState(); > > ver 2: > > void snapshotState(); // not sure if we need that method at all in option > > 2 > > I second Dawid's proposal. This is a valid scenario. And version2 does not > need the snapshotState() any more. > > > The Committer is as described in the FLIP, it's basically a function > "void commit(Committable)". The GobalCommitter would be a function "void > commit(List<Committable>)". The former would be used by an S3 sink where > we can individually commit files to S3, a committable would be the list > of part uploads that will form the final file and the commit operation > creates the metadata in S3. The latter would be used by something like > Iceberg where the Committer needs a global view of all the commits to be > efficient and not overwhelm the system. 
> > I don't know yet if sinks would only implement on type of commit > function or potentially both at the same time, and maybe Commit can > return some CommitResult that gets shipped to the GlobalCommit function. > I must admit it I did not get the need for Local/Normal + Global > committer at first. The Iceberg example helped a lot. I think it makes a > lot of sense. > > @Dawid > What I understand is that HiveSink's implementation might need the local > committer(FileCommitter) because the file rename is needed. > But the iceberg only needs to write the manifest file. Would you like to > enlighten me why the Iceberg needs the local committer? > Thanks > > Best, > Guowei > > > On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz <[hidden email]> <[hidden email]> > wrote: > > > Hi all, > > > I would think that we only need flush() and the semantics are that it > prepares for a commit, so on a physical level it would be called from > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I > think flush() should be renamed to something like "prepareCommit()". > > Generally speaking it is a good point that emitting the committables > should happen before emitting the checkpoint barrier downstream. > However, if I remember offline discussions well, the idea behind > Writer#flush and Writer#snapshotState was to differentiate commit on > checkpoint vs final checkpoint at the end of the job. Both of these > methods could emit committables, but the flush should not leave any in > progress state (e.g. in case of file sink in STREAM mode, in > snapshotState it could leave some open files that would be committed in > a subsequent cycle, however flush should close all files). The > snapshotState as it is now can not be called in > prepareSnapshotPreBarrier as it can store some state, which should > happen in Operator#snapshotState as otherwise it would always be > synchronous. Therefore I think we would need sth like: > > void prepareCommit(boolean flush, WriterOutput<CommT> output); > > ver 1: > > List<StateT> snapshotState(); > > ver 2: > > void snapshotState(); // not sure if we need that method at all in option 2 > > > The Committer is as described in the FLIP, it's basically a function > "void commit(Committable)". The GobalCommitter would be a function "void > commit(List<Committable>)". The former would be used by an S3 sink where > we can individually commit files to S3, a committable would be the list > of part uploads that will form the final file and the commit operation > creates the metadata in S3. The latter would be used by something like > Iceberg where the Committer needs a global view of all the commits to be > efficient and not overwhelm the system. > > I don't know yet if sinks would only implement on type of commit > function or potentially both at the same time, and maybe Commit can > return some CommitResult that gets shipped to the GlobalCommit function. > > I must admit it I did not get the need for Local/Normal + Global > committer at first. The Iceberg example helped a lot. I think it makes a > lot of sense. > > > For Iceberg, writers don't need any state. But the GlobalCommitter > needs to > checkpoint StateT. For the committer, CommT is "DataFile". Since a single > committer can collect thousands (or more) data files in one checkpoint > cycle, as an optimization we checkpoint a single "ManifestFile" (for the > collected thousands data files) as StateT. 
This allows us to absorb > extended commit outages without losing written/uploaded data files, as > operator state size is as small as one manifest file per checkpoint cycle > [2]. > ------------------ > StateT snapshotState(SnapshotContext context) throws Exception; > > That means we also need the restoreCommitter API in the Sink interface > --------------- > Committer<CommT, StateT> restoreCommitter(InitContext context, StateT > state); > > I think this might be a valid case. Not sure though if I would go with a > "state" there. Having a state in a committer would imply we need a > collect method as well. So far we needed a single method commit(...) and > the bookkeeping of the committables could be handled by the framework. I > think something like an optional combiner in the GlobalCommitter would > be enough. What do you think? > > GlobalCommitter<CommT, GlobalCommT> { > > void commit(GlobalCommT globalCommittables); > > GlobalCommT combine(List<CommT> committables); > > } > > A different problem that I see here is how do we handle commit failures. > Should the committables (both normal and global be included in the next > cycle, shall we retry it, ...) I think it would be worth laying it out > in the FLIP. > > @Aljoscha I think you can find the code Steven was referring in here: > https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java > > Best, > > Dawid > > On 14/09/2020 15:19, Aljoscha Krettek wrote: > > On 14.09.20 01:23, Steven Wu wrote: > > ## Writer interface > > For the Writer interface, should we add "*prepareSnapshot"* before the > checkpoint barrier emitted downstream? IcebergWriter would need it. Or > would the framework call "*flush*" before the barrier emitted > downstream? > that guarantee would achieve the same goal. > > I would think that we only need flush() and the semantics are that it > prepares for a commit, so on a physical level it would be called from > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I > think flush() should be renamed to something like "prepareCommit()". > > @Guowei, what do you think about this? > > > In [1], we discussed the reason for Writer to emit (checkpointId, CommT) > tuple to the committer. The committer needs checkpointId to separate out > data files for different checkpoints if concurrent checkpoints are > enabled. > > When can this happen? Even with concurrent checkpoints the snapshot > barriers would still cleanly segregate the input stream of an operator > into tranches that should manifest in only one checkpoint. With > concurrent checkpoints, all that can happen is that we start a > checkpoint before a last one is confirmed completed. > > Unless there is some weirdness in the sources and some sources start > chk1 first and some other ones start chk2 first? > > @Piotrek, do you think this is a problem? > > > For the Committer interface, I am wondering if we should split the > single > commit method into separate "*collect"* and "*commit"* methods? This > way, > it can handle both single and multiple CommT objects. > > I think we can't do this. If the sink only needs a regular Commiter, > we can perform the commits in parallel, possibly on different > machines. Only when the sink needs a GlobalCommitter would we need to > ship all commits to a single process and perform the commit there. 
If > both methods were unified in one interface we couldn't make the > decision of where to commit in the framework code. > > > For Iceberg, writers don't need any state. But the GlobalCommitter > needs to > checkpoint StateT. For the committer, CommT is "DataFile". Since a > single > committer can collect thousands (or more) data files in one checkpoint > cycle, as an optimization we checkpoint a single "ManifestFile" (for the > collected thousands data files) as StateT. This allows us to absorb > extended commit outages without losing written/uploaded data files, as > operator state size is as small as one manifest file per checkpoint > cycle. > > You could have a point here. Is the code for this available in > open-source? I was checking out > > > https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java > > and didn't find the ManifestFile optimization there. > > Best, > Aljoscha
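To make the trade-off discussed above concrete, here is a minimal Java sketch contrasting the two options: a stateful committer that checkpoints a ManifestFile, and a stateless GlobalCommitter with an optional combine step. Apart from commit(), combine(), and the restoreCommitter signature quoted from the thread, all names here are illustrative assumptions, not part of the proposal.

import java.util.List;

// Option A: a committer that owns checkpointed state, as in the Iceberg sink that
// folds the thousands of DataFiles of one checkpoint cycle into a single
// ManifestFile. This implies a restore hook on the Sink, e.g.
//   Committer<CommT, StateT> restoreCommitter(InitContext context, StateT state);
interface StatefulCommitter<CommT, StateT> {
    void collect(CommT committable) throws Exception;   // bookkeeping lives in the committer
    void commit() throws Exception;                      // commit everything collected so far
    StateT snapshotState() throws Exception;             // e.g. one ManifestFile per cycle
}

// Option B: keep the committer stateless and let the framework do the bookkeeping;
// an optional combine() folds many CommT into one GlobalCommT so the checkpointed
// state stays small (DataFile -> ManifestFile in the Iceberg case).
interface GlobalCommitter<CommT, GlobalCommT> {
    GlobalCommT combine(List<CommT> committables);

    void commit(GlobalCommT globalCommittable) throws Exception;
}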
> images don't make it through to the mailing lists. You would need to host
> the file somewhere and send a link. Sorry about that. Here is the sample DAG in Google Drawings: https://docs.google.com/drawings/d/1-P8F2jF9RG9HHTtAfWEBRuU_2uV9aDTdqEt5dLs2JPk/edit?usp=sharing On Tue, Sep 15, 2020 at 4:58 AM Guowei Ma <[hidden email]> wrote: > Hi, Dawid > > >> I still find the merging case the most confusing. I don't necessarily understand why do you need the "SingleFileCommit" step in this scenario. The way I >> understand "commit" operation is that it makes some data/artifacts visible to the external system, thus it should be immutable from a point of view of a single >> process. Having an additional step in the same process that works on committed data contradicts with those assumptions. I might be missing something though. >> Could you elaborate why can't it be something like FileWriter -> FileMergeWriter -> Committer (either global or non-global)? Again it might be just me not getting the example. > > I think you are right. The topology "FileWriter -> FileMergeWriter -> Committer" could meet the merge requirement. The topology "FileWriter -> SingleFileCommitter -> FileMergeWriter -> GlobalCommitter" reuses some code of the StreamingFileSink (for example, the rolling policy), so it has the "SingleFileCommitter" in the topology. In general I wanted to use this case to show that there are different topologies according to the requirements. > > BTW: IIRC, @Jingsong Lee <[hidden email]> told me that the actual topology of the merge-supporting HiveSink is more complicated than that. > > >> I've just briefly skimmed over the proposed interfaces. I would suggest one addition to the Writer interface (as I understand this is the runtime interface in this proposal?): add some availability method, to avoid, if possible, blocking calls on the sink. We already have similar availability methods in the new sources [1] and in various places in the network stack [2]. >> BTW Let's not forget about Piotr's comment. I think we could add the isAvailable or similar method to the Writer interface in the FLIP. > > Thanks @Dawid Wysakowicz <[hidden email]> for the reminder. There are too many issues being discussed at the same time. > > In addition to what Aljoscha said: very few systems support it. Another thing I worry about is: does the sink's snapshot return immediately when the sink's status is unavailable? Maybe we could handle it by deduplicating some elements in the state, but I think it might be too complicated. What I want to know is which specific sinks would benefit from this feature. @piotr <[hidden email]> Please correct me if I misunderstand you. Thanks. > > Best, > Guowei > > On Tue, Sep 15, 2020 at 3:55 PM Dawid Wysakowicz <[hidden email]> wrote: > > What I understand is that HiveSink's implementation might need the local > > committer(FileCommitter) because the file rename is needed. > > But the iceberg only needs to write the manifest file. Would you like to > > enlighten me why the Iceberg needs the local committer? > > Thanks > > Sorry if I caused a confusion here. I am not saying the Iceberg sink needs > > a local committer. What I had in mind is that prior to the Iceberg example > > I did not see a need for a "GlobalCommitter" in the streaming case. I > > thought it is always enough to have the "normal" committer in that case. > > Now I understand that this differentiation is not really about logical > > separation. It is not really about the granularity with which we commit, > > i.e.
answering the "WHAT" question. It is really about the performance > and > > that in the end we will have a single "transaction", so it is about > > answering the question "HOW". > > > > > > - > > > > Commit a directory with merged files(Some user want to merge the files > > in a directory before committing the directory to Hive meta store) > > > > > > 1. > > > > FileWriter -> SingleFileCommit -> FileMergeWriter -> GlobalCommitter > > > > I still find the merging case the most confusing. I don't necessarily > > understand why do you need the "SingleFileCommit" step in this scenario. > > The way I understand "commit" operation is that it makes some > > data/artifacts visible to the external system, thus it should be > immutable > > from a point of view of a single process. Having an additional step in > the > > same process that works on committed data contradicts with those > > assumptions. I might be missing something though. Could you elaborate why > > can't it be something like FileWriter -> FileMergeWriter -> Committer > > (either global or non-global)? Again it might be just me not getting the > > example. > > > > I've just briefly skimmed over the proposed interfaces. I would suggest > one > > addition to the Writer interface (as I understand this is the runtime > > interface in this proposal?): add some availability method, to avoid, if > > possible, blocking calls on the sink. We already have similar > > availability methods in the new sources [1] and in various places in the > > network stack [2]. > > > > BTW Let's not forget about Piotr's comment. I think we could add the > > isAvailable or similar method to the Writer interface in the FLIP. > > > > Best, > > > > Dawid > > On 15/09/2020 08:06, Guowei Ma wrote: > > > > I would think that we only need flush() and the semantics are that it > > prepares for a commit, so on a physical level it would be called from > > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I > > think flush() should be renamed to something like "prepareCommit()". > > > > Generally speaking it is a good point that emitting the committables > > should happen before emitting the checkpoint barrier downstream. > > However, if I remember offline discussions well, the idea behind > > Writer#flush and Writer#snapshotState was to differentiate commit on > > checkpoint vs final checkpoint at the end of the job. Both of these > > methods could emit committables, but the flush should not leave any in > > progress state (e.g. in case of file sink in STREAM mode, in > > snapshotState it could leave some open files that would be committed in > > a subsequent cycle, however flush should close all files). The > > snapshotState as it is now can not be called in > > prepareSnapshotPreBarrier as it can store some state, which should > > happen in Operator#snapshotState as otherwise it would always be > > synchronous. Therefore I think we would need sth like: > > > > void prepareCommit(boolean flush, WriterOutput<CommT> output); > > > > ver 1: > > > > List<StateT> snapshotState(); > > > > ver 2: > > > > void snapshotState(); // not sure if we need that method at all in option > > > > 2 > > > > I second Dawid's proposal. This is a valid scenario. And version2 does > not > > need the snapshotState() any more. > > > > > > The Committer is as described in the FLIP, it's basically a function > > "void commit(Committable)". The GobalCommitter would be a function "void > > commit(List<Committable>)". 
The former would be used by an S3 sink where > > we can individually commit files to S3, a committable would be the list > > of part uploads that will form the final file and the commit operation > > creates the metadata in S3. The latter would be used by something like > > Iceberg where the Committer needs a global view of all the commits to be > > efficient and not overwhelm the system. > > > > I don't know yet if sinks would only implement on type of commit > > function or potentially both at the same time, and maybe Commit can > > return some CommitResult that gets shipped to the GlobalCommit function. > > I must admit it I did not get the need for Local/Normal + Global > > committer at first. The Iceberg example helped a lot. I think it makes a > > lot of sense. > > > > @Dawid > > What I understand is that HiveSink's implementation might need the local > > committer(FileCommitter) because the file rename is needed. > > But the iceberg only needs to write the manifest file. Would you like to > > enlighten me why the Iceberg needs the local committer? > > Thanks > > > > Best, > > Guowei > > > > > > On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz < > [hidden email]> <[hidden email]> > > wrote: > > > > > > Hi all, > > > > > > I would think that we only need flush() and the semantics are that it > > prepares for a commit, so on a physical level it would be called from > > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I > > think flush() should be renamed to something like "prepareCommit()". > > > > Generally speaking it is a good point that emitting the committables > > should happen before emitting the checkpoint barrier downstream. > > However, if I remember offline discussions well, the idea behind > > Writer#flush and Writer#snapshotState was to differentiate commit on > > checkpoint vs final checkpoint at the end of the job. Both of these > > methods could emit committables, but the flush should not leave any in > > progress state (e.g. in case of file sink in STREAM mode, in > > snapshotState it could leave some open files that would be committed in > > a subsequent cycle, however flush should close all files). The > > snapshotState as it is now can not be called in > > prepareSnapshotPreBarrier as it can store some state, which should > > happen in Operator#snapshotState as otherwise it would always be > > synchronous. Therefore I think we would need sth like: > > > > void prepareCommit(boolean flush, WriterOutput<CommT> output); > > > > ver 1: > > > > List<StateT> snapshotState(); > > > > ver 2: > > > > void snapshotState(); // not sure if we need that method at all in > option 2 > > > > > > The Committer is as described in the FLIP, it's basically a function > > "void commit(Committable)". The GobalCommitter would be a function "void > > commit(List<Committable>)". The former would be used by an S3 sink where > > we can individually commit files to S3, a committable would be the list > > of part uploads that will form the final file and the commit operation > > creates the metadata in S3. The latter would be used by something like > > Iceberg where the Committer needs a global view of all the commits to be > > efficient and not overwhelm the system. > > > > I don't know yet if sinks would only implement on type of commit > > function or potentially both at the same time, and maybe Commit can > > return some CommitResult that gets shipped to the GlobalCommit function. 
> > > > I must admit it I did not get the need for Local/Normal + Global > > committer at first. The Iceberg example helped a lot. I think it makes a > > lot of sense. > > > > > > For Iceberg, writers don't need any state. But the GlobalCommitter > > needs to > > checkpoint StateT. For the committer, CommT is "DataFile". Since a single > > committer can collect thousands (or more) data files in one checkpoint > > cycle, as an optimization we checkpoint a single "ManifestFile" (for the > > collected thousands data files) as StateT. This allows us to absorb > > extended commit outages without losing written/uploaded data files, as > > operator state size is as small as one manifest file per checkpoint cycle > > [2]. > > ------------------ > > StateT snapshotState(SnapshotContext context) throws Exception; > > > > That means we also need the restoreCommitter API in the Sink interface > > --------------- > > Committer<CommT, StateT> restoreCommitter(InitContext context, StateT > > state); > > > > I think this might be a valid case. Not sure though if I would go with a > > "state" there. Having a state in a committer would imply we need a > > collect method as well. So far we needed a single method commit(...) and > > the bookkeeping of the committables could be handled by the framework. I > > think something like an optional combiner in the GlobalCommitter would > > be enough. What do you think? > > > > GlobalCommitter<CommT, GlobalCommT> { > > > > void commit(GlobalCommT globalCommittables); > > > > GlobalCommT combine(List<CommT> committables); > > > > } > > > > A different problem that I see here is how do we handle commit failures. > > Should the committables (both normal and global be included in the next > > cycle, shall we retry it, ...) I think it would be worth laying it out > > in the FLIP. > > > > @Aljoscha I think you can find the code Steven was referring in here: > > > https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java > > > > Best, > > > > Dawid > > > > On 14/09/2020 15:19, Aljoscha Krettek wrote: > > > > On 14.09.20 01:23, Steven Wu wrote: > > > > ## Writer interface > > > > For the Writer interface, should we add "*prepareSnapshot"* before the > > checkpoint barrier emitted downstream? IcebergWriter would need it. Or > > would the framework call "*flush*" before the barrier emitted > > downstream? > > that guarantee would achieve the same goal. > > > > I would think that we only need flush() and the semantics are that it > > prepares for a commit, so on a physical level it would be called from > > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I > > think flush() should be renamed to something like "prepareCommit()". > > > > @Guowei, what do you think about this? > > > > > > In [1], we discussed the reason for Writer to emit (checkpointId, CommT) > > tuple to the committer. The committer needs checkpointId to separate out > > data files for different checkpoints if concurrent checkpoints are > > enabled. > > > > When can this happen? Even with concurrent checkpoints the snapshot > > barriers would still cleanly segregate the input stream of an operator > > into tranches that should manifest in only one checkpoint. With > > concurrent checkpoints, all that can happen is that we start a > > checkpoint before a last one is confirmed completed. 
> > > > Unless there is some weirdness in the sources and some sources start > > chk1 first and some other ones start chk2 first? > > > > @Piotrek, do you think this is a problem? > > > > > > For the Committer interface, I am wondering if we should split the > > single > > commit method into separate "*collect"* and "*commit"* methods? This > > way, > > it can handle both single and multiple CommT objects. > > > > I think we can't do this. If the sink only needs a regular Commiter, > > we can perform the commits in parallel, possibly on different > > machines. Only when the sink needs a GlobalCommitter would we need to > > ship all commits to a single process and perform the commit there. If > > both methods were unified in one interface we couldn't make the > > decision of were to commit in the framework code. > > > > > > For Iceberg, writers don't need any state. But the GlobalCommitter > > needs to > > checkpoint StateT. For the committer, CommT is "DataFile". Since a > > single > > committer can collect thousands (or more) data files in one checkpoint > > cycle, as an optimization we checkpoint a single "ManifestFile" (for the > > collected thousands data files) as StateT. This allows us to absorb > > extended commit outages without losing written/uploaded data files, as > > operator state size is as small as one manifest file per checkpoint > > cycle > > > > You could have a point here. Is the code for this available in > > open-source? I was checking out > > > > > > > https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java > > > > and didn't find the ManifestFile optimization there. > > > > Best, > > Aljoscha > > > > > > > |
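Picking up the availability suggestion that recurs in the messages above, one possible (not agreed-upon) shape for a non-blocking Writer would mirror the availability pattern of the FLIP-27 sources and the network stack. The method names below are assumptions for illustration only, and the emission of committables is left out to keep the sketch minimal.

import java.util.concurrent.CompletableFuture;

interface AvailabilityWriter<InputT> {
    // A completed future means the writer can take the next record immediately;
    // the runtime would only call write() once the future completes, so a slow or
    // throttled external system back-pressures the task without blocking its thread.
    CompletableFuture<?> getAvailableFuture();

    // Emission of committables is elided here to keep the sketch minimal.
    void write(InputT element) throws Exception;
}

Guowei's open question above would still need an answer under this shape: what should snapshotting do while the writer reports itself unavailable, e.g. can the checkpoint complete before the writer becomes available again?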