/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.shaded.org.apache.zookeeper.server.quorum;

import io.mantisrx.shaded.org.apache.jute.BinaryInputArchive;
import io.mantisrx.shaded.org.apache.jute.BinaryOutputArchive;
import io.mantisrx.shaded.org.apache.zookeeper.server.Request;
import io.mantisrx.shaded.org.apache.zookeeper.server.ServerMetrics;
import io.mantisrx.shaded.org.apache.zookeeper.server.TxnLogProposalIterator;
import io.mantisrx.shaded.org.apache.zookeeper.server.ZKDatabase;
import io.mantisrx.shaded.org.apache.zookeeper.server.ZooKeeperThread;
import io.mantisrx.shaded.org.apache.zookeeper.server.ZooTrace;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.Leader;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.LearnerMaster;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.LearnerSyncRequest;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.LearnerSyncThrottler;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.ObserverMaster;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.QuorumPacket;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.QuorumPeer;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.StateSummary;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.SyncThrottleException;
import io.mantisrx.shaded.org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import io.mantisrx.shaded.org.apache.zookeeper.server.util.MessageTracker;
import io.mantisrx.shaded.org.apache.zookeeper.server.util.ZxidUtils;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.security.sasl.SaslException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LearnerHandler
extends ZooKeeperThread {
    private static final Logger LOG = LoggerFactory.getLogger(LearnerHandler.class);
    /** Connection to the remote learner; closed by {@link #shutdown()} and on I/O failures. */
    protected final Socket sock;
    /** The master (leader or observer master) this handler reports to. */
    final LearnerMaster learnerMaster;
    /** Tick by which the next packet is expected; compared with the master's current tick in {@link #synced()}. */
    volatile long tickOfNextAckDeadline;
    /** Server id of the learner, read from its info packet or assigned by the master when absent. */
    protected long sid = 0L;
    /** Protocol version announced by the learner; {@code run()} compares it against 65536 (0x10000). */
    protected int version = 1;
    /** Outgoing packets, drained by the sender thread started in {@link #startSendingPackets()}. */
    final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();
    /** Running byte estimate of queued packets, adjusted in {@code queuePacket} and {@code sendPackets}. */
    private final AtomicLong queuedPacketsSize = new AtomicLong();
    protected final AtomicLong packetsReceived = new AtomicLong();
    protected final AtomicLong packetsSent = new AtomicLong();
    protected final AtomicLong requestsReceived = new AtomicLong();
    /** Last positive zxid written to the learner by the sender thread; -1 until one is sent. */
    protected volatile long lastZxid = -1L;
    /** Creation time of this handler. */
    protected final Date established = new Date();
    /** A timing marker is queued once every this many packets (see {@code queuePacket}). */
    private final int markerPacketInterval = 1000;
    private AtomicInteger packetCounter = new AtomicInteger();
    private SyncLimitCheck syncLimitCheck = new SyncLimitCheck();
    private BinaryInputArchive ia;
    private BinaryOutputArchive oa;
    private final BufferedInputStream bufferedInput;
    private BufferedOutputStream bufferedOutput;
    /** Records sent/received packet types; dumped to the log when the handler exits. */
    protected final MessageTracker messageTracker;
    /** Set once the sender thread has been started; {@link #ping()} is a no-op before that. */
    private volatile boolean sendingThreadStarted = false;
    /** System property that, when {@code true}, forces every learner to be synced by snapshot. */
    public static final String FORCE_SNAP_SYNC = "zookeeper.forceSnapshotSync";
    private boolean forceSnapSync = false;
    /** True while no DIFF/TRUNC operation packet has been queued yet during {@code syncFollower}. */
    private boolean needOpPacket = true;
    /** Value returned by {@code learnerMaster.startForwarding} during sync. */
    private long leaderLastZxid;
    /** Throttler of the sync currently in progress; {@code null} when no sync needs to be ended. */
    private LearnerSyncThrottler syncThrottler = null;
    /** Sentinel packet queued by {@link #shutdown()} to make the sender thread exit. */
    final QuorumPacket proposalOfDeath = new QuorumPacket();
    /** PARTICIPANT unless the learner's first packet has type 16 (OBSERVERINFO). */
    private QuorumPeer.LearnerType learnerType = QuorumPeer.LearnerType.PARTICIPANT;

    /**
     * @return the socket connected to the remote learner
     */
    public Socket getSocket() {
        return sock;
    }

    /**
     * @return the server id currently associated with the remote learner
     */
    long getSid() {
        return sid;
    }

    /**
     * Returns a printable form of the learner's remote address.
     *
     * @return the remote socket address as a string, or {@code "<null>"} when there is no socket
     *         or the socket has no remote endpoint (it was never connected)
     */
    String getRemoteAddress() {
        if (this.sock == null) {
            return "<null>";
        }
        // getRemoteSocketAddress() yields null for an unconnected socket; don't dereference it blindly.
        java.net.SocketAddress remote = this.sock.getRemoteSocketAddress();
        return remote == null ? "<null>" : remote.toString();
    }

    /**
     * @return the protocol version recorded for the remote learner
     */
    int getVersion() {
        return version;
    }

    /**
     * @return the last positive zxid written to the learner, or -1 if none has been sent yet
     */
    public synchronized long getLastZxid() {
        final long latest = this.lastZxid;
        return latest;
    }

    /**
     * @return a defensive copy of the time at which this handler was created
     */
    public Date getEstablished() {
        // established is always a plain java.util.Date, so copying by timestamp equals clone().
        return new Date(this.established.getTime());
    }

    /**
     * Replaces the archive used to serialize outgoing packets.
     *
     * @param oa the output archive to write records to
     */
    protected void setOutputArchive(BinaryOutputArchive oa) {
        // The parameter shadows the field, so the qualifier is required here.
        this.oa = oa;
    }

    /**
     * Replaces the buffered stream that the sender flushes when its queue runs empty.
     *
     * @param bufferedOutput the stream to flush outgoing data through
     */
    protected void setBufferedOutput(BufferedOutputStream bufferedOutput) {
        // The parameter shadows the field, so the qualifier is required here.
        this.bufferedOutput = bufferedOutput;
    }

    /**
     * Creates a handler for a newly connected learner and authenticates the connection
     * when the master provides a quorum auth server.
     *
     * @param sock          socket connected to the learner; closed here if authentication fails
     * @param bufferedInput buffered view of the socket's input, also used for authentication
     * @param learnerMaster the master this handler belongs to
     * @throws IOException a {@link SaslException} (carrying the original failure as its cause)
     *                     when authentication fails
     */
    LearnerHandler(Socket sock, BufferedInputStream bufferedInput, LearnerMaster learnerMaster) throws IOException {
        super("LearnerHandler-" + sock.getRemoteSocketAddress());
        this.sock = sock;
        this.learnerMaster = learnerMaster;
        this.bufferedInput = bufferedInput;
        if (Boolean.getBoolean(FORCE_SNAP_SYNC)) {
            this.forceSnapSync = true;
            LOG.info("Forcing snapshot sync is enabled");
        }
        try {
            QuorumAuthServer authServer = learnerMaster.getQuorumAuthServer();
            if (authServer != null) {
                authServer.authenticate(sock, new DataInputStream(bufferedInput));
            }
        }
        catch (IOException authFailure) {
            LOG.error("Server failed to authenticate quorum learner, addr: {}, closing connection", (Object)sock.getRemoteSocketAddress(), (Object)authFailure);
            try {
                sock.close();
            }
            catch (IOException ie) {
                LOG.error("Exception while closing socket", ie);
            }
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new SaslException("Authentication failure: " + authFailure.getMessage(), authFailure);
        }
        this.messageTracker = new MessageTracker(MessageTracker.BUFFERED_MESSAGE_SIZE);
    }

    /**
     * Summarizes the handler: socket, ack deadline tick, sync state and queue length.
     */
    @Override
    public String toString() {
        return new StringBuilder("LearnerHandler ")
                .append(this.sock)
                .append(" tickOfNextAckDeadline:")
                .append(this.tickOfNextAckDeadline())
                .append(" synced?:")
                .append(this.synced())
                .append(" queuedPacketLength:")
                .append(this.queuedPackets.size())
                .toString();
    }

    /**
     * @return the kind of learner on the other end of this connection
     */
    public QuorumPeer.LearnerType getLearnerType() {
        return learnerType;
    }

    /**
     * Sender loop: drains {@code queuedPackets} and writes each packet to the learner until the
     * {@code proposalOfDeath} sentinel is dequeued or an I/O error occurs.
     *
     * <p>The output is flushed whenever the queue runs empty, right before blocking for the
     * next packet. Marker packets are never written; they only feed the queue-time metric.
     *
     * @throws InterruptedException if interrupted while waiting for the next packet
     */
    private void sendPackets() throws InterruptedException {
        try {
            while (true) {
                QuorumPacket p = this.queuedPackets.poll();
                if (p == null) {
                    // Nothing pending: push out what has been buffered, then block.
                    this.bufferedOutput.flush();
                    p = this.queuedPackets.take();
                }
                ServerMetrics.getMetrics().LEARNER_HANDLER_QP_SIZE.add(Long.toString(this.sid), this.queuedPackets.size());
                if (p instanceof MarkerQuorumPacket) {
                    MarkerQuorumPacket m = (MarkerQuorumPacket)p;
                    ServerMetrics.getMetrics().LEARNER_HANDLER_QP_TIME.add(Long.toString(this.sid), (System.nanoTime() - m.time) / 1000000L);
                    continue;
                }
                this.queuedPacketsSize.addAndGet(-LearnerHandler.packetSize(p));
                if (p == this.proposalOfDeath) {
                    // Sentinel queued by shutdown(): stop sending.
                    return;
                }
                // Choose the trace mask per packet (as the receive loop does) so that one PING
                // (type 5) does not leave every later packet traced under the ping mask.
                long traceMask = p.getType() == 5 ? 128L : 16L;
                if (p.getType() == 2) {
                    // PROPOSAL: remember when it was sent for the sync-limit check.
                    this.syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
                }
                if (p.getZxid() > 0L) {
                    this.lastZxid = p.getZxid();
                }
                this.oa.writeRecord(p, "packet");
                this.packetsSent.incrementAndGet();
                this.messageTracker.trackSent(p.getType());
            }
        }
        catch (IOException e2) {
            if (this.sock.isClosed()) {
                // The socket was closed on purpose; nothing to report.
                return;
            }
            LOG.warn("Unexpected exception at {}", (Object)this, (Object)e2);
            try {
                // Close the socket so the receiving side of this handler notices as well.
                this.sock.close();
            }
            catch (IOException ie) {
                LOG.warn("Error closing socket for handler {}", (Object)this, (Object)ie);
            }
        }
    }

    /**
     * Renders a quorum packet as {@code "<TYPE> <zxid in hex>"}, followed by a detail message
     * when one is available (only REVALIDATE packets carry one: the session id).
     *
     * @param p the packet to describe
     * @return a short human-readable description, never {@code null}
     */
    public static String packetToString(QuorumPacket p) {
        String type2;
        String mess = null;
        switch (p.getType()) {
            case 3: {
                type2 = "ACK";
                break;
            }
            case 4: {
                type2 = "COMMIT";
                break;
            }
            case 11: {
                type2 = "FOLLOWERINFO";
                break;
            }
            case 10: {
                type2 = "NEWLEADER";
                break;
            }
            case 5: {
                type2 = "PING";
                break;
            }
            case 2: {
                type2 = "PROPOSAL";
                break;
            }
            case 1: {
                type2 = "REQUEST";
                break;
            }
            case 6: {
                type2 = "REVALIDATE";
                byte[] payload = p.getData();
                // A packet without data has no session id to show; avoid an NPE on it.
                if (payload != null) {
                    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(payload));
                    try {
                        long id = dis.readLong();
                        mess = " sessionid = " + id;
                    }
                    catch (IOException e2) {
                        LOG.warn("Unexpected exception", e2);
                    }
                }
                break;
            }
            case 12: {
                type2 = "UPTODATE";
                break;
            }
            case 13: {
                type2 = "DIFF";
                break;
            }
            case 14: {
                type2 = "TRUNC";
                break;
            }
            case 15: {
                type2 = "SNAP";
                break;
            }
            case 18: {
                type2 = "ACKEPOCH";
                break;
            }
            case 7: {
                type2 = "SYNC";
                break;
            }
            case 8: {
                type2 = "INFORM";
                break;
            }
            case 9: {
                type2 = "COMMITANDACTIVATE";
                break;
            }
            case 19: {
                type2 = "INFORMANDACTIVATE";
                break;
            }
            default: {
                type2 = "UNKNOWN" + p.getType();
            }
        }
        String entry = type2 + " " + Long.toHexString(p.getZxid());
        // Only append the detail when there is one, instead of printing the literal "null".
        if (mess != null) {
            entry = entry + " " + mess;
        }
        return entry;
    }

    /**
     * Main loop of the handler thread.
     *
     * <p>Sequence, as implemented below:
     * <ol>
     *   <li>register with the master and read the learner's first packet, which must be
     *       FOLLOWERINFO (11) or OBSERVERINFO (16);</li>
     *   <li>agree on the new epoch (directly for protocol versions below 0x10000, otherwise
     *       through a type-17 packet answered by ACKEPOCH (18));</li>
     *   <li>bring the learner up to date via {@link #syncFollower} and, if required, a snapshot;</li>
     *   <li>send NEWLEADER (10), start the sender thread and wait for the learner's ACK (3);</li>
     *   <li>send UPTODATE (12) and serve incoming packets until the connection fails.</li>
     * </ol>
     *
     * <p>Every phase runs inside one try block so that failures during the handshake are handled
     * by the same catch clauses as failures in the serving loop. In particular, a
     * {@link SyncThrottleException} from {@code beginSync} clears {@code syncThrottler}, so the
     * finally block does not end a sync that never began. Whatever happens, the finally block
     * ends an in-progress sync, dumps the message tracker and calls {@link #shutdown()}.
     */
    @Override
    public void run() {
        try {
            this.learnerMaster.addLearnerHandler(this);
            this.tickOfNextAckDeadline = this.learnerMaster.getTickOfInitialAckDeadline();
            this.ia = BinaryInputArchive.getArchive(this.bufferedInput);
            this.bufferedOutput = new BufferedOutputStream(this.sock.getOutputStream());
            this.oa = BinaryOutputArchive.getArchive(this.bufferedOutput);

            // --- Learner introduction ---------------------------------------------------
            QuorumPacket qp = new QuorumPacket();
            this.ia.readRecord(qp, "packet");
            this.messageTracker.trackReceived(qp.getType());
            if (qp.getType() != 11 && qp.getType() != 16) {
                LOG.error("First packet {} is not FOLLOWERINFO or OBSERVERINFO!", (Object)qp.toString());
                return;
            }
            if (this.learnerMaster instanceof ObserverMaster && qp.getType() != 16) {
                throw new IOException("Non observer attempting to connect to ObserverMaster. type = " + qp.getType());
            }
            byte[] learnerInfoData = qp.getData();
            if (learnerInfoData != null) {
                // Layout read here: sid (8 bytes), then version (4 bytes), then config version (8 bytes);
                // trailing fields are optional and only read when the payload is long enough.
                ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
                if (learnerInfoData.length >= 8) {
                    this.sid = bbsid.getLong();
                }
                if (learnerInfoData.length >= 12) {
                    this.version = bbsid.getInt();
                }
                if (learnerInfoData.length >= 20) {
                    long configVersion = bbsid.getLong();
                    if (configVersion > this.learnerMaster.getQuorumVerifierVersion()) {
                        throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                    }
                }
            } else {
                this.sid = this.learnerMaster.getAndDecrementFollowerCounter();
            }
            String followerInfo = this.learnerMaster.getPeerInfo(this.sid);
            if (followerInfo.isEmpty()) {
                LOG.info("Follower sid: {} not in the current config {}", (Object)this.sid, (Object)Long.toHexString(this.learnerMaster.getQuorumVerifierVersion()));
            } else {
                LOG.info("Follower sid: {} : info : {}", (Object)this.sid, (Object)followerInfo);
            }
            if (qp.getType() == 16) {
                this.learnerType = QuorumPeer.LearnerType.OBSERVER;
            }
            this.learnerMaster.registerLearnerHandlerBean(this, this.sock);

            // --- Epoch negotiation ------------------------------------------------------
            long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
            StateSummary ss = null;
            long zxid = qp.getZxid();
            long newEpoch = this.learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
            long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0L);
            if (this.getVersion() < 65536) {
                // Older protocol: derive the learner's state from its first packet.
                long epoch = ZxidUtils.getEpochFromZxid(zxid);
                ss = new StateSummary(epoch, zxid);
                this.learnerMaster.waitForEpochAck(this.getSid(), ss);
            } else {
                byte[] ver = new byte[4];
                ByteBuffer.wrap(ver).putInt(65536);
                QuorumPacket newEpochPacket = new QuorumPacket(17, newLeaderZxid, ver, null);
                this.oa.writeRecord(newEpochPacket, "packet");
                this.messageTracker.trackSent(17);
                this.bufferedOutput.flush();
                QuorumPacket ackEpochPacket = new QuorumPacket();
                this.ia.readRecord(ackEpochPacket, "packet");
                this.messageTracker.trackReceived(ackEpochPacket.getType());
                if (ackEpochPacket.getType() != 18) {
                    LOG.error("{} is not ACKEPOCH", (Object)ackEpochPacket.toString());
                    return;
                }
                ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
                ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
                this.learnerMaster.waitForEpochAck(this.getSid(), ss);
            }

            // --- Synchronization --------------------------------------------------------
            long peerLastZxid = ss.getLastZxid();
            boolean needSnap = this.syncFollower(peerLastZxid, this.learnerMaster);
            // Only observers are subject to throttling.
            boolean exemptFromThrottle = this.getLearnerType() != QuorumPeer.LearnerType.OBSERVER;
            if (needSnap) {
                this.syncThrottler = this.learnerMaster.getLearnerSnapSyncThrottler();
                this.syncThrottler.beginSync(exemptFromThrottle);
                try {
                    long zxidToSend = this.learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
                    this.oa.writeRecord(new QuorumPacket(15, zxidToSend, null, null), "packet");
                    this.messageTracker.trackSent(15);
                    this.bufferedOutput.flush();
                    LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, send zxid of db as 0x{}, {} concurrent snapshot sync, snapshot sync was {} from throttle", Long.toHexString(peerLastZxid), Long.toHexString(this.leaderLastZxid), Long.toHexString(zxidToSend), this.syncThrottler.getSyncInProgress(), exemptFromThrottle ? "exempt" : "not exempt");
                    this.learnerMaster.getZKDatabase().serializeSnapshot(this.oa);
                    this.oa.writeString("BenWasHere", "signature");
                    this.bufferedOutput.flush();
                }
                finally {
                    // Counted whether or not the snapshot transfer succeeded.
                    ServerMetrics.getMetrics().SNAP_COUNT.add(1L);
                }
            } else {
                this.syncThrottler = this.learnerMaster.getLearnerDiffSyncThrottler();
                this.syncThrottler.beginSync(exemptFromThrottle);
                ServerMetrics.getMetrics().DIFF_COUNT.add(1L);
            }

            // --- NEWLEADER / ACK --------------------------------------------------------
            LOG.debug("Sending NEWLEADER message to {}", (Object)this.sid);
            QuorumPacket newLeaderQP;
            if (this.getVersion() < 65536) {
                // Older protocol: written directly, without the quorum verifier payload.
                newLeaderQP = new QuorumPacket(10, newLeaderZxid, null, null);
                this.oa.writeRecord(newLeaderQP, "packet");
            } else {
                newLeaderQP = new QuorumPacket(10, newLeaderZxid, this.learnerMaster.getQuorumVerifierBytes(), null);
                this.queuedPackets.add(newLeaderQP);
            }
            this.bufferedOutput.flush();
            this.startSendingPackets();
            qp = new QuorumPacket();
            this.ia.readRecord(qp, "packet");
            this.messageTracker.trackReceived(qp.getType());
            if (qp.getType() != 3) {
                LOG.error("Next packet was supposed to be an ACK, but received packet: {}", (Object)LearnerHandler.packetToString(qp));
                return;
            }
            LOG.debug("Received NEWLEADER-ACK message from {}", (Object)this.sid);
            this.learnerMaster.waitForNewLeaderAck(this.getSid(), qp.getZxid());
            this.syncLimitCheck.start();
            // The sync is complete; clear the field so the finally block does not end it twice.
            this.syncThrottler.endSync();
            this.syncThrottler = null;
            this.sock.setSoTimeout(this.learnerMaster.syncTimeout());
            this.learnerMaster.waitForStartup();
            LOG.debug("Sending UPTODATE message to {}", (Object)this.sid);
            this.queuedPackets.add(new QuorumPacket(12, -1L, null, null));

            // --- Serving loop: only left through an exception ----------------------------
            while (true) {
                qp = new QuorumPacket();
                this.ia.readRecord(qp, "packet");
                this.messageTracker.trackReceived(qp.getType());
                long traceMask = qp.getType() == 5 ? 128L : 16L;
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
                }
                this.tickOfNextAckDeadline = this.learnerMaster.getTickOfNextAckDeadline();
                this.packetsReceived.incrementAndGet();
                switch (qp.getType()) {
                    case 3: {
                        // ACK
                        if (this.learnerType == QuorumPeer.LearnerType.OBSERVER) {
                            LOG.debug("Received ACK from Observer {}", (Object)this.sid);
                        }
                        this.syncLimitCheck.updateAck(qp.getZxid());
                        this.learnerMaster.processAck(this.sid, qp.getZxid(), this.sock.getLocalSocketAddress());
                        break;
                    }
                    case 5: {
                        // PING: the payload is a sequence of (session id, timeout) pairs to touch.
                        ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
                        DataInputStream dis = new DataInputStream(bis);
                        while (dis.available() > 0) {
                            long sess = dis.readLong();
                            int to = dis.readInt();
                            this.learnerMaster.touch(sess, to);
                        }
                        break;
                    }
                    case 6: {
                        // REVALIDATE
                        ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1L);
                        this.learnerMaster.revalidateSession(qp, this);
                        break;
                    }
                    case 1: {
                        // REQUEST: header is session id, cxid and request type; the rest is the body.
                        ByteBuffer bb = ByteBuffer.wrap(qp.getData());
                        long sessionId = bb.getLong();
                        int cxid = bb.getInt();
                        int type2 = bb.getInt();
                        bb = bb.slice();
                        Request si = type2 == 9 ? new LearnerSyncRequest(this, sessionId, cxid, type2, bb, qp.getAuthinfo()) : new Request(null, sessionId, cxid, type2, bb, qp.getAuthinfo());
                        si.setOwner(this);
                        this.learnerMaster.submitLearnerRequest(si);
                        this.requestsReceived.incrementAndGet();
                        break;
                    }
                    default: {
                        LOG.warn("unexpected quorum packet, type: {}", (Object)LearnerHandler.packetToString(qp));
                        break;
                    }
                }
            }
        }
        catch (IOException e2) {
            if (this.sock != null && !this.sock.isClosed()) {
                LOG.error("Unexpected exception causing shutdown while sock still open", e2);
                try {
                    // Close the socket so the other side can see the connection going away.
                    this.sock.close();
                }
                catch (IOException ignored) {
                    // Nothing more can be done; the connection is being torn down anyway.
                }
            }
        }
        catch (InterruptedException e3) {
            LOG.error("Unexpected exception in LearnerHandler.", e3);
        }
        catch (SyncThrottleException e4) {
            LOG.error("too many concurrent sync.", e4);
            // beginSync was rejected, so there is no sync for the finally block to end.
            this.syncThrottler = null;
        }
        catch (Exception e5) {
            LOG.error("Unexpected exception in LearnerHandler.", e5);
            throw e5;
        }
        finally {
            if (this.syncThrottler != null) {
                this.syncThrottler.endSync();
                this.syncThrottler = null;
            }
            String remoteAddr = this.getRemoteAddress();
            LOG.warn("******* GOODBYE {} ********", (Object)remoteAddr);
            this.messageTracker.dumpToLog(remoteAddr);
            this.shutdown();
        }
    }

    /**
     * Starts the thread that runs {@code sendPackets()}. A second call only logs an error.
     */
    protected void startSendingPackets() {
        if (this.sendingThreadStarted) {
            LOG.error("Attempting to start sending thread after it already started");
            return;
        }
        Thread sender = new Thread(){

            @Override
            public void run() {
                // Name the thread after the peer it is sending to.
                Thread.currentThread().setName("Sender-" + LearnerHandler.this.sock.getRemoteSocketAddress());
                try {
                    LearnerHandler.this.sendPackets();
                }
                catch (InterruptedException interrupted) {
                    LOG.warn("Unexpected interruption", interrupted);
                }
            }
        };
        sender.start();
        this.sendingThreadStarted = true;
    }

    /**
     * Tells {@code queuePacket} whether timing marker packets may be queued.
     *
     * @return always {@code true} in this class
     */
    protected boolean shouldSendMarkerPacketForLogging() {
        final boolean markersEnabled = true;
        return markersEnabled;
    }

    /**
     * Decides how to bring a learner up to date and queues the corresponding packets.
     *
     * <p>Under the database's log read lock, one of these strategies is chosen from the
     * learner's last zxid: an empty DIFF (learner already matches), a TRUNC (learner is ahead
     * of the committed log), a DIFF built from the committed log, a DIFF built from the
     * on-disk txn log followed by the committed log, or a fall back to a snapshot. Forwarding
     * of new proposals to this handler is started before the lock is released.
     *
     * @param peerLastZxid  last zxid reported by the learner
     * @param learnerMaster master providing the database and proposal forwarding
     * @return {@code true} if the caller must send a snapshot to the learner
     */
    boolean syncFollower(long peerLastZxid, LearnerMaster learnerMaster) {
        // A zxid whose low 32 bits are zero has no transaction counter, i.e. it only names an epoch.
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xFFFFFFFFL) == 0L;
        long currentZxid = peerLastZxid;
        boolean needSnap = true;
        ZKDatabase db = learnerMaster.getZKDatabase();
        boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
        ReentrantReadWriteLock lock = db.getLogLock();
        ReentrantReadWriteLock.ReadLock rl = lock.readLock();
        // Acquire outside the try: if lock() fails, the finally must not unlock a lock never held.
        rl.lock();
        try {
            long maxCommittedLog = db.getmaxCommittedLog();
            long minCommittedLog = db.getminCommittedLog();
            long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
            LOG.info("Synchronizing with Learner sid: {} maxCommittedLog=0x{} minCommittedLog=0x{} lastProcessedZxid=0x{} peerLastZxid=0x{}", this.getSid(), Long.toHexString(maxCommittedLog), Long.toHexString(minCommittedLog), Long.toHexString(lastProcessedZxid), Long.toHexString(peerLastZxid));
            if (db.getCommittedLog().isEmpty()) {
                // With no committed log, treat the last processed zxid as both ends of the range.
                minCommittedLog = lastProcessedZxid;
                maxCommittedLog = lastProcessedZxid;
            }
            if (this.forceSnapSync) {
                LOG.warn("Forcing snapshot sync - should not see this in production");
            } else if (lastProcessedZxid == peerLastZxid) {
                // Learner already matches: an empty DIFF (13) is enough.
                LOG.info("Sending DIFF zxid=0x{} for peer sid: {}", (Object)Long.toHexString(peerLastZxid), (Object)this.getSid());
                this.queueOpPacket(13, peerLastZxid);
                this.needOpPacket = false;
                needSnap = false;
            } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
                // Learner is ahead of the committed log: ask it to truncate (14).
                LOG.debug("Sending TRUNC to follower zxidToSend=0x{} for peer sid:{}", (Object)Long.toHexString(maxCommittedLog), (Object)this.getSid());
                this.queueOpPacket(14, maxCommittedLog);
                currentZxid = maxCommittedLog;
                this.needOpPacket = false;
                needSnap = false;
            } else if (maxCommittedLog >= peerLastZxid && minCommittedLog <= peerLastZxid) {
                // Learner is within the committed log range.
                LOG.info("Using committedLog for peer sid: {}", (Object)this.getSid());
                Iterator<Leader.Proposal> itr = db.getCommittedLog().iterator();
                currentZxid = this.queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog);
                needSnap = false;
            } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
                // Learner is behind the committed log: try the on-disk txn log first.
                long sizeLimit = db.calculateTxnLogSizeLimit();
                Iterator<Leader.Proposal> txnLogItr = db.getProposalsFromTxnLog(peerLastZxid, sizeLimit);
                try {
                    if (txnLogItr.hasNext()) {
                        LOG.info("Use txnlog and committedLog for peer sid: {}", (Object)this.getSid());
                        currentZxid = this.queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog);
                        if (currentZxid < minCommittedLog) {
                            LOG.info("Detected gap between end of txnlog: 0x{} and start of committedLog: 0x{}", (Object)Long.toHexString(currentZxid), (Object)Long.toHexString(minCommittedLog));
                            // The two logs do not connect: discard what was queued and start over
                            // (needSnap stays true, so a snapshot will be sent).
                            currentZxid = peerLastZxid;
                            this.queuedPackets.clear();
                            this.needOpPacket = true;
                        } else {
                            LOG.debug("Queueing committedLog 0x{}", (Object)Long.toHexString(currentZxid));
                            Iterator<Leader.Proposal> committedLogItr = db.getCommittedLog().iterator();
                            currentZxid = this.queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog);
                            needSnap = false;
                        }
                    }
                }
                finally {
                    // Release the txn log resources even if queueing the proposals failed.
                    if (txnLogItr instanceof TxnLogProposalIterator) {
                        TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator)txnLogItr;
                        txnProposalItr.close();
                    }
                }
            } else {
                LOG.warn("Unhandled scenario for peer sid: {} maxCommittedLog=0x{} minCommittedLog=0x{} lastProcessedZxid=0x{} peerLastZxid=0x{} txnLogSyncEnabled={}", this.getSid(), Long.toHexString(maxCommittedLog), Long.toHexString(minCommittedLog), Long.toHexString(lastProcessedZxid), Long.toHexString(peerLastZxid), txnLogSyncEnabled);
            }
            if (needSnap) {
                // A snapshot covers everything up to the data tree's last processed zxid.
                currentZxid = db.getDataTreeLastProcessedZxid();
            }
            LOG.debug("Start forwarding 0x{} for peer sid: {}", (Object)Long.toHexString(currentZxid), (Object)this.getSid());
            this.leaderLastZxid = learnerMaster.startForwarding(this, currentZxid);
        }
        finally {
            rl.unlock();
        }
        if (this.needOpPacket && !needSnap) {
            // No DIFF/TRUNC was queued although a diff sync was chosen: be safe and snapshot.
            LOG.error("Unhandled scenario for peer sid: {} fall back to use snapshot", (Object)this.getSid());
            needSnap = true;
        }
        return needSnap;
    }

    /**
     * Queues the proposals from {@code itr} that the learner is missing, each followed by a
     * COMMIT (4), preceded by the DIFF (13) or TRUNC (14) operation packet if none has been
     * queued yet.
     *
     * @param itr               proposals to consider; consumed by this call
     * @param peerLastZxid      last zxid the learner has; proposals below it are skipped
     * @param maxZxid           stop at the first proposal above this zxid, or {@code null} for no limit
     * @param lastCommittedZxid zxid announced in a DIFF packet; must not be {@code null}
     * @return the zxid of the last proposal queued, or {@code peerLastZxid} if none was queued
     */
    protected long queueCommittedProposals(Iterator<Leader.Proposal> itr, long peerLastZxid, Long maxZxid, Long lastCommittedZxid) {
        // A zxid whose low 32 bits are zero has no transaction counter, i.e. it only names an epoch.
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xFFFFFFFFL) == 0L;
        long queuedZxid = peerLastZxid;
        // Zxid of the last proposal seen below peerLastZxid; used as the TRUNC target.
        long prevProposalZxid = -1L;
        while (itr.hasNext()) {
            Leader.Proposal propose = itr.next();
            long packetZxid = propose.packet.getZxid();
            if (maxZxid != null && packetZxid > maxZxid) break;
            if (packetZxid < peerLastZxid) {
                prevProposalZxid = packetZxid;
                continue;
            }
            if (this.needOpPacket) {
                if (packetZxid == peerLastZxid) {
                    // The learner's last zxid is in our history: send the rest as a DIFF.
                    LOG.info("Sending DIFF zxid=0x{}  for peer sid: {}", (Object)Long.toHexString(lastCommittedZxid), (Object)this.getSid());
                    this.queueOpPacket(13, lastCommittedZxid);
                    this.needOpPacket = false;
                    continue;
                }
                if (isPeerNewEpochZxid) {
                    LOG.info("Sending DIFF zxid=0x{}  for peer sid: {}", (Object)Long.toHexString(lastCommittedZxid), (Object)this.getSid());
                    this.queueOpPacket(13, lastCommittedZxid);
                    this.needOpPacket = false;
                } else if (packetZxid > peerLastZxid) {
                    // The learner's last zxid is not in our history: it has to truncate first,
                    // which is only done within the same epoch.
                    if (ZxidUtils.getEpochFromZxid(packetZxid) != ZxidUtils.getEpochFromZxid(peerLastZxid)) {
                        LOG.warn("Cannot send TRUNC to peer sid: {} peer zxid is from different epoch", (Object)this.getSid());
                        return queuedZxid;
                    }
                    LOG.info("Sending TRUNC zxid=0x{}  for peer sid: {}", (Object)Long.toHexString(prevProposalZxid), (Object)this.getSid());
                    this.queueOpPacket(14, prevProposalZxid);
                    this.needOpPacket = false;
                }
            }
            if (packetZxid <= queuedZxid) continue;
            this.queuePacket(propose.packet);
            this.queueOpPacket(4, packetZxid);
            queuedZxid = packetZxid;
        }
        if (this.needOpPacket && isPeerNewEpochZxid) {
            // Nothing newer than the learner's epoch-only zxid was found: send an empty DIFF.
            // (The message names the packet actually queued, which is a DIFF, not a TRUNC.)
            LOG.info("Sending DIFF zxid=0x{}  for peer sid: {}", (Object)Long.toHexString(lastCommittedZxid), (Object)this.getSid());
            this.queueOpPacket(13, lastCommittedZxid);
            this.needOpPacket = false;
        }
        return queuedZxid;
    }

    /**
     * Tears the handler down: replaces all pending packets with the {@code proposalOfDeath}
     * sentinel, closes the socket if it is still open, interrupts this thread and removes the
     * handler (and its bean) from the master.
     */
    public void shutdown() {
        // Drop whatever is still pending and tell the sender loop to stop.
        try {
            this.queuedPackets.clear();
            this.queuedPackets.put(this.proposalOfDeath);
        }
        catch (InterruptedException interrupted) {
            LOG.warn("Ignoring unexpected exception", interrupted);
        }
        try {
            boolean stillOpen = this.sock != null && !this.sock.isClosed();
            if (stillOpen) {
                this.sock.close();
            }
        }
        catch (IOException closeFailure) {
            LOG.warn("Ignoring unexpected exception during socket close", closeFailure);
        }
        this.interrupt();
        this.learnerMaster.removeLearnerHandler(this);
        this.learnerMaster.unregisterLearnerHandlerBean(this);
    }

    /**
     * @return the tick by which the next packet from the learner is expected
     */
    public long tickOfNextAckDeadline() {
        final long deadline = this.tickOfNextAckDeadline;
        return deadline;
    }

    /**
     * Queues a PING (type 5) carrying the master's last proposed zxid, or shuts the handler
     * down when the sync-limit check fails. Does nothing before the sender thread has started.
     */
    public void ping() {
        if (!this.sendingThreadStarted) {
            return;
        }
        boolean withinSyncLimit = this.syncLimitCheck.check(System.nanoTime());
        if (!withinSyncLimit) {
            LOG.warn("Closing connection to peer due to transaction timeout.");
            this.shutdown();
            return;
        }
        long lastProposed = this.learnerMaster.getLastProposed();
        this.queuePacket(new QuorumPacket(5, lastProposed, null, null));
    }

    /**
     * Builds a packet with no data and no auth info and hands it to
     * {@link #queuePacket(QuorumPacket)}.
     *
     * @param type2 packet type code
     * @param zxid  zxid to stamp on the packet
     */
    private void queueOpPacket(int type2, long zxid) {
        this.queuePacket(new QuorumPacket(type2, zxid, null, null));
    }

    /**
     * Appends {@code p} to the outbound queue and adds its estimated size to
     * {@code queuedPacketsSize}.
     *
     * <p>When marker logging is enabled, a {@code MarkerQuorumPacket} stamped with
     * {@code System.nanoTime()} is appended right after every packet for which the
     * counter value (before increment) is a multiple of 1000. The counter is only
     * advanced while marker logging is enabled.
     *
     * @param p packet to enqueue
     */
    void queuePacket(QuorumPacket p) {
        this.queuedPackets.add(p);
        // Short-circuit matters: the counter must not move when markers are disabled.
        boolean markerDue = this.shouldSendMarkerPacketForLogging()
                && this.packetCounter.getAndIncrement() % 1000 == 0;
        if (markerDue) {
            this.queuedPackets.add(new MarkerQuorumPacket(System.nanoTime()));
        }
        // Only the real packet is counted towards the size; the marker is not.
        this.queuedPacketsSize.addAndGet(LearnerHandler.packetSize(p));
    }

    /**
     * Estimates the size of a packet as a fixed 28-byte base plus the length of
     * its data array, if it has one.
     *
     * <p>NOTE(review): the 28 presumably approximates the packet's fixed fields
     * and overhead; confirm against the original source.
     *
     * @param p packet to measure; must not be {@code null}
     * @return 28 when the packet has no data, otherwise 28 plus the data length
     */
    static long packetSize(QuorumPacket p) {
        byte[] payload = p.getData();
        long payloadBytes = payload == null ? 0L : (long)payload.length;
        return 28L + payloadBytes;
    }

    /**
     * Reports whether this handler is alive and the learner master's current tick
     * has not passed {@code tickOfNextAckDeadline}.
     *
     * @return {@code true} only when both conditions hold
     */
    public boolean synced() {
        if (!this.isAlive()) {
            return false;
        }
        long currentTick = this.learnerMaster.getCurrentTick();
        return currentTick <= this.tickOfNextAckDeadline;
    }

    /**
     * Collects this handler's connection details and counters into a map.
     *
     * <p>The map preserves insertion order, so entries are iterated in the order
     * they are added below. The method is synchronized on this handler.
     *
     * @return a new mutable map with nine entries describing this handler
     */
    public synchronized Map<String, Object> getLearnerHandlerInfo() {
        Map<String, Object> snapshot = new LinkedHashMap<>(9);
        // Identity of the connection.
        snapshot.put("remote_socket_address", this.getRemoteAddress());
        snapshot.put("sid", this.getSid());
        snapshot.put("established", this.getEstablished());
        // Outbound queue depth and estimated byte size.
        snapshot.put("queued_packets", this.queuedPackets.size());
        snapshot.put("queued_packets_size", this.queuedPacketsSize.get());
        // Traffic counters.
        snapshot.put("packets_received", this.packetsReceived.longValue());
        snapshot.put("packets_sent", this.packetsSent.longValue());
        snapshot.put("requests", this.requestsReceived.longValue());
        snapshot.put("last_zxid", this.getLastZxid());
        return snapshot;
    }

    /**
     * Zeroes the received-packet, sent-packet and received-request counters and
     * sets {@code lastZxid} to {@code -1}. Synchronized on this handler.
     */
    public synchronized void resetObserverConnectionStats() {
        packetsReceived.set(0L);
        packetsSent.set(0L);
        requestsReceived.set(0L);
        // -1 is the value this method uses to reset lastZxid.
        lastZxid = -1L;
    }

    /**
     * Exposes the outbound packet queue.
     *
     * @return the handler's own queue instance, not a copy, so changes made
     *         through the returned reference affect this handler
     */
    public Queue<QuorumPacket> getQueuedPackets() {
        return queuedPackets;
    }

    /**
     * Overwrites the {@code needOpPacket} flag.
     *
     * @param value new value for {@code needOpPacket}
     */
    public void setFirstPacket(boolean value) {
        needOpPacket = value;
    }

    /**
     * A {@link QuorumPacket} subtype that carries only a timestamp.
     *
     * <p>{@code queuePacket} creates instances stamped with {@code System.nanoTime()}.
     * Equality and hashing are based solely on {@link #time}.
     */
    private static class MarkerQuorumPacket
    extends QuorumPacket {
        /** Timestamp supplied at construction. */
        long time;

        MarkerQuorumPacket(long time2) {
            this.time = time2;
        }

        /** Hash derived from {@link #time} only. */
        @Override
        public int hashCode() {
            return Objects.hash(this.time);
        }

        /**
         * Two markers are equal when they are of exactly the same class and carry
         * the same timestamp.
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o != null && o.getClass() == this.getClass()) {
                MarkerQuorumPacket other = (MarkerQuorumPacket)o;
                return other.time == this.time;
            }
            return false;
        }
    }

    /**
     * Tracks up to two outstanding proposals (a "current" and a "next" slot, each
     * a zxid plus a timestamp) and reports whether the current one has been
     * waiting for its ACK longer than the learner master's sync timeout.
     *
     * <p>A timestamp of {@code 0} marks a slot as empty. All methods synchronize
     * on this instance.
     */
    private class SyncLimitCheck {
        private boolean started;
        private long currentZxid;
        private long currentTime;
        private long nextZxid;
        private long nextTime;

        private SyncLimitCheck() {
        }

        /** Enables tracking; proposals reported before this call are ignored. */
        public synchronized void start() {
            this.started = true;
        }

        /**
         * Records a proposal. It goes into the current slot when that slot is
         * empty; otherwise it overwrites the next slot.
         *
         * @param zxid  zxid of the proposal
         * @param time2 timestamp of the proposal, on the same clock as {@link #check(long)}
         */
        public synchronized void updateProposal(long zxid, long time2) {
            if (this.started) {
                if (this.currentTime != 0L) {
                    this.nextZxid = zxid;
                    this.nextTime = time2;
                } else {
                    this.currentZxid = zxid;
                    this.currentTime = time2;
                }
            }
        }

        /**
         * Records an ACK. An ACK for the current zxid promotes the next slot to
         * current; an ACK for the next zxid is logged as out of order and only
         * clears the next slot. Any other zxid is ignored.
         *
         * @param zxid zxid being acknowledged
         */
        public synchronized void updateAck(long zxid) {
            if (zxid == this.currentZxid) {
                this.currentTime = this.nextTime;
                this.currentZxid = this.nextZxid;
                this.clearNext();
                return;
            }
            if (zxid == this.nextZxid) {
                LOG.warn("ACK for 0x{} received before ACK for 0x{}", (Object)Long.toHexString(zxid), (Object)Long.toHexString(this.currentZxid));
                this.clearNext();
            }
        }

        /**
         * Checks whether the current proposal is still within the sync timeout.
         *
         * @param time2 current timestamp; the difference to the stored timestamp is
         *              divided by 1,000,000 before being compared with the timeout
         *              ({@code ping()} passes {@code System.nanoTime()})
         * @return {@code true} when nothing is outstanding or the scaled delay is
         *         strictly below {@code learnerMaster.syncTimeout()}
         */
        public synchronized boolean check(long time2) {
            if (this.currentTime != 0L) {
                long elapsedMs = (time2 - this.currentTime) / 1000000L;
                return elapsedMs < (long)LearnerHandler.this.learnerMaster.syncTimeout();
            }
            return true;
        }

        /** Empties the next slot. Callers already hold this instance's monitor. */
        private void clearNext() {
            this.nextTime = 0L;
            this.nextZxid = 0L;
        }
    }
}

