Hi devs,
I'd like to start a discussion of FLIP-151: Incremental snapshots for heap-based state backend [1].

The heap backend, while being limited to state sizes that fit into memory, also has some advantages compared to the RocksDB backend:
1. Serialization once per checkpoint, not per state modification. This allows "squashing" updates to the same keys.
2. Shorter synchronous phase (compared to RocksDB incremental).
3. No need for sorting and compaction, no IO amplification and no JNI overhead.
This can potentially give higher throughput and efficiency.

However, the heap backend currently lacks incremental checkpoints. This FLIP aims to add initial support for them.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-151%3A+Incremental+snapshots+for+heap-based+state+backend

Any feedback is highly appreciated.

Regards,
Roman
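To illustrate advantage (1), here is a minimal, hypothetical sketch (not the actual heap backend code, which uses copy-on-write state maps): updates only mutate the in-memory map, so repeated writes to the same key keep a single value, and serialization happens once, when the checkpoint iterates the map.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: a heap "state map" where updates never serialize.
    // All serialization happens in snapshot(), so N updates to the same
    // key produce a single serialized entry ("squashing").
    class SquashingStateMap {
        private final Map<Long, Long> values = new HashMap<>();

        void update(long key, long newValue) {
            values.put(key, newValue); // in-memory overwrite, no serialization here
        }

        byte[] snapshot() throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bos)) {
                out.writeInt(values.size());
                for (Map.Entry<Long, Long> e : values.entrySet()) {
                    out.writeLong(e.getKey());   // serialized once per checkpoint
                    out.writeLong(e.getValue()); // only the latest value survives
                }
            }
            return bos.toByteArray();
        }
    }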
Hi,
Very happy to see that the incremental checkpoint idea is finally becoming a reality for the heap backend! Overall the proposal looks pretty good to me. Just wanted to point out one possible improvement from what I can still remember from my ideas back then: I think you can avoid doing periodic full snapshots for consolidation.

Instead, my suggestion would be to track the version numbers you encounter while you iterate a snapshot for writing it - and then you should be able to prune all incremental snapshots that were performed with a version number smaller than the minimum you find. To avoid the problem of very old entries that never get modified, you could start spilling entries with a certain age difference compared to the current map version, so that eventually all entries from an old version are re-written into newer snapshots. You can track the version up to which this was done in the map, and then you can again let go of the corresponding snapshots after a guaranteed time. So instead of having the burden of periodic large snapshots, you can make every snapshot do a little bit of the cleanup work, and if you are lucky it might happen mostly by itself if most entries are frequently updated.

I would also consider making a map clear a special event in your log and unticking the versions on this event - this allows you to let go of old snapshots and saves you from writing a log of antimatter entries.

Maybe the ideas are still useful to you.

Best,
Stefan
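A minimal sketch of the pruning idea above, under a simple version-per-entry model (all names hypothetical): while writing a snapshot, record the minimum entry version encountered; any previously written incremental snapshot taken at a smaller version is no longer referenced and can be dropped.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    // Sketch only: version-based pruning of old incremental snapshots.
    class IncrementalSnapshotPruner {
        // incremental snapshots previously written, keyed by the map version
        // at which they were taken (hypothetical "handle" = file path)
        private final NavigableMap<Long, String> snapshotsByVersion = new TreeMap<>();

        /** Called while iterating the state map for the current snapshot. */
        long writeSnapshotAndFindMinVersion(Iterable<VersionedEntry> entries) {
            long minVersion = Long.MAX_VALUE;
            for (VersionedEntry e : entries) {
                minVersion = Math.min(minVersion, e.version);
                // ... write the entry if it changed since the last snapshot ...
            }
            return minVersion;
        }

        /** Snapshots taken at a version below the minimum are no longer referenced. */
        List<String> pruneOlderThan(long minVersion) {
            List<String> prunable = new ArrayList<>(snapshotsByVersion.headMap(minVersion, false).values());
            snapshotsByVersion.headMap(minVersion, false).clear();
            return prunable;
        }

        static final class VersionedEntry {
            final long version; // map version at which this entry was last modified
            VersionedEntry(long version) { this.version = version; }
        }
    }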
Hi Stefan,
Thanks for your reply. Very interesting ideas!

If I understand correctly, SharedStateRegistry will still be responsible for pruning the old state; for that, it will maintain some (ordered) mapping between StateMaps and their versions, per key group.

I think one modification to this approach is needed to support journaling: for each entry, maintain the version at which it was last fully snapshotted, and use this version to find the minimum as you described above.

I'm considering a better state cleanup and optimization of removals as the next step. Anyway, I will add it to the FLIP document.

Thanks!

Regards,
Roman
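A small sketch of the modification described above (hypothetical names, not the real SharedStateRegistry API): each entry additionally remembers the version at which it was last fully written into a snapshot, and the minimum of that field across entries determines which older snapshots can be released.

    import java.util.Collection;

    // Sketch only: per-entry tracking of the last fully-snapshotted version.
    class JournaledEntry<V> {
        V value;
        long modifiedAtVersion;        // map version of the last modification
        long lastFullSnapshotVersion;  // map version at which the full value was last written out

        /** Write the entry for the snapshot at 'snapshotVersion' if it changed since its last full write. */
        boolean maybeWrite(long snapshotVersion) {
            if (modifiedAtVersion > lastFullSnapshotVersion) {
                // ... serialize 'value' into the snapshot for 'snapshotVersion' ...
                lastFullSnapshotVersion = snapshotVersion;
                return true;
            }
            return false;
        }

        /** Snapshots older than this minimum are no longer referenced by any entry. */
        static <V> long minLastSnapshotVersion(Collection<JournaledEntry<V>> entries) {
            long min = Long.MAX_VALUE;
            for (JournaledEntry<V> e : entries) {
                min = Math.min(min, e.lastFullSnapshotVersion);
            }
            return min;
        }
    }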
Thanks, Roman, for publishing this design.
There seems to be quite a bit of overlap with FLIP-158 (generalized incremental checkpoints).

I would go with +1 to the effort if it is a pretty self-contained and closed effort, meaning we don't expect that this needs a ton of follow-ups, other than common maintenance and small bug fixes. If we expect that this requires a lot of follow-ups, then we end up splitting our work between this FLIP and FLIP-158, which seems a bit inefficient. What other committers would be involved to ensure the community can maintain this?

The design looks fine, in general, with one question:

When persisting changes, you persist all changes that have a newer version than the latest one confirmed by the JM. Can you explain why it is like that exactly? Alternatively, you could keep the latest checkpoint ID for which the state backend persisted the diff successfully to the checkpoint storage and created a state handle. For each checkpoint, the state backend includes the state handles of all involved chunks. That would be similar to the log-based approach in FLIP-158.

I have a suspicion that this is because the JM may have released the state handle (and discarded the diff) for a checkpoint that succeeded on the task but didn't succeed globally. So we cannot reference any state handle that has been handed over to the JobManager but is not yet confirmed.

This characteristic seems to be at the heart of much of the complexity; the handling of removed keys also seems to be caused by it. If we could change that assumption, the design would become simpler.

(Side note: I am wondering if this also impacts the FLIP-158 DSTL design.)

Best,
Stephan
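A rough sketch of the alternative outlined above (all names hypothetical): the backend remembers the last checkpoint for which it successfully persisted a diff and obtained a handle, persists only the changes since that checkpoint, and reports the handles of all chunks that still make up the state.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    // Sketch only: backend-tracked persistence progress instead of relying
    // on the latest JM-confirmed checkpoint.
    class DiffPersistingBackend {
        private long lastPersistedCheckpointId = -1L;
        // chunk handles by the checkpoint that produced them (hypothetical String handle)
        private final NavigableMap<Long, String> chunkHandles = new TreeMap<>();

        /** Persist only what changed since the last successfully persisted checkpoint. */
        List<String> snapshot(long checkpointId) {
            String newChunk = persistDiffSince(lastPersistedCheckpointId, checkpointId);
            chunkHandles.put(checkpointId, newChunk);
            lastPersistedCheckpointId = checkpointId;
            // the checkpoint references every chunk that is still part of the state
            return new ArrayList<>(chunkHandles.values());
        }

        private String persistDiffSince(long fromCheckpointId, long toCheckpointId) {
            // ... write the changes in (fromCheckpointId, toCheckpointId] to checkpoint storage ...
            return "chunk-" + toCheckpointId;
        }
    }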
Thanks for your reply, Stephan.
Yes, there is overlap between FLIP-151 and FLIP-158 as both address incremental state updates. However, I think that FLIP-151 on top of FLIP-158 increases efficiency by:

1. "Squashing" the changes made to the same key. For example, if some counter was changed 10 times, then FLIP-151 will send only the last value (this allows sending AND storing less data compared to FLIP-158).
2. Keeping in memory only the changed keys and not the values (this reduces memory usage AND latency, caused by serialization + copying on every update, compared to FLIP-158).

(1) can probably be implemented in FLIP-158, but not (2).

I don't think there will be a lot of follow-up efforts, and I hope @Dawid Wysakowicz <[hidden email]>, @pnowojski <[hidden email]>, Yuan Mei and probably @Yu Li <[hidden email]> will be able to join at different stages.

Regarding using only the confirmed checkpoints, you are right: the JM can abort non-confirmed checkpoints and discard the state. FLIP-158 has the same problem because StateChangelog produces StateHandles that can be discarded by the JM. Currently, potentially discarded changes are re-uploaded in both FLIPs.

In FLIP-158 (or a follow-up), I planned to improve this part by:
1. Limiting max-concurrent-checkpoints to 1, and
2. Sending the last confirmed checkpoint ID in RPCs and barriers.
So at the time of a checkpoint, the backend knows exactly which changes can be included.

Handling of removed keys is not related to the aborted checkpoints. They are needed on recovery to actually remove data from the previous snapshot. In FLIP-158 it is again similar: ChangelogStateBackend has to encode removal operations and send them to StateChangelog (though no additional data structure is required).

Regards,
Roman
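A minimal sketch of point (2) above (hypothetical names): the backend records only which keys changed since the last checkpoint and reads their current values from the live state map when the snapshot is taken, which also gives the squashing from point (1) for free.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Sketch only: track changed keys (not serialized values) between checkpoints.
    class ChangedKeysTracker<K, V> {
        private final Map<K, V> liveState = new HashMap<>();
        private final Set<K> changedKeys = new HashSet<>();
        private final Set<K> removedKeys = new HashSet<>();

        void put(K key, V value) {
            liveState.put(key, value);   // repeated updates to the same key squash here
            changedKeys.add(key);
            removedKeys.remove(key);
        }

        void remove(K key) {
            liveState.remove(key);
            changedKeys.remove(key);
            removedKeys.add(key);        // removal must be replayed on recovery
        }

        /** Materialize the increment for this checkpoint: latest values of changed keys. */
        Map<K, V> snapshotIncrement() {
            Map<K, V> diff = new HashMap<>();
            for (K key : changedKeys) {
                diff.put(key, liveState.get(key)); // value serialized only now, once per checkpoint
            }
            changedKeys.clear();
            // removedKeys would additionally be written as deletion markers
            return diff;
        }
    }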
Thanks for clarifying.
Concerning the JM-aborted checkpoints and state handles: I was thinking about it the other day as well and was considering an approach like this.

The core idea is to move the cleanup from the JM to the TM. That solves two issues:

(1) The StateBackends / DSTL delete the artifacts themselves, meaning we don't have to make assumptions about the state on the JM. That sounds too fragile, with easy bugs as soon as some slight assumptions change (see also the bug with incremental checkpoint / savepoint data loss, https://issues.apache.org/jira/browse/FLINK-21351).

(2) We do not need to clean up from one node. In the past, doing the cleanup from one node (the JM) has sometimes become a bottleneck.

To achieve that, we would need to extend the "notifyCheckpointComplete()" RPC from the JM to the TM so that it includes both the ID of the completed checkpoint and the ID of the earliest retained checkpoint. Then the TM can clean up all artifacts from earlier checkpoints.

There are two open questions to that design:
(1) On restore, we need to communicate the state handles of the previous checkpoints to the TM as well, so the TM again gets the full picture of all state artifacts.
(2) On rescaling, we need to clarify which TM is responsible for releasing a handle if it is mapped to multiple TMs. Otherwise we get double-delete calls. That isn't per se a problem, it is just a bit less efficient.

Maybe we could think in that direction for the DSTL work?
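A sketch of the extended notification on the TM side (hypothetical names, not the actual Flink RPC): besides the completed checkpoint ID, the TM receives the earliest retained checkpoint ID and discards every local artifact that belongs to an older checkpoint.

    import java.util.Map;
    import java.util.TreeMap;

    // Sketch only: TM-side cleanup driven by an extended completion notification.
    class TaskLocalCheckpointArtifacts {
        // artifacts (hypothetical String = file path) keyed by the checkpoint that created them
        private final TreeMap<Long, String> artifactsByCheckpoint = new TreeMap<>();

        void register(long checkpointId, String artifact) {
            artifactsByCheckpoint.put(checkpointId, artifact);
        }

        /** Hypothetical extension: the JM also sends the earliest checkpoint it still retains. */
        void notifyCheckpointComplete(long completedCheckpointId, long earliestRetainedCheckpointId) {
            Map<Long, String> obsolete = artifactsByCheckpoint.headMap(earliestRetainedCheckpointId, false);
            for (String artifact : obsolete.values()) {
                delete(artifact); // the TM deletes its own artifacts, not the JM
            }
            obsolete.clear();
        }

        private void delete(String artifact) {
            // ... remove the file/objects from checkpoint storage ...
        }
    }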
That's an interesting idea.
I guess we can decouple the actual state cleanup delegation from the correctness issues. I don't see any reason why it can't be implemented without changing the notifications (for FLIP-158, however, we'll probably have to ask "random" TMs, because FLIP-158 adds state sharing also across operators; thus finding the right TM for state removal can become difficult).

I'm not sure about the correctness issue though (FLINK-21351). What happens if, after upscaling, the JM asks one TM to discard the state, but the second TM receives the notification with a delay? IIUC, it can refer to the discarded state. I think we can prevent this by sending the earliest retained checkpoint ID in the trigger RPC/barriers (instead of in notifications). However, with multiple concurrent checkpoints, it still seems not enough, because some other checkpoint can be completed and cause an in-use checkpoint to be subsumed. Delegation to the TM doesn't solve the problem because of rescaling (and state sharing across operators).

I think we can solve it either by limiting max-concurrent-checkpoints to 1, or by "locking" all the checkpoints that can still be in use (i.e. num-retained-checkpoints from the earliest pending checkpoint). For example, with num-retained-checkpoints=3:

    |    completed    |  pending  |
    | cp0 | cp1 | cp2 | cp3 | cp4 |

If cp3 and cp4 both specify cp0 as the earliest retained checkpoint, then cp0 can not be subsumed until cp4 is completed - even if cp3 is.

Aborted checkpoints are different in that they are removed from the "tail": the TM can't infer from the earliest checkpoint ID whether some later changes could be removed. The solution would be to also add the last completed checkpoint to the notifications/trigger RPC/barriers.

To limit the FLIP-158 scope, I'd implement the last change and limit max-concurrent-checkpoints to 1.

Regards,
Roman
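A sketch of the "locking" rule from the example above (hypothetical names): a retained checkpoint may only be subsumed once every pending checkpoint that listed it as its earliest retained checkpoint has finished.

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: retained checkpoints stay "locked" while any pending
    // checkpoint still references them as its earliest retained checkpoint.
    class RetainedCheckpointLocks {
        // pending checkpoint id -> earliest retained checkpoint id it was started with
        private final Map<Long, Long> pendingToEarliestRetained = new HashMap<>();

        void checkpointTriggered(long checkpointId, long earliestRetainedCheckpointId) {
            pendingToEarliestRetained.put(checkpointId, earliestRetainedCheckpointId);
        }

        void checkpointFinished(long checkpointId) {
            pendingToEarliestRetained.remove(checkpointId); // completed or aborted
        }

        /** cp0 in the example: subsumable only when no pending checkpoint still depends on it. */
        boolean canSubsume(long retainedCheckpointId) {
            for (long earliest : pendingToEarliestRetained.values()) {
                if (earliest <= retainedCheckpointId) {
                    return false; // some in-flight checkpoint may still reference this one
                }
            }
            return true;
        }
    }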