Flink v1.7.1
After a Flink reboot we've been seeing some unexpected issues with excess retained checkpoints not being able to be removed from ZooKeeper after a new checkpoint is created.

I believe I've got my head around the role of ZK and lockNodes in Checkpointing after going through the code. Could you check my logic on this and add any insight, especially if I've got it wrong?

The situation:

1) Say we run JM1 and JM2 and retain 10 checkpoints and are running in HA with S3 as the backing store.

2) JM1 and JM2 start up and each instance of ZooKeeperStateHandleStore has its own lockNode UUID. JM1 is elected leader.

3) We submit a job; that JobGraph lockNode is added to ZK using JM1's JobGraph lockNode.

4) Checkpoints start rolling in; the latest 10 are retained in ZK using JM1's checkpoint lockNode. We continue running, and checkpoints are successfully being created and excess checkpoints removed.

5) Both JM1 and JM2 now are rebooted.

6) The JobGraph is recovered by the leader, and the job restarts from the latest checkpoint.

Now after every new checkpoint we see in the ZooKeeper logs:

INFO [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x10000047715000d type:delete cxid:0x210 zxid:0x700001091 txntype:-1 reqpath:n/a Error Path:/flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/0000000000000057813 Error:KeeperErrorCode = Directory not empty for /flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/000000000000005781

with an increasing checkpoint id on each subsequent call.

When JM1 and JM2 were rebooted the lockNode UUIDs would have rolled, right? As the old checkpoints were created under the old UUID, the new JMs will never be able to remove the old retained checkpoints from ZooKeeper.

Is that correct?
If so, would this also happen with JobGraphs in the following situation (we saw this just recently where we had a JobGraph for a cancelled job still in ZK):

Steps 1 through 3 above, then:

4) JM1 fails over to JM2, the job keeps running uninterrupted. JM1 restarts.

5) Some time later, while JM2 is still leader, we hard cancel the job and restart the JMs.

In this case JM2 would successfully remove the job from S3, but because its lockNode is different from JM1's it cannot delete the lock file in the jobgraph folder and so can’t remove the jobgraph. Then Flink restarts and tries to process the JobGraph it has found, but the S3 files have been deleted.

Possible related closed issues (fixes went in v1.7.0):
https://issues.apache.org/jira/browse/FLINK-10184 and
https://issues.apache.org/jira/browse/FLINK-10255

Thanks for any insight,
Dyana
Hi Dyana,
your analysis is almost correct. The only part which is missing is that the lock nodes are created as ephemeral nodes. This should ensure that if a JM process dies, the lock nodes get removed by ZooKeeper. How long it takes until ZooKeeper detects a client connection as lost and then removes the ephemeral nodes depends a bit on ZooKeeper's configuration. If the job should terminate within this time interval, then it could happen that you cannot remove the checkpoint/JobGraph. However, usually the ZooKeeper session timeout should be configured to be a couple of seconds.

I would actually be interested in better understanding your problem to see whether this is still a bug in Flink. Could you maybe share the respective logs on DEBUG log level with me? Maybe it would also be possible to run the latest version of Flink (1.7.2) to include all possible bug fixes.

FYI: The community is currently discussing reimplementing the ZooKeeper based high availability services [1]. One idea is to get rid of the lock nodes by replacing them with transactions on the leader node. This could prevent these kinds of bugs in the future.

[1] https://issues.apache.org/jira/browse/FLINK-10333

Cheers,
Till

On Thu, Apr 18, 2019 at 3:12 PM dyana.rose <[hidden email]> wrote:
> [...]
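The lock-node behaviour discussed above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `ZNodeTree`, `NotEmptyError` and `release_and_try_remove` are hypothetical names made up for illustration, not Flink or ZooKeeper APIs, and the model only mimics two ZooKeeper rules (a node with children cannot be deleted, and ephemeral nodes disappear when their session ends).

```python
class NotEmptyError(Exception):
    """Stands in for ZooKeeper's 'Directory not empty' error."""


class ZNodeTree:
    """Tiny in-memory stand-in for the part of ZooKeeper used here."""

    def __init__(self):
        self.children = {}  # node path -> set of child names
        self.owner = {}     # ephemeral node path -> owning session id

    def create(self, path, session=None):
        parent, _, name = path.rpartition("/")
        self.children.setdefault(path, set())
        self.children.setdefault(parent, set()).add(name)
        if session is not None:
            self.owner[path] = session  # ephemeral: tied to a session

    def delete(self, path):
        if self.children.get(path):
            raise NotEmptyError(path)  # nodes with children cannot be deleted
        parent, _, name = path.rpartition("/")
        self.children.pop(path, None)
        self.children[parent].discard(name)
        self.owner.pop(path, None)

    def expire_session(self, session):
        # When a session ends, all of its ephemeral nodes are removed.
        for path in [p for p, s in self.owner.items() if s == session]:
            self.delete(path)


def release_and_try_remove(tree, checkpoint_path, lock_uuid):
    """Release our own lock, then try to delete the checkpoint node."""
    lock_path = f"{checkpoint_path}/{lock_uuid}"
    if lock_path in tree.children:
        tree.delete(lock_path)
    try:
        tree.delete(checkpoint_path)
        return True
    except NotEmptyError:
        return False  # another lock node (e.g. a stale one) is still present


tree = ZNodeTree()
cp = "/flink/job-name/checkpoints/job-id/0000000000000000001"
tree.create(cp)
tree.create(f"{cp}/old-jm-uuid", session="old-session")  # lock from before the reboot
tree.create(f"{cp}/new-jm-uuid", session="new-session")  # lock from the new leader

# The old session has not been cleaned up yet, so removal fails.
removed_before = release_and_try_remove(tree, cp, "new-jm-uuid")

# Once the old session's ephemeral nodes are gone, removal succeeds.
tree.expire_session("old-session")
removed_after = release_and_try_remove(tree, cp, "new-jm-uuid")

print(removed_before, removed_after)
```

In this model the checkpoint node can only be deleted after the old session's lock node has gone, which matches the explanation that the errors persist for as long as ZooKeeper keeps the stale ephemeral nodes around.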
It may take me a bit to get the logs, as we're not always in a situation where we've got enough hands free to run through the scenarios for a day.

Is that DEBUG JobManager, DEBUG ZooKeeper, or both you'd be interested in?

Thanks,
Dyana

On Tue, 23 Apr 2019 at 13:23, Till Rohrmann <[hidden email]> wrote:
> [...]

--
Dyana Rose
Software Engineer
W: www.salecycle.com <http://www.salecycle.com/>
It would be awesome to get the DEBUG logs for JobMaster, ZooKeeper, ZooKeeperCompletedCheckpointStore, ZooKeeperStateHandleStore, and CheckpointCoordinator.

Cheers,
Till

On Tue, Apr 23, 2019 at 2:37 PM Dyana Rose <[hidden email]> wrote:
> [...]
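For anyone wanting to collect the same DEBUG logs, the sketch below prints logger lines in log4j 1.x properties syntax, which is the format of the `conf/log4j.properties` file shipped with Flink 1.7. The fully qualified class and package names are assumptions that are not stated in the thread and should be checked against the Flink version in use; `log4j_lines` is a hypothetical helper.

```python
# Assumed fully qualified logger names for the components listed above;
# verify them against the Flink release you are running.
DEBUG_LOGGERS = [
    "org.apache.flink.runtime.jobmaster.JobMaster",
    "org.apache.zookeeper",
    "org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore",
    "org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore",
    "org.apache.flink.runtime.checkpoint.CheckpointCoordinator",
]


def log4j_lines(loggers, level="DEBUG"):
    """Render one log4j 1.x 'log4j.logger.<name>=<level>' line per logger."""
    return [f"log4j.logger.{name}={level}" for name in loggers]


lines = log4j_lines(DEBUG_LOGGERS)
print("\n".join(lines))
```

The printed lines could be appended to the JobManager's log4j configuration so that only these components log at DEBUG while the rest stay at their existing level.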
Like all the best problems, I can't get this to reproduce locally.

Everything has worked as expected. I started up a test job with 5 retained checkpoints, let it run and watched the nodes in ZooKeeper.

Then I shut down and restarted the Flink cluster.

The ephemeral lock nodes in the retained checkpoints transitioned from one lock id to another without a hitch.

So that's good.

As I understand it, if the ZooKeeper cluster is having a sync issue, ephemeral nodes may not get deleted when the session becomes inactive. We're new to running our own ZooKeeper so it may be down to that.
Thanks for the update, Dyana. I'm also not an expert in running one's own ZooKeeper cluster. It might be related to setting up the ZooKeeper cluster properly. Maybe someone else from the community has experience with this. Therefore, I'm cross-posting this thread to the user ML again to have a wider reach.

Cheers,
Till

On Wed, May 1, 2019 at 10:17 AM dyana.rose <[hidden email]> wrote:
> [...]
Just wanted to give an update on this.

Our ops team and I independently came to the same conclusion that our ZooKeeper quorum was having syncing issues.

After a bit more research, they have updated the initLimit and syncLimit in the quorum configs to:

initLimit=10
syncLimit=5

After this change we no longer saw any of the issues we were having.

Thanks,
Dyana

On 2019/05/02 08:43:14, Till Rohrmann <[hidden email]> wrote:
> [...]
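To put the `initLimit` and `syncLimit` values in context: both are counted in ZooKeeper ticks, so the wall-clock limits depend on `tickTime`. The short sketch below does the arithmetic, assuming `tickTime=2000` ms; the thread does not state the tickTime actually used, so that value and the `ticks_to_ms` helper are assumptions for illustration only.

```python
TICK_TIME_MS = 2000  # assumption: tickTime is not given in the thread
INIT_LIMIT = 10      # ticks a follower may take to connect and sync to the leader
SYNC_LIMIT = 5       # ticks a follower may lag behind the leader


def ticks_to_ms(ticks, tick_time_ms=TICK_TIME_MS):
    """Convert a limit expressed in ticks to milliseconds."""
    return ticks * tick_time_ms


init_ms = ticks_to_ms(INIT_LIMIT)
sync_ms = ticks_to_ms(SYNC_LIMIT)
print(f"initLimit: {init_ms} ms, syncLimit: {sync_ms} ms")
# With the assumed tickTime: initLimit: 20000 ms, syncLimit: 10000 ms
```

Under that assumption, followers get 20 seconds to complete their initial sync and may fall up to 10 seconds behind the leader before being dropped from the quorum.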
Great to hear, Dyana. Thanks for the update.

Cheers,
Till

On Fri, Jun 7, 2019 at 2:48 PM dyana.rose <[hidden email]> wrote:
> [...]