/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.bookkeeper;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.metadata.BaseMetadataStoreTest;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.bookkeeper.FaultInjectableZKRegistrationManager;
import org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient;
import org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationManager;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public class PulsarRegistrationClientTest
extends BaseMetadataStoreTest {
    private static final Logger log = LoggerFactory.getLogger(PulsarRegistrationClientTest.class);

    private static Set<BookieId> prepareNBookies(int num) {
        HashSet<BookieId> bookies = new HashSet<BookieId>();
        for (int i = 0; i < num; ++i) {
            bookies.add(new BookieSocketAddress("127.0.0.1", 3181 + i).toBookieId());
        }
        return bookies;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="impl")
    public void testGetWritableBookies(String provider, Supplier<String> urlSupplier) throws Exception {
        MetadataStoreExtended store = MetadataStoreExtended.create((String)urlSupplier.get(), (MetadataStoreConfig)MetadataStoreConfig.builder().fsyncEnable(false).build());
        try {
            String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
            PulsarRegistrationManager rm = new PulsarRegistrationManager(store, ledgersRoot, (AbstractConfiguration)Mockito.mock(AbstractConfiguration.class));
            try {
                PulsarRegistrationClient rc = new PulsarRegistrationClient((MetadataStore)store, ledgersRoot);
                try {
                    Set<BookieId> addresses = PulsarRegistrationClientTest.prepareNBookies(10);
                    ArrayList<String> children = new ArrayList<String>();
                    for (BookieId address : addresses) {
                        children.add(address.toString());
                        rm.registerBookie(address, false, new BookieServiceInfo());
                    }
                    Versioned result = (Versioned)FutureUtils.result((CompletableFuture)rc.getWritableBookies());
                    Assert.assertEquals((int)((Set)result.getValue()).size(), (int)addresses.size());
                }
                finally {
                    if (Collections.singletonList(rc).get(0) != null) {
                        rc.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(rm).get(0) != null) {
                    rm.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(store).get(0) != null) {
                store.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="impl")
    public void testGetReadonlyBookies(String provider, Supplier<String> urlSupplier) throws Exception {
        MetadataStoreExtended store = MetadataStoreExtended.create((String)urlSupplier.get(), (MetadataStoreConfig)MetadataStoreConfig.builder().fsyncEnable(false).build());
        try {
            String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
            PulsarRegistrationManager rm = new PulsarRegistrationManager(store, ledgersRoot, (AbstractConfiguration)Mockito.mock(AbstractConfiguration.class));
            try {
                PulsarRegistrationClient rc = new PulsarRegistrationClient((MetadataStore)store, ledgersRoot);
                try {
                    Set<BookieId> addresses = PulsarRegistrationClientTest.prepareNBookies(10);
                    ArrayList<String> children = new ArrayList<String>();
                    for (BookieId address : addresses) {
                        children.add(address.toString());
                        rm.registerBookie(address, true, new BookieServiceInfo());
                    }
                    Versioned result = (Versioned)FutureUtils.result((CompletableFuture)rc.getReadOnlyBookies());
                    Assert.assertEquals((int)((Set)result.getValue()).size(), (int)addresses.size());
                }
                finally {
                    if (Collections.singletonList(rc).get(0) != null) {
                        rc.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(rm).get(0) != null) {
                    rm.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(store).get(0) != null) {
                store.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="impl")
    public void testGetBookieServiceInfo(String provider, Supplier<String> urlSupplier) throws Exception {
        MetadataStoreExtended store = MetadataStoreExtended.create((String)urlSupplier.get(), (MetadataStoreConfig)MetadataStoreConfig.builder().fsyncEnable(false).build());
        try {
            String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
            PulsarRegistrationManager rm = new PulsarRegistrationManager(store, ledgersRoot, (AbstractConfiguration)Mockito.mock(AbstractConfiguration.class));
            try {
                PulsarRegistrationClient rc = new PulsarRegistrationClient((MetadataStore)store, ledgersRoot);
                try {
                    boolean readOnly;
                    BookieServiceInfo.Endpoint endpoint;
                    BookieServiceInfo info;
                    ArrayList<BookieId> addresses = new ArrayList<BookieId>();
                    for (int i = 0; i < 10; ++i) {
                        addresses.add(BookieId.parse((String)("BOOKIE-" + i)));
                    }
                    HashMap<BookieId, BookieServiceInfo> bookieServiceInfos = new HashMap<BookieId, BookieServiceInfo>();
                    HashSet<BookieId> readOnlyBookies = new HashSet<BookieId>();
                    int port = 223;
                    for (BookieId address : addresses) {
                        info = new BookieServiceInfo();
                        endpoint = new BookieServiceInfo.Endpoint();
                        endpoint.setAuth(Collections.emptyList());
                        endpoint.setExtensions(Collections.emptyList());
                        endpoint.setId("id");
                        endpoint.setHost("localhost");
                        endpoint.setPort(port++);
                        endpoint.setProtocol("bookie-rpc");
                        info.setEndpoints(Arrays.asList(endpoint));
                        bookieServiceInfos.put(address, info);
                        boolean bl = readOnly = port % 2 == 0;
                        if (readOnly) {
                            readOnlyBookies.add(address);
                        }
                        rm.registerBookie(address, readOnly, info);
                        rm.writeCookie(address, new Versioned((Object)new byte[0], Version.NEW));
                    }
                    PulsarRegistrationClientTest.getAndVerifyAllBookies((RegistrationClient)rc, addresses);
                    Awaitility.await().untilAsserted(() -> this.lambda$testGetBookieServiceInfo$0(addresses, (RegistrationClient)rc, bookieServiceInfos));
                    for (BookieId address : addresses) {
                        rm.unregisterBookie(address, readOnlyBookies.contains(address));
                        readOnlyBookies.remove(address);
                    }
                    PulsarRegistrationClientTest.getAndVerifyAllBookies((RegistrationClient)rc, addresses);
                    Awaitility.await().untilAsserted(() -> PulsarRegistrationClientTest.lambda$testGetBookieServiceInfo$2(addresses, (RegistrationClient)rc));
                    for (BookieId address : addresses) {
                        rm.registerBookie(address, false, (BookieServiceInfo)bookieServiceInfos.get(address));
                    }
                    PulsarRegistrationClientTest.getAndVerifyAllBookies((RegistrationClient)rc, addresses);
                    Awaitility.await().ignoreExceptionsMatching(e -> e.getCause() instanceof BKException.BKBookieHandleNotAvailableException).untilAsserted(() -> this.lambda$testGetBookieServiceInfo$4(addresses, (RegistrationClient)rc, bookieServiceInfos));
                    port = 111;
                    for (BookieId address : addresses) {
                        info = new BookieServiceInfo();
                        endpoint = new BookieServiceInfo.Endpoint();
                        endpoint.setAuth(Collections.emptyList());
                        endpoint.setExtensions(Collections.emptyList());
                        endpoint.setId("id");
                        endpoint.setHost("localhost");
                        endpoint.setPort(port++);
                        endpoint.setProtocol("bookie-rpc");
                        info.setEndpoints(Arrays.asList(endpoint));
                        bookieServiceInfos.put(address, info);
                        readOnly = port % 2 == 0;
                        rm.unregisterBookie(address, readOnlyBookies.contains(address));
                        rm.registerBookie(address, readOnly, info);
                        if (!readOnly) continue;
                        readOnlyBookies.add(address);
                    }
                    Awaitility.await().ignoreExceptionsMatching(e -> e.getCause() instanceof BKException.BKBookieHandleNotAvailableException).untilAsserted(() -> this.lambda$testGetBookieServiceInfo$6(addresses, (RegistrationClient)rc, bookieServiceInfos));
                }
                finally {
                    if (Collections.singletonList(rc).get(0) != null) {
                        rc.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(rm).get(0) != null) {
                    rm.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(store).get(0) != null) {
                store.close();
            }
        }
    }

    private static void getAndVerifyAllBookies(RegistrationClient rc, List<BookieId> addresses) throws InterruptedException, ExecutionException {
        Set all = (Set)((Versioned)rc.getAllBookies().get()).getValue();
        Assert.assertEquals((int)all.size(), (int)addresses.size());
        for (BookieId id : all) {
            Assert.assertTrue((boolean)addresses.contains(id));
        }
        for (BookieId id : addresses) {
            Assert.assertTrue((boolean)all.contains(id));
        }
    }

    private void compareBookieServiceInfo(BookieServiceInfo a, BookieServiceInfo b) {
        Assert.assertEquals((Map)a.getProperties(), (Map)b.getProperties());
        Assert.assertEquals((int)a.getEndpoints().size(), (int)b.getEndpoints().size());
        for (int i = 0; i < a.getEndpoints().size(); ++i) {
            BookieServiceInfo.Endpoint e1 = (BookieServiceInfo.Endpoint)a.getEndpoints().get(i);
            BookieServiceInfo.Endpoint e2 = (BookieServiceInfo.Endpoint)b.getEndpoints().get(i);
            Assert.assertEquals((String)e1.getHost(), (String)e2.getHost());
            Assert.assertEquals((int)e1.getPort(), (int)e2.getPort());
            Assert.assertEquals((String)e1.getId(), (String)e2.getId());
            Assert.assertEquals((String)e1.getProtocol(), (String)e2.getProtocol());
            Assert.assertEquals((Collection)e1.getExtensions(), (Collection)e2.getExtensions());
            Assert.assertEquals((Collection)e1.getAuth(), (Collection)e2.getAuth());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="impl")
    public void testGetAllBookies(String provider, Supplier<String> urlSupplier) throws Exception {
        MetadataStoreExtended store = MetadataStoreExtended.create((String)urlSupplier.get(), (MetadataStoreConfig)MetadataStoreConfig.builder().build());
        try {
            String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
            PulsarRegistrationManager rm = new PulsarRegistrationManager(store, ledgersRoot, (AbstractConfiguration)Mockito.mock(AbstractConfiguration.class));
            try {
                PulsarRegistrationClient rc = new PulsarRegistrationClient((MetadataStore)store, ledgersRoot);
                try {
                    Set<BookieId> addresses = PulsarRegistrationClientTest.prepareNBookies(10);
                    ArrayList<String> children = new ArrayList<String>();
                    for (BookieId address : addresses) {
                        children.add(address.toString());
                        rm.writeCookie(address, new Versioned((Object)new byte[0], Version.NEW));
                    }
                    Versioned result = (Versioned)FutureUtils.result((CompletableFuture)rc.getAllBookies());
                    Assert.assertEquals((int)((Set)result.getValue()).size(), (int)addresses.size());
                }
                finally {
                    if (Collections.singletonList(rc).get(0) != null) {
                        rc.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(rm).get(0) != null) {
                    rm.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(store).get(0) != null) {
                store.close();
            }
        }
    }

    @Test(dataProvider="impl")
    public void testWatchWritableBookiesSuccess(String provider, Supplier<String> urlSupplier) throws Exception {
        this.testWatchBookiesSuccess(provider, urlSupplier, true);
    }

    @Test(dataProvider="impl")
    public void testWatchReadonlyBookiesSuccess(String provider, Supplier<String> urlSupplier) throws Exception {
        this.testWatchBookiesSuccess(provider, urlSupplier, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testWatchBookiesSuccess(String provider, Supplier<String> urlSupplier, boolean isWritable) throws Exception {
        MetadataStoreExtended store = MetadataStoreExtended.create((String)urlSupplier.get(), (MetadataStoreConfig)MetadataStoreConfig.builder().fsyncEnable(false).build());
        try {
            String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
            PulsarRegistrationManager rm = new PulsarRegistrationManager(store, ledgersRoot, (AbstractConfiguration)Mockito.mock(AbstractConfiguration.class));
            try {
                PulsarRegistrationClient rc = new PulsarRegistrationClient((MetadataStore)store, ledgersRoot);
                try {
                    ConcurrentLinkedQueue updates = new ConcurrentLinkedQueue();
                    ConcurrentHashMap bookies = new ConcurrentHashMap();
                    RegistrationClient.RegistrationListener listener = b -> {
                        updates.add(b);
                        ((Set)b.getValue()).forEach(x -> bookies.put(x, true));
                    };
                    int BOOKIES = 10;
                    Set<BookieId> addresses = PulsarRegistrationClientTest.prepareNBookies(BOOKIES);
                    if (isWritable) {
                        FutureUtils.result((CompletableFuture)rc.watchWritableBookies(listener));
                    } else {
                        FutureUtils.result((CompletableFuture)rc.watchReadOnlyBookies(listener));
                    }
                    for (BookieId address : addresses) {
                        rm.registerBookie(address, !isWritable, new BookieServiceInfo());
                    }
                    Awaitility.await().untilAsserted(() -> {
                        Assert.assertFalse((boolean)updates.isEmpty());
                        Assert.assertEquals((int)bookies.size(), (int)BOOKIES);
                    });
                }
                finally {
                    if (Collections.singletonList(rc).get(0) != null) {
                        rc.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(rm).get(0) != null) {
                    rm.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(store).get(0) != null) {
                store.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testNetworkDelayWithBkZkManager() throws Throwable {
        String zksConnectionString = this.zks.getConnectionString();
        String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
        ZooKeeper zk = new ZooKeeper(zksConnectionString, 5000, null);
        try {
            ServerConfiguration serverConfiguration = new ServerConfiguration();
            serverConfiguration.setZkLedgersRootPath(ledgersRoot);
            FaultInjectableZKRegistrationManager rm = new FaultInjectableZKRegistrationManager(serverConfiguration, zk);
            rm.prepareFormat();
            MetadataStoreExtended store = MetadataStoreExtended.create((String)zksConnectionString, (MetadataStoreConfig)MetadataStoreConfig.builder().fsyncEnable(false).build());
            try {
                PulsarRegistrationClient rc1 = new PulsarRegistrationClient((MetadataStore)store, ledgersRoot);
                try {
                    PulsarRegistrationClient rc2 = new PulsarRegistrationClient((MetadataStore)store, ledgersRoot);
                    try {
                        ArrayList<BookieId> addresses = new ArrayList<BookieId>();
                        for (int i = 0; i < 10; ++i) {
                            addresses.add(BookieId.parse((String)("BOOKIE-" + i)));
                        }
                        HashMap<BookieId, BookieServiceInfo> bookieServiceInfos = new HashMap<BookieId, BookieServiceInfo>();
                        int port = 223;
                        for (BookieId address : addresses) {
                            BookieServiceInfo info = new BookieServiceInfo();
                            BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
                            endpoint.setAuth(Collections.emptyList());
                            endpoint.setExtensions(Collections.emptyList());
                            endpoint.setId("id");
                            endpoint.setHost("localhost");
                            endpoint.setPort(port++);
                            endpoint.setProtocol("bookie-rpc");
                            info.setEndpoints(Arrays.asList(endpoint));
                            bookieServiceInfos.put(address, info);
                            rm.registerBookie(address, false, info);
                            rm.writeCookie(address, (Versioned<byte[]>)new Versioned((Object)new byte[0], Version.NEW));
                        }
                        PulsarRegistrationClientTest.getAndVerifyAllBookies((RegistrationClient)rc1, addresses);
                        PulsarRegistrationClientTest.getAndVerifyAllBookies((RegistrationClient)rc2, addresses);
                        Awaitility.await().untilAsserted(() -> this.lambda$testNetworkDelayWithBkZkManager$10(addresses, (RegistrationClient)rc1, bookieServiceInfos, (RegistrationClient)rc2));
                        rm.betweenRegisterReadOnlyBookie(__ -> {
                            try {
                                Thread.sleep(1000L);
                            }
                            catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                            return null;
                        });
                        for (int i = 0; i < addresses.size() / 2; ++i) {
                            BookieId bkId = (BookieId)addresses.get(i);
                            rm.registerBookie(bkId, true, (BookieServiceInfo)bookieServiceInfos.get(bkId));
                        }
                        Awaitility.await().untilAsserted(() -> this.lambda$testNetworkDelayWithBkZkManager$12(addresses, (RegistrationClient)rc1, bookieServiceInfos, (RegistrationClient)rc2));
                    }
                    finally {
                        if (Collections.singletonList(rc2).get(0) != null) {
                            rc2.close();
                        }
                    }
                }
                finally {
                    if (Collections.singletonList(rc1).get(0) != null) {
                        rc1.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(store).get(0) != null) {
                    store.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(zk).get(0) != null) {
                zk.close();
            }
        }
    }

    private /* synthetic */ void lambda$testNetworkDelayWithBkZkManager$12(List addresses, RegistrationClient rc1, Map bookieServiceInfos, RegistrationClient rc2) throws Throwable {
        for (BookieId address : addresses) {
            this.compareBookieServiceInfo((BookieServiceInfo)((Versioned)rc1.getBookieServiceInfo(address).get()).getValue(), (BookieServiceInfo)bookieServiceInfos.get(address));
            this.compareBookieServiceInfo((BookieServiceInfo)((Versioned)rc2.getBookieServiceInfo(address).get()).getValue(), (BookieServiceInfo)bookieServiceInfos.get(address));
        }
    }

    private /* synthetic */ void lambda$testNetworkDelayWithBkZkManager$10(List addresses, RegistrationClient rc1, Map bookieServiceInfos, RegistrationClient rc2) throws Throwable {
        for (BookieId address : addresses) {
            this.compareBookieServiceInfo((BookieServiceInfo)((Versioned)rc1.getBookieServiceInfo(address).get()).getValue(), (BookieServiceInfo)bookieServiceInfos.get(address));
            this.compareBookieServiceInfo((BookieServiceInfo)((Versioned)rc2.getBookieServiceInfo(address).get()).getValue(), (BookieServiceInfo)bookieServiceInfos.get(address));
        }
    }

    private /* synthetic */ void lambda$testGetBookieServiceInfo$6(List addresses, RegistrationClient rc, Map bookieServiceInfos) throws Throwable {
        for (BookieId address : addresses) {
            BookieServiceInfo bookieServiceInfo = (BookieServiceInfo)((Versioned)rc.getBookieServiceInfo(address).get()).getValue();
            this.compareBookieServiceInfo(bookieServiceInfo, (BookieServiceInfo)bookieServiceInfos.get(address));
        }
    }

    private /* synthetic */ void lambda$testGetBookieServiceInfo$4(List addresses, RegistrationClient rc, Map bookieServiceInfos) throws Throwable {
        for (BookieId address : addresses) {
            BookieServiceInfo bookieServiceInfo = (BookieServiceInfo)((Versioned)rc.getBookieServiceInfo(address).get()).getValue();
            this.compareBookieServiceInfo(bookieServiceInfo, (BookieServiceInfo)bookieServiceInfos.get(address));
        }
    }

    private static /* synthetic */ void lambda$testGetBookieServiceInfo$2(List addresses, RegistrationClient rc) throws Throwable {
        for (BookieId address : addresses) {
            Assert.assertTrue((boolean)(((ExecutionException)Assert.expectThrows(ExecutionException.class, () -> rc.getBookieServiceInfo(address).get())).getCause() instanceof BKException.BKBookieHandleNotAvailableException));
        }
    }

    private /* synthetic */ void lambda$testGetBookieServiceInfo$0(List addresses, RegistrationClient rc, Map bookieServiceInfos) throws Throwable {
        for (BookieId address : addresses) {
            BookieServiceInfo bookieServiceInfo = (BookieServiceInfo)((Versioned)rc.getBookieServiceInfo(address).get()).getValue();
            this.compareBookieServiceInfo(bookieServiceInfo, (BookieServiceInfo)bookieServiceInfos.get(address));
        }
    }
}

