/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.shaded.org.apache.zookeeper.server.quorum;

import io.mantisrx.shaded.org.apache.zookeeper.common.Time;
import io.mantisrx.shaded.org.apache.zookeeper.jmx.MBeanRegistry;
import io.mantisrx.shaded.org.apache.zookeeper.server.ZooKeeperThread;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.Election;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.LeaderElectionBean;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.QuorumCnxManager;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.QuorumPeer;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.SyncedLearnerTracker;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.Vote;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import io.mantisrx.shaded.org.apache.zookeeper.server.util.ZxidUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FastLeaderElection
implements Election {
    private static final Logger LOG = LoggerFactory.getLogger(FastLeaderElection.class);
    static final int finalizeWait = 200;
    private static int maxNotificationInterval = 60000;
    private static int minNotificationInterval = 200;
    public static final String MIN_NOTIFICATION_INTERVAL = "zookeeper.fastleader.minNotificationInterval";
    public static final String MAX_NOTIFICATION_INTERVAL = "zookeeper.fastleader.maxNotificationInterval";
    QuorumCnxManager manager;
    private SyncedLearnerTracker leadingVoteSet;
    static byte[] dummyData;
    LinkedBlockingQueue<ToSend> sendqueue;
    LinkedBlockingQueue<Notification> recvqueue;
    QuorumPeer self;
    Messenger messenger;
    AtomicLong logicalclock = new AtomicLong();
    long proposedLeader;
    long proposedZxid;
    long proposedEpoch;
    volatile boolean stop = false;

    public long getLogicalClock() {
        return this.logicalclock.get();
    }

    static ByteBuffer buildMsg(int state, long leader, long zxid, long electionEpoch, long epoch) {
        byte[] requestBytes = new byte[40];
        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
        requestBuffer.clear();
        requestBuffer.putInt(state);
        requestBuffer.putLong(leader);
        requestBuffer.putLong(zxid);
        requestBuffer.putLong(electionEpoch);
        requestBuffer.putLong(epoch);
        requestBuffer.putInt(1);
        return requestBuffer;
    }

    static ByteBuffer buildMsg(int state, long leader, long zxid, long electionEpoch, long epoch, byte[] configData) {
        byte[] requestBytes = new byte[44 + configData.length];
        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
        requestBuffer.clear();
        requestBuffer.putInt(state);
        requestBuffer.putLong(leader);
        requestBuffer.putLong(zxid);
        requestBuffer.putLong(electionEpoch);
        requestBuffer.putLong(epoch);
        requestBuffer.putInt(2);
        requestBuffer.putInt(configData.length);
        requestBuffer.put(configData);
        return requestBuffer;
    }

    public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
        this.manager = manager;
        this.starter(self, manager);
    }

    private void starter(QuorumPeer self, QuorumCnxManager manager) {
        this.self = self;
        this.proposedLeader = -1L;
        this.proposedZxid = -1L;
        this.sendqueue = new LinkedBlockingQueue();
        this.recvqueue = new LinkedBlockingQueue();
        this.messenger = new Messenger(manager);
    }

    public void start() {
        this.messenger.start();
    }

    private void leaveInstance(Vote v) {
        LOG.debug("About to leave FLE instance: leader={}, zxid=0x{}, my id={}, my state={}", new Object[]{v.getId(), Long.toHexString(v.getZxid()), this.self.getId(), this.self.getPeerState()});
        this.recvqueue.clear();
    }

    public QuorumCnxManager getCnxManager() {
        return this.manager;
    }

    @Override
    public void shutdown() {
        this.stop = true;
        this.proposedLeader = -1L;
        this.proposedZxid = -1L;
        this.leadingVoteSet = null;
        LOG.debug("Shutting down connection manager");
        this.manager.halt();
        LOG.debug("Shutting down messenger");
        this.messenger.halt();
        LOG.debug("FLE is down");
    }

    private void sendNotifications() {
        for (long sid : this.self.getCurrentAndNextConfigVoters()) {
            QuorumVerifier qv = this.self.getQuorumVerifier();
            ToSend notmsg = new ToSend(ToSend.mType.notification, this.proposedLeader, this.proposedZxid, this.logicalclock.get(), QuorumPeer.ServerState.LOOKING, sid, this.proposedEpoch, qv.toString().getBytes());
            LOG.debug("Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient), {} (myid), 0x{} (n.peerEpoch) ", this.proposedLeader, Long.toHexString(this.proposedZxid), Long.toHexString(this.logicalclock.get()), sid, this.self.getId(), Long.toHexString(this.proposedEpoch));
            this.sendqueue.offer(notmsg);
        }
    }

    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug("id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}", newId, curId, Long.toHexString(newZxid), Long.toHexString(curZxid));
        if (this.self.getQuorumVerifier().getWeight(newId) == 0L) {
            return false;
        }
        return newEpoch > curEpoch || newEpoch == curEpoch && (newZxid > curZxid || newZxid == curZxid && newId > curId);
    }

    protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote) {
        SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
        voteSet.addQuorumVerifier(this.self.getQuorumVerifier());
        if (this.self.getLastSeenQuorumVerifier() != null && this.self.getLastSeenQuorumVerifier().getVersion() > this.self.getQuorumVerifier().getVersion()) {
            voteSet.addQuorumVerifier(this.self.getLastSeenQuorumVerifier());
        }
        for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
            if (!vote.equals(entry.getValue())) continue;
            voteSet.addAck(entry.getKey());
        }
        return voteSet;
    }

    protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch) {
        boolean predicate = true;
        if (leader != this.self.getId()) {
            if (votes.get(leader) == null) {
                predicate = false;
            } else if (votes.get(leader).getState() != QuorumPeer.ServerState.LEADING) {
                predicate = false;
            }
        } else if (this.logicalclock.get() != electionEpoch) {
            predicate = false;
        }
        return predicate;
    }

    synchronized void updateProposal(long leader, long zxid, long epoch) {
        LOG.debug("Updating proposal: {} (newleader), 0x{} (newzxid), {} (oldleader), 0x{} (oldzxid)", leader, Long.toHexString(zxid), this.proposedLeader, Long.toHexString(this.proposedZxid));
        this.proposedLeader = leader;
        this.proposedZxid = zxid;
        this.proposedEpoch = epoch;
    }

    public synchronized Vote getVote() {
        return new Vote(this.proposedLeader, this.proposedZxid, this.proposedEpoch);
    }

    private QuorumPeer.ServerState learningState() {
        if (this.self.getLearnerType() == QuorumPeer.LearnerType.PARTICIPANT) {
            LOG.debug("I am a participant: {}", (Object)this.self.getId());
            return QuorumPeer.ServerState.FOLLOWING;
        }
        LOG.debug("I am an observer: {}", (Object)this.self.getId());
        return QuorumPeer.ServerState.OBSERVING;
    }

    private long getInitId() {
        if (this.self.getQuorumVerifier().getVotingMembers().containsKey(this.self.getId())) {
            return this.self.getId();
        }
        return Long.MIN_VALUE;
    }

    private long getInitLastLoggedZxid() {
        if (this.self.getLearnerType() == QuorumPeer.LearnerType.PARTICIPANT) {
            return this.self.getLastLoggedZxid();
        }
        return Long.MIN_VALUE;
    }

    private long getPeerEpoch() {
        if (this.self.getLearnerType() == QuorumPeer.LearnerType.PARTICIPANT) {
            try {
                return this.self.getCurrentEpoch();
            }
            catch (IOException e2) {
                RuntimeException re = new RuntimeException(e2.getMessage());
                re.setStackTrace(e2.getStackTrace());
                throw re;
            }
        }
        return Long.MIN_VALUE;
    }

    private void setPeerState(long proposedLeader, SyncedLearnerTracker voteSet) {
        QuorumPeer.ServerState ss = proposedLeader == this.self.getId() ? QuorumPeer.ServerState.LEADING : this.learningState();
        this.self.setPeerState(ss);
        if (ss == QuorumPeer.ServerState.LEADING) {
            this.leadingVoteSet = voteSet;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Vote lookForLeader() throws InterruptedException {
        try {
            this.self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(this.self.jmxLeaderElectionBean, this.self.jmxLocalPeerBean);
        }
        catch (Exception e2) {
            LOG.warn("Failed to register with JMX", e2);
            this.self.jmxLeaderElectionBean = null;
        }
        this.self.start_fle = Time.currentElapsedTime();
        try {
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
            int notTimeout = minNotificationInterval;
            FastLeaderElection fastLeaderElection = this;
            synchronized (fastLeaderElection) {
                this.logicalclock.incrementAndGet();
                this.updateProposal(this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch());
            }
            LOG.info("New election. My id = {}, proposed zxid=0x{}", (Object)this.self.getId(), (Object)Long.toHexString(this.proposedZxid));
            this.sendNotifications();
            block29: while (this.self.getPeerState() == QuorumPeer.ServerState.LOOKING && !this.stop) {
                Notification n = this.recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
                if (n == null) {
                    if (this.manager.haveDelivered()) {
                        this.sendNotifications();
                    } else {
                        this.manager.connectAll();
                    }
                    int tmpTimeOut = notTimeout * 2;
                    notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                    LOG.info("Notification time out: {}", (Object)notTimeout);
                    continue;
                }
                if (this.validVoter(n.sid) && this.validVoter(n.leader)) {
                    switch (n.state) {
                        case LOOKING: {
                            if (this.getInitLastLoggedZxid() == -1L) {
                                LOG.debug("Ignoring notification as our zxid is -1");
                                continue block29;
                            }
                            if (n.zxid == -1L) {
                                LOG.debug("Ignoring notification from member with -1 zxid {}", (Object)n.sid);
                                continue block29;
                            }
                            if (n.electionEpoch > this.logicalclock.get()) {
                                this.logicalclock.set(n.electionEpoch);
                                recvset.clear();
                                if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch())) {
                                    this.updateProposal(n.leader, n.zxid, n.peerEpoch);
                                } else {
                                    this.updateProposal(this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch());
                                }
                                this.sendNotifications();
                            } else {
                                if (n.electionEpoch < this.logicalclock.get()) {
                                    LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}", (Object)Long.toHexString(n.electionEpoch), (Object)Long.toHexString(this.logicalclock.get()));
                                    continue block29;
                                }
                                if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, this.proposedLeader, this.proposedZxid, this.proposedEpoch)) {
                                    this.updateProposal(n.leader, n.zxid, n.peerEpoch);
                                    this.sendNotifications();
                                }
                            }
                            LOG.debug("Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}", n.sid, n.leader, Long.toHexString(n.zxid), Long.toHexString(n.electionEpoch));
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                            SyncedLearnerTracker voteSet = this.getVoteTracker(recvset, new Vote(this.proposedLeader, this.proposedZxid, this.logicalclock.get(), this.proposedEpoch));
                            if (!voteSet.hasAllQuorums()) continue block29;
                            while ((n = this.recvqueue.poll(200L, TimeUnit.MILLISECONDS)) != null) {
                                if (!this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, this.proposedLeader, this.proposedZxid, this.proposedEpoch)) continue;
                                this.recvqueue.put(n);
                                break;
                            }
                            if (n != null) continue block29;
                            this.setPeerState(this.proposedLeader, voteSet);
                            Vote endVote = new Vote(this.proposedLeader, this.proposedZxid, this.logicalclock.get(), this.proposedEpoch);
                            this.leaveInstance(endVote);
                            Vote vote = endVote;
                            return vote;
                        }
                        case OBSERVING: {
                            LOG.debug("Notification from observer: {}", (Object)n.sid);
                            continue block29;
                        }
                        case FOLLOWING: 
                        case LEADING: {
                            SyncedLearnerTracker voteSet;
                            if (n.electionEpoch == this.logicalclock.get()) {
                                recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                                voteSet = this.getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                                if (voteSet.hasAllQuorums() && this.checkLeader(recvset, n.leader, n.electionEpoch)) {
                                    this.setPeerState(n.leader, voteSet);
                                    Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                    this.leaveInstance(endVote);
                                    Vote vote = endVote;
                                    return vote;
                                }
                            }
                            outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            voteSet = this.getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            if (!voteSet.hasAllQuorums() || !this.checkLeader(outofelection, n.leader, n.electionEpoch)) continue block29;
                            Object endVote = this;
                            synchronized (endVote) {
                                this.logicalclock.set(n.electionEpoch);
                                this.setPeerState(n.leader, voteSet);
                            }
                            endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            this.leaveInstance((Vote)endVote);
                            Object object = endVote;
                            return object;
                        }
                    }
                    LOG.warn("Notification state unrecoginized: {} (n.state), {}(n.sid)", (Object)n.state, (Object)n.sid);
                    continue;
                }
                if (!this.validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", (Object)n.leader, (Object)n.sid);
                }
                if (this.validVoter(n.sid)) continue;
                LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", (Object)n.leader, (Object)n.sid);
            }
            Vote vote = null;
            return vote;
        }
        finally {
            try {
                if (this.self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(this.self.jmxLeaderElectionBean);
                }
            }
            catch (Exception e3) {
                LOG.warn("Failed to unregister with JMX", e3);
            }
            this.self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}", (Object)this.manager.getConnectionThreadCount());
        }
    }

    private boolean validVoter(long sid) {
        return this.self.getCurrentAndNextConfigVoters().contains(sid);
    }

    static {
        minNotificationInterval = Integer.getInteger(MIN_NOTIFICATION_INTERVAL, minNotificationInterval);
        LOG.info("{}={}", (Object)MIN_NOTIFICATION_INTERVAL, (Object)minNotificationInterval);
        maxNotificationInterval = Integer.getInteger(MAX_NOTIFICATION_INTERVAL, maxNotificationInterval);
        LOG.info("{}={}", (Object)MAX_NOTIFICATION_INTERVAL, (Object)maxNotificationInterval);
        dummyData = new byte[0];
    }

    protected class Messenger {
        WorkerSender ws;
        WorkerReceiver wr;
        Thread wsThread = null;
        Thread wrThread = null;

        Messenger(QuorumCnxManager manager) {
            this.ws = new WorkerSender(manager);
            this.wsThread = new Thread((Runnable)this.ws, "WorkerSender[myid=" + FastLeaderElection.this.self.getId() + "]");
            this.wsThread.setDaemon(true);
            this.wr = new WorkerReceiver(manager);
            this.wrThread = new Thread((Runnable)this.wr, "WorkerReceiver[myid=" + FastLeaderElection.this.self.getId() + "]");
            this.wrThread.setDaemon(true);
        }

        void start() {
            this.wsThread.start();
            this.wrThread.start();
        }

        void halt() {
            this.ws.stop = true;
            this.wr.stop = true;
        }

        class WorkerSender
        extends ZooKeeperThread {
            volatile boolean stop;
            QuorumCnxManager manager;

            WorkerSender(QuorumCnxManager manager) {
                super("WorkerSender");
                this.stop = false;
                this.manager = manager;
            }

            @Override
            public void run() {
                while (!this.stop) {
                    try {
                        ToSend m = FastLeaderElection.this.sendqueue.poll(3000L, TimeUnit.MILLISECONDS);
                        if (m == null) continue;
                        this.process(m);
                    }
                    catch (InterruptedException e2) {
                        // empty catch block
                        break;
                    }
                }
                LOG.info("WorkerSender is down");
            }

            void process(ToSend m) {
                ByteBuffer requestBuffer = FastLeaderElection.buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData);
                this.manager.toSend(m.sid, requestBuffer);
            }
        }

        class WorkerReceiver
        extends ZooKeeperThread {
            volatile boolean stop;
            QuorumCnxManager manager;

            WorkerReceiver(QuorumCnxManager manager) {
                super("WorkerReceiver");
                this.stop = false;
                this.manager = manager;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                block14: while (!this.stop) {
                    try {
                        ToSend notmsg;
                        QuorumVerifier qv;
                        long rpeerepoch;
                        QuorumCnxManager.Message response = this.manager.pollRecvQueue(3000L, TimeUnit.MILLISECONDS);
                        if (response == null) continue;
                        if (response.buffer.capacity() < 28) {
                            LOG.error("Got a short response: {}", (Object)response.buffer.capacity());
                            continue;
                        }
                        boolean backCompatibility28 = response.buffer.capacity() == 28;
                        boolean backCompatibility40 = response.buffer.capacity() == 40;
                        response.buffer.clear();
                        Notification n = new Notification();
                        int rstate = response.buffer.getInt();
                        long rleader = response.buffer.getLong();
                        long rzxid = response.buffer.getLong();
                        long relectionEpoch = response.buffer.getLong();
                        int version = 0;
                        if (!backCompatibility28) {
                            rpeerepoch = response.buffer.getLong();
                            if (!backCompatibility40) {
                                version = response.buffer.getInt();
                            } else {
                                LOG.info("Backward compatibility mode (36 bits), server id: {}", (Object)response.sid);
                            }
                        } else {
                            LOG.info("Backward compatibility mode (28 bits), server id: {}", (Object)response.sid);
                            rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
                        }
                        QuorumVerifier rqv = null;
                        if (version > 1) {
                            int configLength = response.buffer.getInt();
                            byte[] b = new byte[configLength];
                            response.buffer.get(b);
                            QuorumPeer quorumPeer = FastLeaderElection.this.self;
                            synchronized (quorumPeer) {
                                try {
                                    rqv = FastLeaderElection.this.self.configFromString(new String(b));
                                    QuorumVerifier curQV = FastLeaderElection.this.self.getQuorumVerifier();
                                    if (rqv.getVersion() > curQV.getVersion()) {
                                        LOG.info("{} Received version: {} my version: {}", FastLeaderElection.this.self.getId(), Long.toHexString(rqv.getVersion()), Long.toHexString(FastLeaderElection.this.self.getQuorumVerifier().getVersion()));
                                        if (FastLeaderElection.this.self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                                            LOG.debug("Invoking processReconfig(), state: {}", (Object)FastLeaderElection.this.self.getServerState());
                                            FastLeaderElection.this.self.processReconfig(rqv, null, null, false);
                                            if (!rqv.equals(curQV)) {
                                                LOG.info("restarting leader election");
                                                FastLeaderElection.this.self.shuttingDownLE = true;
                                                FastLeaderElection.this.self.getElectionAlg().shutdown();
                                                break;
                                            }
                                        } else {
                                            LOG.debug("Skip processReconfig(), state: {}", (Object)FastLeaderElection.this.self.getServerState());
                                        }
                                    }
                                }
                                catch (IOException e2) {
                                    LOG.error("Something went wrong while processing config received from {}", (Object)response.sid);
                                }
                                catch (QuorumPeerConfig.ConfigException e3) {
                                    LOG.error("Something went wrong while processing config received from {}", (Object)response.sid);
                                }
                            }
                        }
                        LOG.info("Backward compatibility mode (before reconfig), server id: {}", (Object)response.sid);
                        if (!FastLeaderElection.this.validVoter(response.sid)) {
                            Vote current = FastLeaderElection.this.self.getCurrentVote();
                            QuorumVerifier qv2 = FastLeaderElection.this.self.getQuorumVerifier();
                            ToSend notmsg2 = new ToSend(ToSend.mType.notification, current.getId(), current.getZxid(), FastLeaderElection.this.logicalclock.get(), FastLeaderElection.this.self.getPeerState(), response.sid, current.getPeerEpoch(), qv2.toString().getBytes());
                            FastLeaderElection.this.sendqueue.offer(notmsg2);
                            continue;
                        }
                        LOG.debug("Receive new notification message. My id = {}", (Object)FastLeaderElection.this.self.getId());
                        QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                        switch (rstate) {
                            case 0: {
                                ackstate = QuorumPeer.ServerState.LOOKING;
                                break;
                            }
                            case 1: {
                                ackstate = QuorumPeer.ServerState.FOLLOWING;
                                break;
                            }
                            case 2: {
                                ackstate = QuorumPeer.ServerState.LEADING;
                                break;
                            }
                            case 3: {
                                ackstate = QuorumPeer.ServerState.OBSERVING;
                                break;
                            }
                            default: {
                                continue block14;
                            }
                        }
                        n.leader = rleader;
                        n.zxid = rzxid;
                        n.electionEpoch = relectionEpoch;
                        n.state = ackstate;
                        n.sid = response.sid;
                        n.peerEpoch = rpeerepoch;
                        n.version = version;
                        n.qv = rqv;
                        LOG.info("Notification: my state:{}; n.sid:{}, n.state:{}, n.leader:{}, n.round:0x{}, n.peerEpoch:0x{}, n.zxid:0x{}, message format version:0x{}, n.config version:0x{}", new Object[]{FastLeaderElection.this.self.getPeerState(), n.sid, n.state, n.leader, Long.toHexString(n.electionEpoch), Long.toHexString(n.peerEpoch), Long.toHexString(n.zxid), Long.toHexString(n.version), n.qv != null ? Long.toHexString(n.qv.getVersion()) : "0"});
                        if (FastLeaderElection.this.self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                            FastLeaderElection.this.recvqueue.offer(n);
                            if (ackstate != QuorumPeer.ServerState.LOOKING || n.electionEpoch >= FastLeaderElection.this.logicalclock.get()) continue;
                            Vote v = FastLeaderElection.this.getVote();
                            qv = FastLeaderElection.this.self.getQuorumVerifier();
                            notmsg = new ToSend(ToSend.mType.notification, v.getId(), v.getZxid(), FastLeaderElection.this.logicalclock.get(), FastLeaderElection.this.self.getPeerState(), response.sid, v.getPeerEpoch(), qv.toString().getBytes());
                            FastLeaderElection.this.sendqueue.offer(notmsg);
                            continue;
                        }
                        Vote current = FastLeaderElection.this.self.getCurrentVote();
                        if (ackstate != QuorumPeer.ServerState.LOOKING) continue;
                        if (FastLeaderElection.this.self.leader != null) {
                            if (FastLeaderElection.this.leadingVoteSet != null) {
                                FastLeaderElection.this.self.leader.setLeadingVoteSet(FastLeaderElection.this.leadingVoteSet);
                                FastLeaderElection.this.leadingVoteSet = null;
                            }
                            FastLeaderElection.this.self.leader.reportLookingSid(response.sid);
                        }
                        LOG.debug("Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}", FastLeaderElection.this.self.getId(), response.sid, Long.toHexString(current.getZxid()), current.getId(), Long.toHexString(FastLeaderElection.this.self.getQuorumVerifier().getVersion()));
                        qv = FastLeaderElection.this.self.getQuorumVerifier();
                        notmsg = new ToSend(ToSend.mType.notification, current.getId(), current.getZxid(), current.getElectionEpoch(), FastLeaderElection.this.self.getPeerState(), response.sid, current.getPeerEpoch(), qv.toString().getBytes());
                        FastLeaderElection.this.sendqueue.offer(notmsg);
                    }
                    catch (InterruptedException e4) {
                        LOG.warn("Interrupted Exception while waiting for new message", e4);
                    }
                }
                LOG.info("WorkerReceiver is down");
            }
        }
    }

    public static class ToSend {
        long leader;
        long zxid;
        long electionEpoch;
        QuorumPeer.ServerState state;
        long sid;
        byte[] configData = dummyData;
        long peerEpoch;

        ToSend(mType type2, long leader, long zxid, long electionEpoch, QuorumPeer.ServerState state, long sid, long peerEpoch, byte[] configData) {
            this.leader = leader;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
            this.state = state;
            this.sid = sid;
            this.peerEpoch = peerEpoch;
            this.configData = configData;
        }

        static enum mType {
            crequest,
            challenge,
            notification,
            ack;

        }
    }

    public static class Notification {
        public static final int CURRENTVERSION = 2;
        int version;
        long leader;
        long zxid;
        long electionEpoch;
        QuorumPeer.ServerState state;
        long sid;
        QuorumVerifier qv;
        long peerEpoch;
    }
}

