/*
 * Decompiled with CFR 0.152.
 */
package dlshade.org.apache.zookeeper.server.quorum;

import dlshade.org.apache.zookeeper.data.Id;
import dlshade.org.apache.zookeeper.jmx.MBeanRegistry;
import dlshade.org.apache.zookeeper.server.Request;
import dlshade.org.apache.zookeeper.server.ZKDatabase;
import dlshade.org.apache.zookeeper.server.quorum.FollowerZooKeeperServer;
import dlshade.org.apache.zookeeper.server.quorum.Leader;
import dlshade.org.apache.zookeeper.server.quorum.Learner;
import dlshade.org.apache.zookeeper.server.quorum.LearnerHandler;
import dlshade.org.apache.zookeeper.server.quorum.LearnerHandlerBean;
import dlshade.org.apache.zookeeper.server.quorum.LearnerMaster;
import dlshade.org.apache.zookeeper.server.quorum.QuorumPacket;
import dlshade.org.apache.zookeeper.server.quorum.QuorumPeer;
import dlshade.org.apache.zookeeper.server.quorum.StateSummary;
import dlshade.org.apache.zookeeper.server.quorum.UnifiedServerSocket;
import dlshade.org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ObserverMaster
extends LearnerMaster
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ObserverMaster.class);
    private final AtomicLong followerCounter = new AtomicLong(-1L);
    private QuorumPeer self;
    private FollowerZooKeeperServer zks;
    private int port;
    private Set<LearnerHandler> activeObservers = Collections.newSetFromMap(new ConcurrentHashMap());
    private final ConcurrentHashMap<LearnerHandler, LearnerHandlerBean> connectionBeans = new ConcurrentHashMap();
    private static final int PKTS_SIZE_LIMIT = 0x2000000;
    private static volatile int pktsSizeLimit = Integer.getInteger("zookeeper.observerMaster.sizeLimit", 0x2000000);
    private ConcurrentLinkedQueue<QuorumPacket> proposedPkts = new ConcurrentLinkedQueue();
    private ConcurrentLinkedQueue<QuorumPacket> committedPkts = new ConcurrentLinkedQueue();
    private int pktsSize = 0;
    private long lastProposedZxid;
    private final Object revalidateSessionLock = new Object();
    private final ConcurrentLinkedQueue<Revalidation> pendingRevalidations = new ConcurrentLinkedQueue();
    private Thread thread;
    private ServerSocket ss;
    private boolean listenerRunning;
    private ScheduledExecutorService pinger;
    Runnable ping = new Runnable(){

        @Override
        public void run() {
            for (LearnerHandler lh : ObserverMaster.this.activeObservers) {
                lh.ping();
            }
        }
    };

    ObserverMaster(QuorumPeer self, FollowerZooKeeperServer zks, int port) {
        this.self = self;
        this.zks = zks;
        this.port = port;
    }

    @Override
    public void addLearnerHandler(LearnerHandler learnerHandler) {
        if (!this.listenerRunning) {
            throw new RuntimeException("ObserverMaster is not running");
        }
    }

    @Override
    public void removeLearnerHandler(LearnerHandler learnerHandler) {
        this.activeObservers.remove(learnerHandler);
    }

    @Override
    public int syncTimeout() {
        return this.self.getSyncLimit() * this.self.getTickTime();
    }

    @Override
    public int getTickOfNextAckDeadline() {
        return this.self.tick.get() + this.self.syncLimit;
    }

    @Override
    public int getTickOfInitialAckDeadline() {
        return this.self.tick.get() + this.self.initLimit + this.self.syncLimit;
    }

    @Override
    public long getAndDecrementFollowerCounter() {
        return this.followerCounter.getAndDecrement();
    }

    @Override
    public void waitForEpochAck(long sid, StateSummary ss) throws IOException, InterruptedException {
    }

    @Override
    public void waitForStartup() throws InterruptedException {
    }

    @Override
    public synchronized long getLastProposed() {
        return this.lastProposedZxid;
    }

    @Override
    public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
        return this.self.getCurrentEpoch();
    }

    @Override
    public ZKDatabase getZKDatabase() {
        return this.zks.getZKDatabase();
    }

    @Override
    public void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException {
    }

    @Override
    public int getCurrentTick() {
        return this.self.tick.get();
    }

    @Override
    public void processAck(long sid, long zxid, SocketAddress localSocketAddress) {
        if ((zxid & 0xFFFFFFFFL) == 0L) {
            return;
        }
        throw new RuntimeException("Observers shouldn't send ACKS ack = " + Long.toHexString(zxid));
    }

    @Override
    public void touch(long sess, int to) {
        this.zks.getSessionTracker().touchSession(sess, to);
    }

    boolean revalidateLearnerSession(QuorumPacket qp) throws IOException {
        ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
        DataInputStream dis = new DataInputStream(bis);
        long id = dis.readLong();
        boolean valid = dis.readBoolean();
        Iterator<Revalidation> itr = this.pendingRevalidations.iterator();
        if (!itr.hasNext()) {
            return false;
        }
        Revalidation revalidation = itr.next();
        if (revalidation.sessionId != id) {
            return false;
        }
        itr.remove();
        LearnerHandler learnerHandler = revalidation.handler;
        QuorumPacket deepCopy = new QuorumPacket(qp.getType(), qp.getZxid(), Arrays.copyOf(qp.getData(), qp.getData().length), (List<Id>)(qp.getAuthinfo() == null ? null : new ArrayList<Id>(qp.getAuthinfo())));
        learnerHandler.queuePacket(deepCopy);
        if (valid) {
            this.touch(revalidation.sessionId, revalidation.timeout);
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void revalidateSession(QuorumPacket qp, LearnerHandler learnerHandler) throws IOException {
        ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
        DataInputStream dis = new DataInputStream(bis);
        long id = dis.readLong();
        int to = dis.readInt();
        Object object = this.revalidateSessionLock;
        synchronized (object) {
            this.pendingRevalidations.add(new Revalidation(id, to, learnerHandler));
            Learner learner = this.zks.getLearner();
            if (learner != null) {
                learner.writePacket(qp, true);
            }
        }
    }

    @Override
    public void submitLearnerRequest(Request si) {
        this.zks.processObserverRequest(si);
    }

    @Override
    public synchronized long startForwarding(LearnerHandler learnerHandler, long lastSeenZxid) {
        Iterator<QuorumPacket> itr = this.committedPkts.iterator();
        if (itr.hasNext()) {
            QuorumPacket packet = itr.next();
            if (packet.getZxid() > lastSeenZxid + 1L) {
                LOG.error("LearnerHandler is too far behind (0x{} < 0x{}), disconnecting {} at {}", new Object[]{Long.toHexString(lastSeenZxid + 1L), Long.toHexString(packet.getZxid()), learnerHandler.getSid(), learnerHandler.getRemoteAddress()});
                learnerHandler.shutdown();
                return -1L;
            }
            if (packet.getZxid() == lastSeenZxid + 1L) {
                learnerHandler.queuePacket(packet);
            }
            long queueHeadZxid = packet.getZxid();
            long queueBytesUsed = LearnerHandler.packetSize(packet);
            while (itr.hasNext()) {
                packet = itr.next();
                if (packet.getZxid() <= lastSeenZxid) continue;
                learnerHandler.queuePacket(packet);
                queueBytesUsed += LearnerHandler.packetSize(packet);
            }
            LOG.info("finished syncing observer from retained commit queue: sid {}, queue head 0x{}, queue tail 0x{}, sync position 0x{}, num packets used {}, num bytes used {}", new Object[]{learnerHandler.getSid(), Long.toHexString(queueHeadZxid), Long.toHexString(packet.getZxid()), Long.toHexString(lastSeenZxid), packet.getZxid() - lastSeenZxid, queueBytesUsed});
        }
        this.activeObservers.add(learnerHandler);
        return this.lastProposedZxid;
    }

    @Override
    public long getQuorumVerifierVersion() {
        return this.self.getQuorumVerifier().getVersion();
    }

    @Override
    public String getPeerInfo(long sid) {
        QuorumPeer.QuorumServer server = this.self.getView().get(sid);
        return server == null ? "" : server.toString();
    }

    @Override
    public byte[] getQuorumVerifierBytes() {
        return this.self.getLastSeenQuorumVerifier().toString().getBytes();
    }

    @Override
    public QuorumAuthServer getQuorumAuthServer() {
        return this.self == null ? null : this.self.authServer;
    }

    void proposalReceived(QuorumPacket qp) {
        this.proposedPkts.add(new QuorumPacket(8, qp.getZxid(), qp.getData(), null));
    }

    private synchronized QuorumPacket removeProposedPacket(long zxid) {
        QuorumPacket pkt = this.proposedPkts.peek();
        if (pkt == null || pkt.getZxid() > zxid) {
            LOG.debug("ignore missing proposal packet for {}", (Object)Long.toHexString(zxid));
            return null;
        }
        if (pkt.getZxid() != zxid) {
            String m = String.format("Unexpected proposal packet on commit ack, expected zxid 0x%d got zxid 0x%d", zxid, pkt.getZxid());
            LOG.error(m);
            throw new RuntimeException(m);
        }
        this.proposedPkts.remove();
        return pkt;
    }

    private synchronized void cacheCommittedPacket(QuorumPacket pkt) {
        this.committedPkts.add(pkt);
        this.pktsSize = (int)((long)this.pktsSize + LearnerHandler.packetSize(pkt));
        for (int i = 0; (double)this.pktsSize > (double)pktsSizeLimit * 0.8 && i < 5; ++i) {
            QuorumPacket oldPkt = this.committedPkts.poll();
            if (oldPkt == null) {
                this.pktsSize = 0;
                break;
            }
            this.pktsSize = (int)((long)this.pktsSize - LearnerHandler.packetSize(oldPkt));
        }
        while (this.pktsSize > pktsSizeLimit) {
            QuorumPacket oldPkt = this.committedPkts.poll();
            if (oldPkt == null) {
                this.pktsSize = 0;
                break;
            }
            this.pktsSize = (int)((long)this.pktsSize - LearnerHandler.packetSize(oldPkt));
        }
    }

    private synchronized void sendPacket(QuorumPacket pkt) {
        for (LearnerHandler lh : this.activeObservers) {
            lh.queuePacket(pkt);
        }
        this.lastProposedZxid = pkt.getZxid();
    }

    synchronized void proposalCommitted(long zxid) {
        QuorumPacket pkt = this.removeProposedPacket(zxid);
        if (pkt == null) {
            return;
        }
        this.cacheCommittedPacket(pkt);
        this.sendPacket(pkt);
    }

    synchronized void informAndActivate(long zxid, long suggestedLeaderId) {
        QuorumPacket pkt = this.removeProposedPacket(zxid);
        if (pkt == null) {
            return;
        }
        QuorumPacket informAndActivateQP = Leader.buildInformAndActivePacket(zxid, suggestedLeaderId, pkt.getData());
        this.cacheCommittedPacket(informAndActivateQP);
        this.sendPacket(informAndActivateQP);
    }

    public synchronized void start() throws IOException {
        if (this.thread != null && this.thread.isAlive()) {
            return;
        }
        this.listenerRunning = true;
        int backlog = 10;
        InetAddress address = this.self.getQuorumAddress().getReachableOrOne().getAddress();
        if (this.self.shouldUsePortUnification() || this.self.isSslQuorum()) {
            boolean allowInsecureConnection = this.self.shouldUsePortUnification();
            this.ss = this.self.getQuorumListenOnAllIPs() ? new UnifiedServerSocket(this.self.getX509Util(), allowInsecureConnection, this.port, backlog) : new UnifiedServerSocket(this.self.getX509Util(), allowInsecureConnection, this.port, backlog, address);
        } else {
            this.ss = this.self.getQuorumListenOnAllIPs() ? new ServerSocket(this.port, backlog) : new ServerSocket(this.port, backlog, address);
        }
        this.thread = new Thread((Runnable)this, "ObserverMaster");
        this.thread.start();
        this.pinger = Executors.newSingleThreadScheduledExecutor();
        this.pinger.scheduleAtFixedRate(this.ping, this.self.tickTime / 2, this.self.tickTime / 2, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        ServerSocket ss;
        ObserverMaster observerMaster = this;
        synchronized (observerMaster) {
            ss = this.ss;
        }
        while (this.listenerRunning) {
            try {
                Socket s = ss.accept();
                s.setSoTimeout(this.self.tickTime * this.self.initLimit);
                BufferedInputStream is = new BufferedInputStream(s.getInputStream());
                LearnerHandler lh = new LearnerHandler(s, is, this);
                lh.start();
            }
            catch (Exception e) {
                if (this.listenerRunning) {
                    LOG.debug("Ignoring accept exception (maybe shutting down)", (Throwable)e);
                    continue;
                }
                LOG.debug("Ignoring accept exception (maybe client closed)", (Throwable)e);
            }
        }
    }

    public synchronized void stop() {
        this.listenerRunning = false;
        if (this.pinger != null) {
            this.pinger.shutdownNow();
        }
        if (this.ss != null) {
            try {
                this.ss.close();
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }
        for (LearnerHandler lh : this.activeObservers) {
            lh.shutdown();
        }
    }

    int getNumActiveObservers() {
        return this.activeObservers.size();
    }

    public Iterable<Map<String, Object>> getActiveObservers() {
        HashSet<Map<String, Object>> info = new HashSet<Map<String, Object>>();
        for (LearnerHandler lh : this.activeObservers) {
            info.add(lh.getLearnerHandlerInfo());
        }
        return info;
    }

    public void resetObserverConnectionStats() {
        for (LearnerHandler lh : this.activeObservers) {
            lh.resetObserverConnectionStats();
        }
    }

    int getPktsSizeLimit() {
        return pktsSizeLimit;
    }

    static void setPktsSizeLimit(int sizeLimit) {
        pktsSizeLimit = sizeLimit;
    }

    @Override
    public void registerLearnerHandlerBean(LearnerHandler learnerHandler, Socket socket) {
        LearnerHandlerBean bean = new LearnerHandlerBean(learnerHandler, socket);
        if (this.zks.registerJMX(bean)) {
            this.connectionBeans.put(learnerHandler, bean);
        }
    }

    @Override
    public void unregisterLearnerHandlerBean(LearnerHandler learnerHandler) {
        LearnerHandlerBean bean = this.connectionBeans.remove(learnerHandler);
        if (bean != null) {
            MBeanRegistry.getInstance().unregister(bean);
        }
    }

    static class Revalidation {
        public final long sessionId;
        public final int timeout;
        public final LearnerHandler handler;

        Revalidation(Long sessionId, int timeout, LearnerHandler handler) {
            this.sessionId = sessionId;
            this.timeout = timeout;
            this.handler = handler;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Revalidation that = (Revalidation)o;
            return this.sessionId == that.sessionId && this.timeout == that.timeout && this.handler.equals(that.handler);
        }

        public int hashCode() {
            int result = (int)(this.sessionId ^ this.sessionId >>> 32);
            result = 31 * result + this.timeout;
            result = 31 * result + this.handler.hashCode();
            return result;
        }
    }
}

