/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zookeeper.server.quorum;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.ZooKeeperThread;
import org.apache.zookeeper.server.quorum.Election;
import org.apache.zookeeper.server.quorum.LeaderElectionBean;
import org.apache.zookeeper.server.quorum.QuorumCnxManager;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.SyncedLearnerTracker;
import org.apache.zookeeper.server.quorum.Vote;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FastLeaderElection
implements Election {
    /** Logger shared by the election logic and the nested messenger workers. */
    private static final Logger LOG = LoggerFactory.getLogger(FastLeaderElection.class);
    // NOTE(review): 200 equals the poll timeouts hard-coded in lookForLeader (the initial
    // notification timeout and the wait after a quorum is seen); presumably this constant
    // was inlined there by the compiler -- confirm.
    static final int finalizeWait = 200;
    // NOTE(review): 60000 equals the cap applied to the doubling notification timeout in
    // lookForLeader; presumably the same inlined constant -- confirm.
    static final int maxNotificationInterval = 60000;
    // NOTE(review): -1 equals the placeholder zxid/electionEpoch passed to Vote and
    // checkLeader for out-of-election votes in lookForLeader -- confirm.
    static final int IGNOREVALUE = -1;
    /** Connection manager handed in at construction; halted by shutdown(). */
    QuorumCnxManager manager;
    /** Empty array used as the field-initializer default for ToSend.configData. */
    static byte[] dummyData = new byte[0];
    /** Outgoing messages: filled by the election logic and WorkerReceiver, drained by WorkerSender. */
    LinkedBlockingQueue<ToSend> sendqueue;
    /** Incoming notifications: filled by WorkerReceiver, drained by lookForLeader(). */
    LinkedBlockingQueue<Notification> recvqueue;
    /** The local quorum peer this election instance acts for. */
    QuorumPeer self;
    /** Owner of the sender/receiver worker threads. */
    Messenger messenger;
    /** Election round counter; incremented at the start of lookForLeader() and sent as electionEpoch. */
    AtomicLong logicalclock = new AtomicLong();
    // Current proposal (leader id, zxid, peer epoch); written by updateProposal().
    long proposedLeader;
    long proposedZxid;
    long proposedEpoch;
    /** Set by shutdown(); checked by the lookForLeader() loop. */
    volatile boolean stop = false;

    /**
     * Returns the current value of the logical clock (the election round counter).
     *
     * @return the value held by {@code logicalclock} at the time of the call
     */
    public long getLogicalClock() {
        final long currentRound = logicalclock.get();
        return currentRound;
    }

    /**
     * Builds a fixed-size notification packet without configuration data.
     *
     * <p>Layout (40 bytes): state (int), leader (long), zxid (long), electionEpoch (long),
     * epoch (long), format version {@code 1} (int). The returned buffer is NOT flipped:
     * its position is at the end of the written data.
     *
     * @param state         ordinal of the sender's server state
     * @param leader        proposed leader id
     * @param zxid          proposed leader's zxid
     * @param electionEpoch sender's election round
     * @param epoch         proposed leader's peer epoch
     * @return an array-backed buffer holding the encoded message
     */
    static ByteBuffer buildMsg(int state, long leader, long zxid, long electionEpoch, long epoch) {
        // 4 (state) + 4 * 8 (leader, zxid, electionEpoch, epoch) + 4 (version) = 40
        ByteBuffer packet = ByteBuffer.allocate(40);
        packet.putInt(state)
              .putLong(leader)
              .putLong(zxid)
              .putLong(electionEpoch)
              .putLong(epoch)
              .putInt(1);
        return packet;
    }

    /**
     * Builds a notification packet that carries configuration data.
     *
     * <p>Layout (44 + {@code configData.length} bytes): state (int), leader (long),
     * zxid (long), electionEpoch (long), epoch (long), format version {@code 2} (int),
     * config length (int), config bytes. The returned buffer is NOT flipped: its position
     * is at the end of the written data.
     *
     * @param state         ordinal of the sender's server state
     * @param leader        proposed leader id
     * @param zxid          proposed leader's zxid
     * @param electionEpoch sender's election round
     * @param epoch         proposed leader's peer epoch
     * @param configData    serialized configuration; must not be {@code null}
     * @return an array-backed buffer holding the encoded message
     */
    static ByteBuffer buildMsg(int state, long leader, long zxid, long electionEpoch, long epoch, byte[] configData) {
        final int configSize = configData.length;
        // 40-byte fixed part (with version) + 4-byte length prefix + payload
        ByteBuffer packet = ByteBuffer.allocate(44 + configSize);
        packet.putInt(state)
              .putLong(leader)
              .putLong(zxid)
              .putLong(electionEpoch)
              .putLong(epoch)
              .putInt(2)
              .putInt(configSize)
              .put(configData);
        return packet;
    }

    /**
     * Creates an election instance for the given peer.
     *
     * <p>Stores the connection manager and delegates the rest of the setup to
     * {@code starter}. Worker threads are created but not started; call {@link #start()}.
     *
     * @param self    the local quorum peer
     * @param manager the connection manager used to exchange election messages
     */
    public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
        this.manager = manager;
        starter(self, manager);
    }

    /**
     * Initializes per-instance state: records the peer, resets the proposed leader and
     * zxid to {@code -1}, creates the send/receive queues and the messenger.
     *
     * @param self    the local quorum peer
     * @param manager the connection manager passed on to the messenger's workers
     */
    private void starter(QuorumPeer self, QuorumCnxManager manager) {
        this.self = self;
        this.proposedLeader = -1L;
        this.proposedZxid = -1L;
        // Parameterized (not raw) queues, matching the declared field types.
        this.sendqueue = new LinkedBlockingQueue<ToSend>();
        this.recvqueue = new LinkedBlockingQueue<Notification>();
        this.messenger = new Messenger(manager);
    }

    /** Starts the messenger, which in turn starts its sender and receiver threads. */
    public void start() {
        messenger.start();
    }

    /**
     * Called when an election round is decided: logs the outcome at debug level and
     * discards any notifications still waiting in the receive queue.
     *
     * @param v the final vote of the round
     */
    private void leaveInstance(Vote v) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("About to leave FLE instance: leader={}, zxid=0x{}, my id={}, my state={}",
                    v.getId(), Long.toHexString(v.getZxid()), self.getId(), self.getPeerState());
        }
        recvqueue.clear();
    }

    /**
     * Returns the connection manager this election instance was constructed with.
     *
     * @return the connection manager
     */
    public QuorumCnxManager getCnxManager() {
        return manager;
    }

    /**
     * Stops this election instance: raises the stop flag, resets the proposed leader and
     * zxid to {@code -1}, then halts the connection manager followed by the messenger.
     */
    @Override
    public void shutdown() {
        // Flag first, so the lookForLeader() loop can observe it.
        stop = true;
        proposedLeader = -1L;
        proposedZxid = -1L;

        LOG.debug("Shutting down connection manager");
        manager.halt();

        LOG.debug("Shutting down messenger");
        messenger.halt();

        LOG.debug("FLE is down");
    }

    /**
     * Queues one LOOKING notification carrying the current proposal for every server id
     * returned by {@code self.getCurrentAndNextConfigVoters()}. The quorum verifier is
     * re-read for each recipient and sent in its {@code toString()} form.
     */
    private void sendNotifications() {
        for (long recipient : self.getCurrentAndNextConfigVoters()) {
            QuorumVerifier verifier = self.getQuorumVerifier();
            ToSend outgoing = new ToSend(
                    ToSend.mType.notification,
                    proposedLeader,
                    proposedZxid,
                    logicalclock.get(),
                    QuorumPeer.ServerState.LOOKING,
                    recipient,
                    proposedEpoch,
                    verifier.toString().getBytes());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"
                        + Long.toHexString(proposedZxid) + " (n.zxid), 0x"
                        + Long.toHexString(logicalclock.get()) + " (n.round), "
                        + recipient + " (recipient), "
                        + self.getId() + " (myid), 0x"
                        + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
            }
            sendqueue.offer(outgoing);
        }
    }

    /**
     * Logs a received notification at info level, including the local peer state and,
     * when the notification carries a quorum verifier, that verifier's version.
     *
     * @param n2 the notification to log
     */
    private void printNotification(Notification n2) {
        StringBuilder text = new StringBuilder("Notification: ");
        text.append(Long.toHexString(n2.version)).append(" (message format version), ");
        text.append(n2.leader).append(" (n.leader), 0x");
        text.append(Long.toHexString(n2.zxid)).append(" (n.zxid), 0x");
        text.append(Long.toHexString(n2.electionEpoch)).append(" (n.round), ");
        text.append(n2.state).append(" (n.state), ");
        text.append(n2.sid).append(" (n.sid), 0x");
        text.append(Long.toHexString(n2.peerEpoch)).append(" (n.peerEPoch), ");
        text.append(self.getPeerState()).append(" (my state)");
        if (n2.qv != null) {
            text.append(Long.toHexString(n2.qv.getVersion())).append(" (n.config version)");
        }
        LOG.info(text.toString());
    }

    /**
     * Decides whether the vote (newId, newZxid, newEpoch) beats (curId, curZxid, curEpoch).
     *
     * <p>A candidate whose weight in the current quorum verifier is zero never wins.
     * Otherwise the comparison is lexicographic: higher epoch first, then higher zxid,
     * then higher server id.
     *
     * @return {@code true} if the new vote is strictly greater than the current one
     */
    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" + Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));

        if (self.getQuorumVerifier().getWeight(newId) == 0L) {
            return false;
        }
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;
        }
        return newId > curId;
    }

    /**
     * Checks whether the servers whose recorded vote equals {@code vote} satisfy every
     * quorum verifier under consideration: the current one, plus the last-seen one when
     * it exists and has a higher version.
     *
     * @param votes votes received so far, keyed by server id
     * @param vote  the vote to count supporters for
     * @return the result of {@code hasAllQuorums()} on the collected acknowledgements
     */
    private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
        SyncedLearnerTracker tracker = new SyncedLearnerTracker();
        tracker.addQuorumVerifier(self.getQuorumVerifier());

        boolean newerConfigSeen = self.getLastSeenQuorumVerifier() != null
                && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion();
        if (newerConfigSeen) {
            tracker.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        for (Map.Entry<Long, Vote> ballot : votes.entrySet()) {
            if (vote.equals(ballot.getValue())) {
                tracker.addAck(ballot.getKey());
            }
        }
        return tracker.hasAllQuorums();
    }

    /**
     * Sanity-checks an elected leader.
     *
     * <p>If the leader is another server, {@code votes} must contain an entry from that
     * server whose state is LEADING. If the leader is this server, the local logical
     * clock must equal {@code electionEpoch}.
     *
     * @param votes         votes keyed by server id
     * @param leader        id of the presumed leader
     * @param electionEpoch election epoch to compare with the local logical clock
     * @return {@code true} if the check passes
     */
    private boolean checkLeader(HashMap<Long, Vote> votes, long leader, long electionEpoch) {
        if (leader == self.getId()) {
            return logicalclock.get() == electionEpoch;
        }
        Vote leaderVote = votes.get(leader);
        return leaderVote != null && leaderVote.getState() == QuorumPeer.ServerState.LEADING;
    }

    /**
     * Replaces the current proposal while holding this object's monitor.
     *
     * @param leader id of the newly proposed leader
     * @param zxid   zxid of the newly proposed leader
     * @param epoch  peer epoch of the newly proposed leader
     */
    synchronized void updateProposal(long leader, long zxid, long epoch) {
        if (LOG.isDebugEnabled()) {
            String change = "Updating proposal: " + leader + " (newleader), 0x" + Long.toHexString(zxid)
                    + " (newzxid), " + proposedLeader + " (oldleader), 0x" + Long.toHexString(proposedZxid)
                    + " (oldzxid)";
            LOG.debug(change);
        }
        proposedLeader = leader;
        proposedZxid = zxid;
        proposedEpoch = epoch;
    }

    /**
     * Returns a snapshot of the current proposal, taken while holding this object's monitor.
     *
     * @return a new {@code Vote} built from the proposed leader, zxid and epoch
     */
    public synchronized Vote getVote() {
        Vote snapshot = new Vote(proposedLeader, proposedZxid, proposedEpoch);
        return snapshot;
    }

    /**
     * Returns the state this peer takes when it is not the leader: FOLLOWING for a
     * participant, OBSERVING otherwise.
     */
    private QuorumPeer.ServerState learningState() {
        boolean participant = self.getLearnerType() == QuorumPeer.LearnerType.PARTICIPANT;
        if (!participant) {
            LOG.debug("I'm an observer: " + self.getId());
            return QuorumPeer.ServerState.OBSERVING;
        }
        LOG.debug("I'm a participant: " + self.getId());
        return QuorumPeer.ServerState.FOLLOWING;
    }

    /**
     * Returns the leader id this peer initially proposes: its own id if it is a voting
     * member of the current quorum verifier, otherwise {@code Long.MIN_VALUE}.
     */
    private long getInitId() {
        boolean votingMember = self.getQuorumVerifier().getVotingMembers().containsKey(self.getId());
        return votingMember ? self.getId() : Long.MIN_VALUE;
    }

    /**
     * Returns the zxid this peer initially proposes: its last logged zxid if it is a
     * participant, otherwise {@code Long.MIN_VALUE}.
     */
    private long getInitLastLoggedZxid() {
        return self.getLearnerType() == QuorumPeer.LearnerType.PARTICIPANT
                ? self.getLastLoggedZxid()
                : Long.MIN_VALUE;
    }

    /**
     * Returns the peer epoch this peer initially proposes: its current epoch if it is a
     * participant, otherwise {@code Long.MIN_VALUE}.
     *
     * @return the current epoch, or {@code Long.MIN_VALUE} for non-participants
     * @throws RuntimeException if reading the current epoch fails with an
     *         {@code IOException}; the original exception is attached as the cause
     */
    private long getPeerEpoch() {
        if (this.self.getLearnerType() != QuorumPeer.LearnerType.PARTICIPANT) {
            return Long.MIN_VALUE;
        }
        try {
            return this.self.getCurrentEpoch();
        }
        catch (IOException e2) {
            // Chain the cause so the original exception type is not lost; the stack trace
            // is still copied so existing log output keeps pointing at the I/O failure.
            RuntimeException re = new RuntimeException(e2.getMessage(), e2);
            re.setStackTrace(e2.getStackTrace());
            throw re;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs one round of leader election and blocks until it is decided or aborted.
     *
     * <p>The method registers a JMX bean for the duration of the election, bumps the
     * logical clock, proposes this peer's own initial vote and then processes incoming
     * notifications from {@code recvqueue} while the peer state is LOOKING and
     * {@code stop} is not set:
     * <ul>
     *   <li>on a poll timeout it resends notifications (or reconnects) and doubles the
     *       timeout up to {@code maxNotificationInterval};</li>
     *   <li>LOOKING notifications update the proposal and the received-vote set; once
     *       {@code termPredicate} holds and no better vote arrives within
     *       {@code finalizeWait} ms, the peer state is set and the final vote returned;</li>
     *   <li>FOLLOWING/LEADING notifications are checked against the current round and
     *       against the out-of-election set so an already established leader can be joined;</li>
     *   <li>OBSERVING notifications and notifications from non-members are ignored.</li>
     * </ul>
     *
     * @return the final vote, or {@code null} if the loop ended without a decision
     *         (peer no longer LOOKING, or {@code stop} set)
     * @throws InterruptedException if interrupted while waiting on the receive queue
     */
    @Override
    public Vote lookForLeader() throws InterruptedException {
        try {
            this.self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(this.self.jmxLeaderElectionBean, this.self.jmxLocalPeerBean);
        }
        catch (Exception e2) {
            LOG.warn("Failed to register with JMX", e2);
            this.self.jmxLeaderElectionBean = null;
        }
        if (this.self.start_fle == 0L) {
            this.self.start_fle = Time.currentElapsedTime();
        }
        try {
            // Votes received in the current round, keyed by sender id.
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
            // Votes from peers that report FOLLOWING/LEADING, keyed by sender id.
            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
            int notTimeout = finalizeWait;

            synchronized (this) {
                this.logicalclock.incrementAndGet();
                this.updateProposal(this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch());
            }
            LOG.info("New election. My id =  " + this.self.getId() + ", proposed zxid=0x" + Long.toHexString(this.proposedZxid));
            this.sendNotifications();

            while (this.self.getPeerState() == QuorumPeer.ServerState.LOOKING && !this.stop) {
                Notification n2 = this.recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

                if (n2 == null) {
                    // Nothing received: resend if everything was delivered, otherwise reconnect.
                    if (this.manager.haveDelivered()) {
                        this.sendNotifications();
                    } else {
                        this.manager.connectAll();
                    }
                    // Exponential back-off, capped.
                    notTimeout = Math.min(notTimeout * 2, maxNotificationInterval);
                    LOG.info("Notification time out: " + notTimeout);
                    continue;
                }

                if (!this.self.getCurrentAndNextConfigVoters().contains(n2.sid)) {
                    LOG.warn("Ignoring notification from non-cluster member " + n2.sid);
                    continue;
                }

                switch (n2.state) {
                    case LOOKING: {
                        if (n2.electionEpoch > this.logicalclock.get()) {
                            // Sender is in a newer round: adopt it and restart vote counting.
                            this.logicalclock.set(n2.electionEpoch);
                            recvset.clear();
                            if (this.totalOrderPredicate(n2.leader, n2.zxid, n2.peerEpoch, this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch())) {
                                this.updateProposal(n2.leader, n2.zxid, n2.peerEpoch);
                            } else {
                                this.updateProposal(this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch());
                            }
                            this.sendNotifications();
                        } else if (n2.electionEpoch < this.logicalclock.get()) {
                            // Stale round: drop the notification.
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n2.electionEpoch) + ", logicalclock=0x" + Long.toHexString(this.logicalclock.get()));
                            }
                            break;
                        } else if (this.totalOrderPredicate(n2.leader, n2.zxid, n2.peerEpoch, this.proposedLeader, this.proposedZxid, this.proposedEpoch)) {
                            this.updateProposal(n2.leader, n2.zxid, n2.peerEpoch);
                            this.sendNotifications();
                        }

                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Adding vote: from=" + n2.sid + ", proposed leader=" + n2.leader + ", proposed zxid=0x" + Long.toHexString(n2.zxid) + ", proposed election epoch=0x" + Long.toHexString(n2.electionEpoch));
                        }
                        recvset.put(n2.sid, new Vote(n2.leader, n2.zxid, n2.electionEpoch, n2.peerEpoch));

                        if (this.termPredicate(recvset, new Vote(this.proposedLeader, this.proposedZxid, this.logicalclock.get(), this.proposedEpoch))) {
                            // Quorum reached: wait to see whether a better vote shows up.
                            while ((n2 = this.recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                if (this.totalOrderPredicate(n2.leader, n2.zxid, n2.peerEpoch, this.proposedLeader, this.proposedZxid, this.proposedEpoch)) {
                                    // Better vote: put it back so the main loop processes it.
                                    this.recvqueue.put(n2);
                                    break;
                                }
                            }
                            if (n2 == null) {
                                // Queue stayed quiet: the proposal is final.
                                this.self.setPeerState(this.proposedLeader == this.self.getId() ? QuorumPeer.ServerState.LEADING : this.learningState());
                                Vote endVote = new Vote(this.proposedLeader, this.proposedZxid, this.proposedEpoch);
                                this.leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    }
                    case OBSERVING: {
                        LOG.debug("Notification from observer: " + n2.sid);
                        break;
                    }
                    case FOLLOWING:
                    case LEADING: {
                        // Same round: count the vote together with the ones received so far.
                        if (n2.electionEpoch == this.logicalclock.get()) {
                            recvset.put(n2.sid, new Vote(n2.leader, n2.zxid, n2.electionEpoch, n2.peerEpoch));
                            if (this.termPredicate(recvset, new Vote(n2.leader, n2.zxid, n2.electionEpoch, n2.peerEpoch, n2.state)) && this.checkLeader(outofelection, n2.leader, n2.electionEpoch)) {
                                this.self.setPeerState(n2.leader == this.self.getId() ? QuorumPeer.ServerState.LEADING : this.learningState());
                                Vote endVote = new Vote(n2.leader, n2.zxid, n2.peerEpoch);
                                this.leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        // Otherwise check whether a quorum already follows the same leader.
                        outofelection.put(n2.sid, new Vote(n2.leader, IGNOREVALUE, IGNOREVALUE, n2.peerEpoch, n2.state));
                        if (this.termPredicate(outofelection, new Vote(n2.leader, IGNOREVALUE, IGNOREVALUE, n2.peerEpoch, n2.state)) && this.checkLeader(outofelection, n2.leader, IGNOREVALUE)) {
                            synchronized (this) {
                                this.logicalclock.set(n2.electionEpoch);
                                this.self.setPeerState(n2.leader == this.self.getId() ? QuorumPeer.ServerState.LEADING : this.learningState());
                            }
                            Vote endVote = new Vote(n2.leader, n2.zxid, n2.peerEpoch);
                            this.leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    }
                    default: {
                        LOG.warn("Notification state unrecoginized: " + n2.state + " (n.state), " + n2.sid + " (n.sid)");
                        break;
                    }
                }
            }
            return null;
        }
        finally {
            try {
                if (this.self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(this.self.jmxLeaderElectionBean);
                }
            }
            catch (Exception e3) {
                LOG.warn("Failed to unregister with JMX", e3);
            }
            this.self.jmxLeaderElectionBean = null;
        }
    }

    protected class Messenger {
        // Worker that drains sendqueue and hands messages to the connection manager.
        WorkerSender ws;
        // Worker that polls the connection manager and feeds recvqueue.
        WorkerReceiver wr;
        // Threads that run the two workers; both are created in the constructor
        // and started by start().
        Thread wsThread = null;
        Thread wrThread = null;

        /**
         * Creates the sender and receiver workers and wraps each in a daemon thread
         * named after the local server id. The threads are not started here.
         *
         * @param manager connection manager shared by both workers
         */
        Messenger(QuorumCnxManager manager) {
            ws = new WorkerSender(manager);
            wsThread = new Thread(ws, "WorkerSender[myid=" + self.getId() + "]");
            wsThread.setDaemon(true);

            wr = new WorkerReceiver(manager);
            wrThread = new Thread(wr, "WorkerReceiver[myid=" + self.getId() + "]");
            wrThread.setDaemon(true);
        }

        /** Starts the sender thread, then the receiver thread. */
        void start() {
            wsThread.start();
            wrThread.start();
        }

        /**
         * Asks both workers to stop by raising their stop flags. The threads are not
         * interrupted; each worker sees the flag the next time it checks its loop condition.
         */
        void halt() {
            ws.stop = true;
            wr.stop = true;
        }

        /**
         * Worker that takes messages off {@code sendqueue}, encodes them with
         * {@code buildMsg} and passes them to the connection manager.
         */
        class WorkerSender
        extends ZooKeeperThread {
            /** Raised by {@code Messenger.halt()} to end the run loop. */
            volatile boolean stop;
            QuorumCnxManager manager;

            WorkerSender(QuorumCnxManager manager) {
                super("WorkerSender");
                this.stop = false;
                this.manager = manager;
            }

            /**
             * Polls the send queue (3 s timeout per poll) until stopped or interrupted,
             * processing every message obtained.
             */
            @Override
            public void run() {
                try {
                    while (!stop) {
                        ToSend pending = sendqueue.poll(3000L, TimeUnit.MILLISECONDS);
                        if (pending != null) {
                            process(pending);
                        }
                    }
                }
                catch (InterruptedException ignored) {
                    // An interrupt ends the loop; fall through to the shutdown log line.
                }
                LOG.info("WorkerSender is down");
            }

            /**
             * Encodes a queued message and hands it to the connection manager.
             *
             * @param m2 message to send
             */
            void process(ToSend m2) {
                ByteBuffer encoded = buildMsg(m2.state.ordinal(), m2.leader, m2.zxid, m2.electionEpoch, m2.peerEpoch, m2.configData);
                manager.toSend(m2.sid, encoded);
            }
        }

        class WorkerReceiver
        extends ZooKeeperThread {
            volatile boolean stop;
            QuorumCnxManager manager;

            WorkerReceiver(QuorumCnxManager manager) {
                super("WorkerReceiver");
                this.stop = false;
                this.manager = manager;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                block14: while (!this.stop) {
                    try {
                        ToSend notmsg;
                        QuorumVerifier qv;
                        long rpeerepoch;
                        QuorumCnxManager.Message response = this.manager.pollRecvQueue(3000L, TimeUnit.MILLISECONDS);
                        if (response == null) continue;
                        if (response.buffer.capacity() < 28) {
                            LOG.error("Got a short response: " + response.buffer.capacity());
                            continue;
                        }
                        boolean backCompatibility28 = response.buffer.capacity() == 28;
                        boolean backCompatibility40 = response.buffer.capacity() == 40;
                        response.buffer.clear();
                        Notification n2 = new Notification();
                        int rstate = response.buffer.getInt();
                        long rleader = response.buffer.getLong();
                        long rzxid = response.buffer.getLong();
                        long relectionEpoch = response.buffer.getLong();
                        int version = 0;
                        if (!backCompatibility28) {
                            rpeerepoch = response.buffer.getLong();
                            if (!backCompatibility40) {
                                version = response.buffer.getInt();
                            } else {
                                LOG.info("Backward compatibility mode (36 bits), server id: {}", (Object)response.sid);
                            }
                        } else {
                            LOG.info("Backward compatibility mode (28 bits), server id: {}", (Object)response.sid);
                            rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
                        }
                        QuorumVerifier rqv = null;
                        if (version > 1) {
                            int configLength = response.buffer.getInt();
                            byte[] b2 = new byte[configLength];
                            response.buffer.get(b2);
                            QuorumPeer quorumPeer = FastLeaderElection.this.self;
                            synchronized (quorumPeer) {
                                try {
                                    rqv = FastLeaderElection.this.self.configFromString(new String(b2));
                                    QuorumVerifier curQV = FastLeaderElection.this.self.getQuorumVerifier();
                                    if (rqv.getVersion() > curQV.getVersion()) {
                                        LOG.info("{} Received version: {} my version: {}", FastLeaderElection.this.self.getId(), Long.toHexString(rqv.getVersion()), Long.toHexString(FastLeaderElection.this.self.getQuorumVerifier().getVersion()));
                                        if (FastLeaderElection.this.self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                                            LOG.debug("Invoking processReconfig(), state: {}", (Object)FastLeaderElection.this.self.getServerState());
                                            FastLeaderElection.this.self.processReconfig(rqv, null, null, false);
                                            if (!rqv.equals(curQV)) {
                                                LOG.info("restarting leader election");
                                                FastLeaderElection.this.self.shuttingDownLE = true;
                                                FastLeaderElection.this.self.getElectionAlg().shutdown();
                                                break;
                                            }
                                        } else {
                                            LOG.debug("Skip processReconfig(), state: {}", (Object)FastLeaderElection.this.self.getServerState());
                                        }
                                    }
                                }
                                catch (IOException e2) {
                                    LOG.error("Something went wrong while processing config received from {}", (Object)response.sid);
                                }
                                catch (QuorumPeerConfig.ConfigException e3) {
                                    LOG.error("Something went wrong while processing config received from {}", (Object)response.sid);
                                }
                            }
                        }
                        LOG.info("Backward compatibility mode (before reconfig), server id: {}", (Object)response.sid);
                        if (!FastLeaderElection.this.self.getCurrentAndNextConfigVoters().contains(response.sid)) {
                            Vote current = FastLeaderElection.this.self.getCurrentVote();
                            QuorumVerifier qv2 = FastLeaderElection.this.self.getQuorumVerifier();
                            ToSend notmsg2 = new ToSend(ToSend.mType.notification, current.getId(), current.getZxid(), FastLeaderElection.this.logicalclock.get(), FastLeaderElection.this.self.getPeerState(), response.sid, current.getPeerEpoch(), qv2.toString().getBytes());
                            FastLeaderElection.this.sendqueue.offer(notmsg2);
                            continue;
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Receive new notification message. My id = " + FastLeaderElection.this.self.getId());
                        }
                        QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                        switch (rstate) {
                            case 0: {
                                ackstate = QuorumPeer.ServerState.LOOKING;
                                break;
                            }
                            case 1: {
                                ackstate = QuorumPeer.ServerState.FOLLOWING;
                                break;
                            }
                            case 2: {
                                ackstate = QuorumPeer.ServerState.LEADING;
                                break;
                            }
                            case 3: {
                                ackstate = QuorumPeer.ServerState.OBSERVING;
                                break;
                            }
                            default: {
                                continue block14;
                            }
                        }
                        n2.leader = rleader;
                        n2.zxid = rzxid;
                        n2.electionEpoch = relectionEpoch;
                        n2.state = ackstate;
                        n2.sid = response.sid;
                        n2.peerEpoch = rpeerepoch;
                        n2.version = version;
                        n2.qv = rqv;
                        if (LOG.isInfoEnabled()) {
                            FastLeaderElection.this.printNotification(n2);
                        }
                        if (FastLeaderElection.this.self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                            FastLeaderElection.this.recvqueue.offer(n2);
                            if (ackstate != QuorumPeer.ServerState.LOOKING || n2.electionEpoch >= FastLeaderElection.this.logicalclock.get()) continue;
                            Vote v = FastLeaderElection.this.getVote();
                            qv = FastLeaderElection.this.self.getQuorumVerifier();
                            notmsg = new ToSend(ToSend.mType.notification, v.getId(), v.getZxid(), FastLeaderElection.this.logicalclock.get(), FastLeaderElection.this.self.getPeerState(), response.sid, v.getPeerEpoch(), qv.toString().getBytes());
                            FastLeaderElection.this.sendqueue.offer(notmsg);
                            continue;
                        }
                        Vote current = FastLeaderElection.this.self.getCurrentVote();
                        if (ackstate != QuorumPeer.ServerState.LOOKING) continue;
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}", FastLeaderElection.this.self.getId(), response.sid, Long.toHexString(current.getZxid()), current.getId(), Long.toHexString(FastLeaderElection.this.self.getQuorumVerifier().getVersion()));
                        }
                        qv = FastLeaderElection.this.self.getQuorumVerifier();
                        notmsg = new ToSend(ToSend.mType.notification, current.getId(), current.getZxid(), current.getElectionEpoch(), FastLeaderElection.this.self.getPeerState(), response.sid, current.getPeerEpoch(), qv.toString().getBytes());
                        FastLeaderElection.this.sendqueue.offer(notmsg);
                    }
                    catch (InterruptedException e4) {
                        LOG.warn("Interrupted Exception while waiting for new message" + e4.toString());
                    }
                }
                LOG.info("WorkerReceiver is down");
            }
        }
    }

    /**
     * An outgoing election message waiting in {@code sendqueue}. It carries the vote
     * (leader, zxid, peer epoch), the sender's election round and state, the recipient
     * id and the serialized configuration.
     */
    public static class ToSend {
        /** Message kinds; the constructor accepts one but does not store it. */
        enum mType {
            crequest,
            challenge,
            notification,
            ack
        }

        long leader;
        long zxid;
        long electionEpoch;
        QuorumPeer.ServerState state;
        long sid;
        byte[] configData = dummyData;
        long peerEpoch;

        /**
         * @param type          message kind (currently unused)
         * @param leader        proposed leader id
         * @param zxid          proposed leader's zxid
         * @param electionEpoch sender's election round
         * @param state         sender's server state
         * @param sid           recipient server id
         * @param peerEpoch     proposed leader's peer epoch
         * @param configData    serialized configuration to attach
         */
        ToSend(mType type, long leader, long zxid, long electionEpoch, QuorumPeer.ServerState state, long sid, long peerEpoch, byte[] configData) {
            // Vote being advertised.
            this.leader = leader;
            this.zxid = zxid;
            this.peerEpoch = peerEpoch;
            // Sender's round and state.
            this.electionEpoch = electionEpoch;
            this.state = state;
            // Addressing and payload.
            this.sid = sid;
            this.configData = configData;
        }
    }

    /**
     * A decoded election message received from another server. Instances are filled in
     * by WorkerReceiver and consumed by lookForLeader().
     */
    public static class Notification {
        // Same value as the format version written by the config-carrying buildMsg overload.
        public static final int CURRENTVERSION = 2;
        // Message format version read from the wire; stays 0 for the legacy 28/40-byte layouts.
        int version;
        // Proposed leader id.
        long leader;
        // Proposed leader's zxid.
        long zxid;
        // Sender's election round (compared against the local logical clock).
        long electionEpoch;
        // Sender's server state.
        QuorumPeer.ServerState state;
        // Sender's server id.
        long sid;
        // Configuration parsed from the message, or null if none was attached or parsing failed.
        QuorumVerifier qv;
        // Proposed leader's peer epoch.
        long peerEpoch;
    }
}

