I was looking into the handling of state in streaming operators, and it is a bit hidden from the system.

Right now, functions can (if they want) put some state into their context. At runtime, state may occur or not. Before runtime, the system cannot tell which operators are going to be stateful and which are going to be stateless.

I think it is a good idea to expose that. We can use it for optimizations, and we know which operators need to checkpoint state and acknowledge the asynchronous checkpoint. At this point, we need to assume that all operators need to send a confirmation message, which is unnecessary.

Also, I think we should expose which operators want a "commit" notification after the checkpoint has completed. Good examples are:

- the KafkaConsumer source, which can then commit to ZooKeeper the offset that is known to be safe

- a transactional KafkaProducer sink, which can commit a batch of messages to the Kafka partition once the checkpoint is done (to get exactly-once guarantees that include the sink)

Comments welcome!

Greetings,
Stephan
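One way to make both properties explicit could be a pair of small interfaces that operators implement, so the system knows before runtime who is stateful and who wants a commit notification. This is only a rough sketch with made-up names, not an existing API:

    import java.io.Serializable;

    /** Marker for operators that hold state, checkpoint it, and acknowledge checkpoints. */
    interface StatefulOperator<S extends Serializable> {

        /** Called when a checkpoint barrier arrives; returns the state to persist. */
        S snapshotState(long checkpointId);

        /** Called on recovery with the last successfully persisted state. */
        void restoreState(S state);
    }

    /** Opt-in for operators that want to be told once a checkpoint is globally complete. */
    interface CheckpointCommitListener {

        /** E.g. a Kafka source commits the offsets belonging to this checkpoint here. */
        void notifyCheckpointComplete(long checkpointId);
    }

The system could then skip confirmation messages from purely stateless operators and send commit notifications only where they are requested.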
I agree with all suggestions; thanks for summing it up, Stephan.
A few more points I have in mind at the moment:

- Regarding the acknowledgements, indeed we don't need to make all operators commit back; we just have to make sure that all sinks have acknowledged a checkpoint to consider it complete back at the coordinator.

- Do you think we should broadcast commit responses to the sources that need them after every successful checkpoint? The checkpoint interval does not always match the frequency at which we want to initiate, for example, a compaction on Kafka. One alternative would be to make sources request a successful checkpoint id via a future, on demand.

- We have to update the current checkpointing approach to cover iterative streams. We need to make sure we don't send checkpoint requests to iteration heads, and handle downstream backup for records in transit during checkpoints accordingly.

cheers
Paris
Thanks for the comments!
Concerning acknowledging the checkpoint: the sinks definitely need to acknowledge it. If we write an operator's state asynchronously (and emit the downstream barriers before that write is complete), then I think those operators also need to acknowledge the checkpoint.

For the commit messages: my first thought was to send them simply as actor messages from the JobManager to the vertices that require them. That way, they are not stuck in the data flow with its possible latency. Also, in the data flow, messages get duplicated (at all-to-all connections).

For iterative flows: does the JobManager need to be aware of this, or can the IterationHead handle it transparently for the JobManager? From our last conversation, I recall:

- receive barriers, push out barriers
- snapshot its state
- wait for the barriers to come back through the backchannel
- write the state snapshot plus the backchannel buffers
- only then acknowledge the checkpoint

My first impression is that this way the JobManager would not handle the IterationHead any differently from all other stateful operators.

Greetings,
Stephan
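To make that IterationHead sequence concrete, here is a rough, self-contained sketch in Java; all class and method names are made up for illustration and do not refer to existing code:

    import java.io.Serializable;
    import java.util.ArrayDeque;
    import java.util.Deque;

    class IterationHeadSketch {

        /** What gets persisted for one checkpoint: own state plus in-flight feedback records. */
        static class PendingCheckpoint implements Serializable {
            final long checkpointId;
            final String operatorState;
            final Deque<String> backchannelRecords = new ArrayDeque<>();

            PendingCheckpoint(long checkpointId, String operatorState) {
                this.checkpointId = checkpointId;
                this.operatorState = operatorState;
            }
        }

        private String state = "";
        private PendingCheckpoint pending;   // non-null while the barrier circulates

        /** Barrier received on the regular input. */
        void onBarrier(long checkpointId) {
            // 1. push the barrier downstream (emission omitted here)
            // 2. snapshot own state; feedback records from now on are logged as well
            pending = new PendingCheckpoint(checkpointId, state);
        }

        /** Record arriving through the backchannel while the barrier circulates. */
        void onFeedbackRecord(String record) {
            // process/forward the record as usual (omitted); while a checkpoint is
            // pending, also log it, because it was in transit at snapshot time
            if (pending != null) {
                pending.backchannelRecords.add(record);
            }
        }

        /** The barrier came back through the backchannel. */
        void onFeedbackBarrier() {
            // 3. write the state snapshot plus the backchannel buffers (persistence omitted)
            // 4. only now acknowledge the checkpoint towards the JobManager
            System.out.println("ack checkpoint " + pending.checkpointId
                    + " with state '" + pending.operatorState + "' and "
                    + pending.backchannelRecords.size() + " in-flight records");
            pending = null;
        }
    }

Seen from the JobManager, such an operator simply takes a bit longer to acknowledge, so it would indeed need no special handling.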
Regarding the commits (for instance Kafka offsets):

I don't exactly get how you mean to do this. If the source continues processing after the checkpoint and before the commit, it will not know what state has been committed exactly, so it would need to know the time of the checkpoint and store a local copy.

Gyula
I think your assumption (and the current Kafka source implementation) is that there is one state object that you update/mutate all the time.

If you instead draw a snapshot of the state object at the time of the checkpoint, the source can continue, and that particular offset is remembered as the state of this checkpoint and can be committed to Kafka/ZooKeeper later.
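For instance, a source along these lines could keep one immutable offset copy per checkpoint id and commit exactly that copy when the completion notification arrives, no matter how far it has read in the meantime. The class and method names below are hypothetical, purely for illustration:

    import java.util.HashMap;
    import java.util.Map;

    class OffsetCommittingSourceSketch {

        private long currentOffset;                                      // mutated on every record
        private final Map<Long, Long> pendingOffsets = new HashMap<>();  // checkpointId -> offset

        /** Normal processing: reading continues after a snapshot was taken. */
        void emitNextRecord() {
            currentOffset++;
        }

        /** Called when the checkpoint barrier is injected at this source. */
        void snapshotState(long checkpointId) {
            pendingOffsets.put(checkpointId, currentOffset);   // immutable copy per checkpoint
        }

        /** Called when the JobManager reports the checkpoint as globally complete. */
        void notifyCheckpointComplete(long checkpointId) {
            Long offset = pendingOffsets.remove(checkpointId);
            if (offset != null) {
                commitToZooKeeper(offset);   // safe: all downstream state covers this offset
            }
        }

        private void commitToZooKeeper(long offset) {
            System.out.println("committing offset " + offset);   // stand-in for the real commit
        }
    }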
Okay, so the commit would be something like:
commitState(OperatorState state)

Gyula
That would be one way of doing it, yes...
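In that variant, the completion notification would hand back the state instance that was snapshotted for the checkpoint, so the operator commits exactly that instance. A hypothetical shape, with names invented for illustration only:

    import java.io.Serializable;

    /** The snapshotted, per-checkpoint piece of state (illustrative only). */
    interface OperatorState extends Serializable {}

    /** Operators that want to commit externally once a checkpoint is complete. */
    interface CommittingOperator {

        /** Called with the state instance that was snapshotted for the completed checkpoint. */
        void commitState(OperatorState state);
    }

Passing the snapshot back would address Gyula's concern: the operator would not have to remember on its own which state belonged to which checkpoint.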
From this discussion I derive that we will have a state abstraction that everyone who requires state will work with? Or will the state be in object fields that are saved upon invocation of some doBackup() method?
The current aim is the first option, as you have correctly derived. :)
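As an illustration of that first option, the function could work against a small state handle owned by the runtime, so the system both knows statically that the operator is stateful and can take the snapshot itself at a barrier, without a user-implemented doBackup(). The following is a hypothetical sketch, not an existing class:

    import java.io.Serializable;

    /** A state handle the runtime hands to stateful functions (illustrative only). */
    class OperatorStateHandle<T extends Serializable> {

        private T value;

        OperatorStateHandle(T initialValue) {
            this.value = initialValue;
        }

        T value() {
            return value;
        }

        void update(T newValue) {
            this.value = newValue;
        }

        /** Runtime side: taken at a checkpoint barrier; this copy is what gets persisted. */
        T snapshot() {
            return value;   // assumes T is immutable or is copied by the state backend
        }
    }

A counting operator would then obtain an OperatorStateHandle<Long> from its runtime context and call update(count), rather than keeping the counter in a plain field that some doBackup() method has to find.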