[jira] [Created] (FLINK-19557) Issue retrieving leader after zookeeper session reconnect

Shang Yuanchun (Jira)
Max Mizikar created FLINK-19557:
-----------------------------------

             Summary: Issue retrieving leader after zookeeper session reconnect
                 Key: FLINK-19557
                 URL: https://issues.apache.org/jira/browse/FLINK-19557
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.11.2
            Reporter: Max Mizikar


We have noticed an issue with leader retrieval after reconnecting to ZooKeeper. To reproduce it, break the connection between ZooKeeper and a job manager that is not the leader, and wait for the ZooKeeper session between the two to be lost. At this point, Flink notifies its listeners that the leader has been lost. After the leader loss has occurred, reconnect the job manager to ZooKeeper. The leader is still the same as it was before, but when you try to access the REST API, you will see this:
```
$ curl -s localhost:8999/jobs
{"errors":["Service temporarily unavailable due to an ongoing leader election. Please refresh."]}
```
I have been using `stress -t 60 -m 2048` (which, for 60 seconds, runs 2048 workers that continuously allocate and free 256 MB each) to force the job manager's memory to be swapped out and cause the connection loss.

I have dug into this a bit. The ZooKeeperLeaderRetrievalService has this code block for handling state changes:
```java
        protected void handleStateChange(ConnectionState newState) {
                switch (newState) {
                        case CONNECTED:
                                LOG.debug("Connected to ZooKeeper quorum. Leader retrieval can start.");
                                break;
                        case SUSPENDED:
                                LOG.warn("Connection to ZooKeeper suspended. Can no longer retrieve the leader from " +
                                                "ZooKeeper.");
                                synchronized (lock) {
                                        notifyLeaderLoss();
                                }
                                break;
                        case RECONNECTED:
                                LOG.info("Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.");
                                break;
                        case LOST:
                                LOG.warn("Connection to ZooKeeper lost. Can no longer retrieve the leader from " +
                                                "ZooKeeper.");
                                synchronized (lock) {
                                        notifyLeaderLoss();
                                }
                                break;
                }
        }
```
It calls `notifyLeaderLoss()` when the connection is suspended or lost, but it does nothing when the connection is reconnected. It appears that Curator's NodeCache re-reads the value of the leader znode after reconnecting, but it does not notify its listeners if the value is the same as before the connection loss. So, unless a leader election happens after a ZooKeeper connection loss, the job managers that are not the leader will never learn that there is a leader.
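To make the interaction concrete, here is a minimal simulation in plain Java. `DedupNodeCache`, `LeaderRetrieval`, `ConnState` and `ReconnectScenario` are hypothetical stand-ins, not the real Curator or Flink classes, and the leader address `jm-1` is made up. The sketch assumes, as described above, that the cache keeps its last value across the connection loss and re-reads the same value after reconnecting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for a cache that notifies only on change (like NodeCache).
class DedupNodeCache {
    private final AtomicReference<String> data = new AtomicReference<>();
    private final List<Runnable> listeners = new ArrayList<>();

    void addListener(Runnable listener) {
        listeners.add(listener);
    }

    String getCurrentData() {
        return data.get();
    }

    // Same shape as NodeCache.setNewData: listeners fire only when the value differs.
    void setNewData(String newData) {
        String previous = data.getAndSet(newData);
        if (!Objects.equals(previous, newData)) {
            listeners.forEach(Runnable::run);
        }
    }
}

enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

// Hypothetical stand-in for the retrieval service's state handling as reported.
class LeaderRetrieval {
    private final DedupNodeCache cache;
    private String leader; // null means "no known leader"

    LeaderRetrieval(DedupNodeCache cache) {
        this.cache = cache;
        cache.addListener(() -> leader = cache.getCurrentData());
    }

    // SUSPENDED/LOST clear the leader; RECONNECTED does nothing (only logs in Flink).
    void handleStateChange(ConnState state) {
        if (state == ConnState.SUSPENDED || state == ConnState.LOST) {
            leader = null; // stands in for notifyLeaderLoss()
        }
    }

    String getLeader() {
        return leader;
    }
}

class ReconnectScenario {
    static String run() {
        DedupNodeCache cache = new DedupNodeCache();
        LeaderRetrieval retrieval = new LeaderRetrieval(cache);

        cache.setNewData("jm-1");                          // listener fires: leader is "jm-1"
        retrieval.handleStateChange(ConnState.LOST);        // leader cleared
        retrieval.handleStateChange(ConnState.RECONNECTED); // no-op
        cache.setNewData("jm-1");                          // same value re-read: no notification
        return retrieval.getLeader();                       // still null
    }

    public static void main(String[] args) {
        System.out.println("leader after reconnect: " + run());
    }
}
```

Running `main` prints `leader after reconnect: null`: the retrieval side cleared its leader on `LOST`, and the unchanged re-read never triggers the listener that would restore it.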

This is the NodeCache method that is called when a new value is retrieved:
```java
    private void setNewData(ChildData newData) throws InterruptedException
    {
        ChildData   previousData = data.getAndSet(newData);
        if ( !Objects.equal(previousData, newData) )
        {
            listeners.forEach(listener -> {
                try
                {
                    listener.nodeChanged();
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    log.error("Calling listener", e);
                }
            });

            if ( rebuildTestExchanger != null )
            {
                try
                {
                    rebuildTestExchanger.exchange(new Object());
                }
                catch ( InterruptedException e )
                {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
```
Note the check
```java
        if ( !Objects.equal(previousData, newData) )
```
which seems to be what prevents the job managers from getting the leader after a ZooKeeper connection loss.
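One possible mitigation, sketched under the same simplified model, is to stop relying on a change event after a reconnect and instead refresh the leader from the current value when `RECONNECTED` arrives. This is an assumption about how a fix could look, not the change made in Flink; `CachedLeaderNode`, `ReconnectAwareRetrieval`, `ZkState`, `MitigationScenario` and the address `jm-1` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for a change-only-notifying cache such as NodeCache.
class CachedLeaderNode {
    private final AtomicReference<String> data = new AtomicReference<>();
    private final List<Runnable> listeners = new ArrayList<>();

    void addListener(Runnable listener) { listeners.add(listener); }

    String getCurrentData() { return data.get(); }

    void setNewData(String newData) {
        String previous = data.getAndSet(newData);
        if (!Objects.equals(previous, newData)) {
            listeners.forEach(Runnable::run);
        }
    }
}

enum ZkState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

class ReconnectAwareRetrieval {
    private final CachedLeaderNode cache;
    private volatile String leader; // null means "no known leader"

    ReconnectAwareRetrieval(CachedLeaderNode cache) {
        this.cache = cache;
        cache.addListener(this::refreshFromCache);
    }

    private void refreshFromCache() {
        leader = cache.getCurrentData();
    }

    void handleStateChange(ZkState state) {
        switch (state) {
            case SUSPENDED:
            case LOST:
                leader = null; // stands in for notifyLeaderLoss()
                break;
            case RECONNECTED:
                // Assumed mitigation: refresh explicitly instead of waiting for a change event.
                refreshFromCache();
                break;
            default:
                break;
        }
    }

    String getLeader() { return leader; }
}

class MitigationScenario {
    static String run() {
        CachedLeaderNode cache = new CachedLeaderNode();
        ReconnectAwareRetrieval retrieval = new ReconnectAwareRetrieval(cache);

        cache.setNewData("jm-1");                         // leader is "jm-1"
        retrieval.handleStateChange(ZkState.LOST);         // leader cleared
        retrieval.handleStateChange(ZkState.RECONNECTED);  // leader refreshed from current value
        cache.setNewData("jm-1");                         // unchanged: no event, none needed now
        return retrieval.getLeader();
    }

    public static void main(String[] args) {
        System.out.println("leader after reconnect: " + run());
    }
}
```

Running `main` prints `leader after reconnect: jm-1`. In a real deployment the cached value may still be stale at the moment `RECONNECTED` is delivered, so a fix would more likely re-read the leader znode from ZooKeeper itself rather than trust the cache.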



--
This message was sent by Atlassian Jira
(v8.3.4#803005)