/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.replication;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.MetadataClientDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.BookKeeperClusterTestCase;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class BookieAutoRecoveryTest
extends BookKeeperClusterTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(BookieAutoRecoveryTest.class);
    private static final byte[] PASSWD = "admin".getBytes();
    private static final byte[] data = "TESTDATA".getBytes();
    private static final String openLedgerRereplicationGracePeriod = "3000";
    private BookKeeper.DigestType digestType;
    private MetadataClientDriver metadataClientDriver;
    private LedgerManagerFactory mFactory;
    private LedgerUnderreplicationManager underReplicationManager;
    private LedgerManager ledgerManager;
    private OrderedScheduler scheduler;
    private final String underreplicatedPath = "/ledgers/underreplication/ledgers";

    public BookieAutoRecoveryTest() throws Exception {
        super(3);
        this.baseConf.setLedgerManagerFactoryClassName("org.apache.pulsar.metadata.bookkeeper.PulsarLedgerManagerFactory");
        this.baseConf.setOpenLedgerRereplicationGracePeriod(openLedgerRereplicationGracePeriod);
        this.baseConf.setRwRereplicateBackoffMs(500);
        this.baseClientConf.setLedgerManagerFactoryClassName("org.apache.pulsar.metadata.bookkeeper.PulsarLedgerManagerFactory");
        this.digestType = BookKeeper.DigestType.MAC;
        this.setAutoRecoveryEnabled(true);
        Class.forName("org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver");
        Class.forName("org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver");
    }

    @Override
    @BeforeMethod
    public void setUp() throws Exception {
        super.setUp();
        this.baseConf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri().replaceAll("zk://", "metadata-store:").replaceAll("/ledgers", ""));
        this.baseClientConf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri().replaceAll("zk://", "metadata-store:").replaceAll("/ledgers", ""));
        this.scheduler = (OrderedScheduler)OrderedScheduler.newSchedulerBuilder().name("test-scheduler").numThreads(1).build();
        this.metadataClientDriver = MetadataDrivers.getClientDriver((URI)URI.create(this.baseClientConf.getMetadataServiceUri()));
        this.metadataClientDriver.initialize(this.baseClientConf, (ScheduledExecutorService)this.scheduler, (StatsLogger)NullStatsLogger.INSTANCE, Optional.empty());
        this.mFactory = this.metadataClientDriver.getLedgerManagerFactory();
        this.underReplicationManager = this.mFactory.newLedgerUnderreplicationManager();
        this.ledgerManager = this.mFactory.newLedgerManager();
    }

    @Override
    @AfterMethod
    public void tearDown() throws Exception {
        super.tearDown();
        if (null != this.underReplicationManager) {
            this.underReplicationManager.close();
            this.underReplicationManager = null;
        }
        if (null != this.ledgerManager) {
            this.ledgerManager.close();
            this.ledgerManager = null;
        }
        if (null != this.metadataClientDriver) {
            this.metadataClientDriver.close();
            this.metadataClientDriver = null;
        }
        if (null != this.scheduler) {
            this.scheduler.shutdown();
        }
    }

    @Test
    public void testOpenLedgers() throws Exception {
        List<LedgerHandle> listOfLedgerHandle = this.createLedgersAndAddEntries(1, 5);
        LedgerHandle lh = listOfLedgerHandle.get(0);
        int ledgerReplicaIndex = 0;
        BookieId replicaToKillAddr = (BookieId)((List)lh.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        String urLedgerZNode = this.getUrLedgerZNode(lh);
        ledgerReplicaIndex = this.getReplicaIndexInLedger(lh, replicaToKillAddr);
        CountDownLatch latch = new CountDownLatch(1);
        AssertJUnit.assertNull((String)"UrLedger already exists!", (Object)this.watchUrLedgerNode(urLedgerZNode, latch));
        LOG.info("Killing Bookie :" + replicaToKillAddr);
        this.killBookie(replicaToKillAddr);
        latch.await();
        latch = new CountDownLatch(1);
        LOG.info("Watching on urLedgerPath:" + urLedgerZNode + " to know the status of rereplication process");
        AssertJUnit.assertNotNull((String)"UrLedger doesn't exists!", (Object)this.watchUrLedgerNode(urLedgerZNode, latch));
        this.startNewBookie();
        int newBookieIndex = this.lastBookieIndex();
        BookieServer newBookieServer = this.serverByIndex(newBookieIndex);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Waiting to finish the replication of failed bookie : " + replicaToKillAddr);
        }
        latch.await();
        LOG.info("Waiting to update the urledger metadata in zookeeper");
        this.verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, listOfLedgerHandle.get(0), ledgerReplicaIndex);
    }

    @Test
    public void testClosedLedgers() throws Exception {
        ArrayList<Integer> listOfReplicaIndex = new ArrayList<Integer>();
        List<LedgerHandle> listOfLedgerHandle = this.createLedgersAndAddEntries(1, 5);
        this.closeLedgers(listOfLedgerHandle);
        LedgerHandle lhandle = listOfLedgerHandle.get(0);
        int ledgerReplicaIndex = 0;
        BookieId replicaToKillAddr = (BookieId)((List)lhandle.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size());
        for (LedgerHandle lh : listOfLedgerHandle) {
            ledgerReplicaIndex = this.getReplicaIndexInLedger(lh, replicaToKillAddr);
            listOfReplicaIndex.add(ledgerReplicaIndex);
            AssertJUnit.assertNull((String)"UrLedger already exists!", (Object)this.watchUrLedgerNode(this.getUrLedgerZNode(lh), latch));
        }
        LOG.info("Killing Bookie :" + replicaToKillAddr);
        this.killBookie(replicaToKillAddr);
        latch.await();
        latch = new CountDownLatch(listOfLedgerHandle.size());
        for (LedgerHandle lh : listOfLedgerHandle) {
            String urLedgerZNode = this.getUrLedgerZNode(lh);
            LOG.info("Watching on urLedgerPath:" + urLedgerZNode + " to know the status of rereplication process");
            AssertJUnit.assertNotNull((String)"UrLedger doesn't exists!", (Object)this.watchUrLedgerNode(urLedgerZNode, latch));
        }
        this.startNewBookie();
        int newBookieIndex = this.lastBookieIndex();
        BookieServer newBookieServer = this.serverByIndex(newBookieIndex);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Waiting to finish the replication of failed bookie : " + replicaToKillAddr);
        }
        latch.await();
        LOG.info("Waiting to update the urledger metadata in zookeeper");
        for (int index = 0; index < listOfLedgerHandle.size(); ++index) {
            this.verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, listOfLedgerHandle.get(index), (Integer)listOfReplicaIndex.get(index));
        }
    }

    @Test
    public void testStopWhileReplicationInProgress() throws Exception {
        int numberOfLedgers = 2;
        ArrayList<Integer> listOfReplicaIndex = new ArrayList<Integer>();
        List<LedgerHandle> listOfLedgerHandle = this.createLedgersAndAddEntries(numberOfLedgers, 5);
        this.closeLedgers(listOfLedgerHandle);
        LedgerHandle handle = listOfLedgerHandle.get(0);
        BookieId replicaToKillAddr = (BookieId)((List)handle.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie:" + replicaToKillAddr);
        CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size());
        for (int i = 0; i < listOfLedgerHandle.size(); ++i) {
            String urLedgerZNode = this.getUrLedgerZNode(listOfLedgerHandle.get(i));
            AssertJUnit.assertNull((String)"UrLedger already exists!", (Object)this.watchUrLedgerNode(urLedgerZNode, latch));
            int replicaIndexInLedger = this.getReplicaIndexInLedger(listOfLedgerHandle.get(i), replicaToKillAddr);
            listOfReplicaIndex.add(replicaIndexInLedger);
        }
        LOG.info("Killing Bookie :" + replicaToKillAddr);
        this.killBookie(replicaToKillAddr);
        latch.await();
        latch = new CountDownLatch(listOfLedgerHandle.size());
        for (LedgerHandle lh : listOfLedgerHandle) {
            String urLedgerZNode = this.getUrLedgerZNode(lh);
            LOG.info("Watching on urLedgerPath:" + urLedgerZNode + " to know the status of rereplication process");
            AssertJUnit.assertNotNull((String)"UrLedger doesn't exists!", (Object)this.watchUrLedgerNode(urLedgerZNode, latch));
        }
        this.startNewBookie();
        int newBookieIndex = this.lastBookieIndex();
        BookieServer newBookieServer = this.serverByIndex(newBookieIndex);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Waiting to finish the replication of failed bookie : " + replicaToKillAddr);
        }
        while (true) {
            if (latch.getCount() < (long)numberOfLedgers || latch.getCount() <= 0L) break;
            Thread.sleep(1000L);
        }
        this.stopReplicationService();
        LOG.info("Latch Count is:" + latch.getCount());
        this.startReplicationService();
        LOG.info("Waiting to finish rereplication processes");
        latch.await();
        LOG.info("Waiting to update the urledger metadata in zookeeper");
        for (int index = 0; index < listOfLedgerHandle.size(); ++index) {
            this.verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, listOfLedgerHandle.get(index), (Integer)listOfReplicaIndex.get(index));
        }
    }

    @Test
    public void testNoSuchLedgerExists() throws Exception {
        List<LedgerHandle> listOfLedgerHandle = this.createLedgersAndAddEntries(2, 5);
        CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size());
        for (LedgerHandle lh : listOfLedgerHandle) {
            AssertJUnit.assertNull((String)"UrLedger already exists!", (Object)this.watchUrLedgerNode(this.getUrLedgerZNode(lh), latch));
        }
        BookieId replicaToKillAddr = (BookieId)((List)listOfLedgerHandle.get(0).getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        this.killBookie(replicaToKillAddr);
        replicaToKillAddr = (BookieId)((List)listOfLedgerHandle.get(0).getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        this.killBookie(replicaToKillAddr);
        latch.await();
        latch = new CountDownLatch(listOfLedgerHandle.size());
        for (LedgerHandle lh : listOfLedgerHandle) {
            AssertJUnit.assertNotNull((String)"UrLedger doesn't exists!", (Object)this.watchUrLedgerNode(this.getUrLedgerZNode(lh), latch));
        }
        for (LedgerHandle lh : listOfLedgerHandle) {
            this.bkc.deleteLedger(lh.getId());
        }
        this.startNewBookie();
        latch.await();
        for (LedgerHandle lh : listOfLedgerHandle) {
            AssertJUnit.assertNull((String)"UrLedger still exists after rereplication", (Object)this.watchUrLedgerNode(this.getUrLedgerZNode(lh), latch));
        }
    }

    @Test
    public void testEmptyLedgerLosesQuorumEventually() throws Exception {
        LedgerHandle lh = this.bkc.createLedger(3, 2, 2, BookKeeper.DigestType.CRC32, PASSWD);
        CountDownLatch latch = new CountDownLatch(1);
        String urZNode = this.getUrLedgerZNode(lh);
        this.watchUrLedgerNode(urZNode, latch);
        BookieId replicaToKill = (BookieId)((List)lh.getLedgerMetadata().getAllEnsembles().get(0L)).get(2);
        LOG.info("Killing last bookie, {}, in ensemble {}", (Object)replicaToKill, lh.getLedgerMetadata().getAllEnsembles().get(0L));
        this.killBookie(replicaToKill);
        this.startNewBookie();
        this.getAuditor(10, TimeUnit.SECONDS).submitAuditTask().get();
        AssertJUnit.assertTrue((String)"Should be marked as underreplicated", (boolean)latch.await(5L, TimeUnit.SECONDS));
        latch = new CountDownLatch(1);
        Stat s = this.watchUrLedgerNode(urZNode, latch);
        if (s != null) {
            AssertJUnit.assertTrue((String)"Should be marked as replicated", (boolean)latch.await(15L, TimeUnit.SECONDS));
        }
        replicaToKill = (BookieId)((List)lh.getLedgerMetadata().getAllEnsembles().get(0L)).get(1);
        LOG.info("Killing second bookie, {}, in ensemble {}", (Object)replicaToKill, lh.getLedgerMetadata().getAllEnsembles().get(0L));
        this.killBookie(replicaToKill);
        this.getAuditor(10, TimeUnit.SECONDS).submitAuditTask().get();
        AssertJUnit.assertTrue((String)"Should be marked as underreplicated", (boolean)latch.await(5L, TimeUnit.SECONDS));
        latch = new CountDownLatch(1);
        s = this.watchUrLedgerNode(urZNode, latch);
        this.startNewBookie();
        this.getAuditor(10, TimeUnit.SECONDS).submitAuditTask().get();
        if (s != null) {
            AssertJUnit.assertTrue((String)"Should be marked as replicated", (boolean)latch.await(20L, TimeUnit.SECONDS));
        }
        this.bkc.openLedger(lh.getId(), BookKeeper.DigestType.CRC32, PASSWD);
    }

    @Test
    public void testLedgerMetadataContainsIpAddressAsBookieID() throws Exception {
        this.stopBKCluster();
        this.bkc = new BookKeeperTestClient(this.baseClientConf);
        ServerConfiguration serverConf1 = this.newServerConfiguration();
        ServerConfiguration serverConf2 = this.newServerConfiguration();
        serverConf2.setUseHostNameAsBookieID(true);
        ServerConfiguration serverConf3 = this.newServerConfiguration();
        serverConf3.setUseHostNameAsBookieID(true);
        this.startAndAddBookie(serverConf1);
        this.startAndAddBookie(serverConf2);
        this.startAndAddBookie(serverConf3);
        List<LedgerHandle> listOfLedgerHandle = this.createLedgersAndAddEntries(1, 5);
        LedgerHandle lh = listOfLedgerHandle.get(0);
        int ledgerReplicaIndex = 0;
        NavigableMap ensembles = lh.getLedgerMetadata().getAllEnsembles();
        List bkAddresses = (List)ensembles.get(0L);
        BookieId replicaToKillAddr = (BookieId)bkAddresses.get(0);
        for (BookieId bookieSocketAddress : bkAddresses) {
            if (this.isCreatedFromIp(bookieSocketAddress)) continue;
            replicaToKillAddr = bookieSocketAddress;
            LOG.info("Kill bookie which has registered using hostname");
            break;
        }
        String urLedgerZNode = this.getUrLedgerZNode(lh);
        ledgerReplicaIndex = this.getReplicaIndexInLedger(lh, replicaToKillAddr);
        CountDownLatch latch = new CountDownLatch(1);
        AssertJUnit.assertNull((String)"UrLedger already exists!", (Object)this.watchUrLedgerNode(urLedgerZNode, latch));
        LOG.info("Killing Bookie :" + replicaToKillAddr);
        this.killBookie(replicaToKillAddr);
        latch.await();
        latch = new CountDownLatch(1);
        LOG.info("Watching on urLedgerPath:" + urLedgerZNode + " to know the status of rereplication process");
        AssertJUnit.assertNotNull((String)"UrLedger doesn't exists!", (Object)this.watchUrLedgerNode(urLedgerZNode, latch));
        ServerConfiguration serverConf = this.newServerConfiguration();
        serverConf.setUseHostNameAsBookieID(false);
        this.startAndAddBookie(serverConf);
        int newBookieIndex = this.lastBookieIndex();
        BookieServer newBookieServer = this.serverByIndex(newBookieIndex);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Waiting to finish the replication of failed bookie : " + replicaToKillAddr);
        }
        latch.await();
        LOG.info("Waiting to update the urledger metadata in zookeeper");
        this.verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, listOfLedgerHandle.get(0), ledgerReplicaIndex);
    }

    @Test
    public void testLedgerMetadataContainsHostNameAsBookieID() throws Exception {
        this.stopBKCluster();
        this.bkc = new BookKeeperTestClient(this.baseClientConf);
        ServerConfiguration serverConf1 = this.newServerConfiguration();
        ServerConfiguration serverConf2 = this.newServerConfiguration();
        serverConf2.setUseHostNameAsBookieID(true);
        ServerConfiguration serverConf3 = this.newServerConfiguration();
        serverConf3.setUseHostNameAsBookieID(true);
        this.startAndAddBookie(serverConf1);
        this.startAndAddBookie(serverConf2);
        this.startAndAddBookie(serverConf3);
        List<LedgerHandle> listOfLedgerHandle = this.createLedgersAndAddEntries(1, 5);
        LedgerHandle lh = listOfLedgerHandle.get(0);
        int ledgerReplicaIndex = 0;
        NavigableMap ensembles = lh.getLedgerMetadata().getAllEnsembles();
        List bkAddresses = (List)ensembles.get(0L);
        BookieId replicaToKillAddr = (BookieId)bkAddresses.get(0);
        for (BookieId bookieSocketAddress : bkAddresses) {
            if (!this.isCreatedFromIp(bookieSocketAddress)) continue;
            replicaToKillAddr = bookieSocketAddress;
            LOG.info("Kill bookie which has registered using ipaddress");
            break;
        }
        String urLedgerZNode = this.getUrLedgerZNode(lh);
        ledgerReplicaIndex = this.getReplicaIndexInLedger(lh, replicaToKillAddr);
        CountDownLatch latch = new CountDownLatch(1);
        AssertJUnit.assertNull((String)"UrLedger already exists!", (Object)this.watchUrLedgerNode(urLedgerZNode, latch));
        LOG.info("Killing Bookie :" + replicaToKillAddr);
        this.killBookie(replicaToKillAddr);
        latch.await();
        latch = new CountDownLatch(1);
        LOG.info("Watching on urLedgerPath:" + urLedgerZNode + " to know the status of rereplication process");
        AssertJUnit.assertNotNull((String)"UrLedger doesn't exists!", (Object)this.watchUrLedgerNode(urLedgerZNode, latch));
        this.bkc = new BookKeeperTestClient(this.baseClientConf);
        ServerConfiguration serverConf = this.newServerConfiguration();
        serverConf.setUseHostNameAsBookieID(true);
        this.startAndAddBookie(serverConf);
        int newBookieIndex = this.lastBookieIndex();
        BookieServer newBookieServer = this.serverByIndex(newBookieIndex);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Waiting to finish the replication of failed bookie : " + replicaToKillAddr);
        }
        latch.await();
        LOG.info("Waiting to update the urledger metadata in zookeeper");
        this.verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, listOfLedgerHandle.get(0), ledgerReplicaIndex);
    }

    private int getReplicaIndexInLedger(LedgerHandle lh, BookieId replicaToKill) {
        NavigableMap ensembles = lh.getLedgerMetadata().getAllEnsembles();
        int ledgerReplicaIndex = -1;
        for (BookieId addr : (List)ensembles.get(0L)) {
            ++ledgerReplicaIndex;
            if (!addr.equals((Object)replicaToKill)) continue;
            break;
        }
        return ledgerReplicaIndex;
    }

    private void verifyLedgerEnsembleMetadataAfterReplication(BookieServer newBookieServer, LedgerHandle lh, int ledgerReplicaIndex) throws Exception {
        LedgerHandle openLedger = this.bkc.openLedger(lh.getId(), this.digestType, PASSWD);
        BookieId inetSocketAddress = (BookieId)((List)openLedger.getLedgerMetadata().getAllEnsembles().get(0L)).get(ledgerReplicaIndex);
        AssertJUnit.assertEquals((String)("Rereplication has been failed and ledgerReplicaIndex :" + ledgerReplicaIndex), (Object)newBookieServer.getBookieId(), (Object)inetSocketAddress);
        openLedger.close();
    }

    private void closeLedgers(List<LedgerHandle> listOfLedgerHandle) throws InterruptedException, BKException {
        for (LedgerHandle lh : listOfLedgerHandle) {
            lh.close();
        }
    }

    private List<LedgerHandle> createLedgersAndAddEntries(int numberOfLedgers, int numberOfEntries) throws InterruptedException, BKException {
        ArrayList<LedgerHandle> listOfLedgerHandle = new ArrayList<LedgerHandle>(numberOfLedgers);
        for (int index = 0; index < numberOfLedgers; ++index) {
            LedgerHandle lh = this.bkc.createLedger(3, 3, this.digestType, PASSWD);
            listOfLedgerHandle.add(lh);
            for (int i = 0; i < numberOfEntries; ++i) {
                lh.addEntry(data);
            }
        }
        return listOfLedgerHandle;
    }

    private String getUrLedgerZNode(LedgerHandle lh) {
        return ZkLedgerUnderreplicationManager.getUrLedgerZnode((String)"/ledgers/underreplication/ledgers", (long)lh.getId());
    }

    private Stat watchUrLedgerNode(String znode, final CountDownLatch latch) throws KeeperException, InterruptedException {
        return this.zkc.exists(znode, new Watcher(){

            public void process(WatchedEvent event) {
                if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                    LOG.info("Received Ledger rereplication completion event :" + event.getType());
                    latch.countDown();
                }
                if (event.getType() == Watcher.Event.EventType.NodeCreated) {
                    LOG.info("Received urLedger publishing event :" + event.getType());
                    latch.countDown();
                }
            }
        });
    }
}

