package com.questdb.net.ha;

import com.questdb.Journal;
import com.questdb.JournalWriter;
import com.questdb.ex.JournalNetworkException;
import com.questdb.model.Quote;
import com.questdb.model.TestEntity;
import com.questdb.net.ha.config.ClientConfig;
import com.questdb.net.ha.config.ServerConfig;
import com.questdb.store.TxListener;
import com.questdb.test.tools.AbstractTest;
import com.questdb.test.tools.TestUtils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:com/questdb/net/ha/IntegrationTest.class */
public class IntegrationTest extends AbstractTest {
    private JournalClient client;
    private JournalServer server;

    @Before
    public void setUp() {
        this.server = new JournalServer(new ServerConfig() { // from class: com.questdb.net.ha.IntegrationTest.1
            {
                setHeartbeatFrequency(TimeUnit.MILLISECONDS.toMillis(100L));
                setEnableMultiCast(false);
            }
        }, this.factory);
        this.client = new JournalClient(new ClientConfig("localhost"), this.factory);
    }

    @Test(expected = JournalNetworkException.class)
    public void testClientConnect() throws Exception {
        this.client.start();
    }

    @Test
    public void testClientConnectServerHalt() throws Exception {
        this.server.start();
        this.client.start();
        Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
        this.server.halt();
        Assert.assertEquals(0L, this.server.getConnectedClients());
        Assert.assertFalse(this.server.isRunning());
        Thread.sleep(500L);
        Assert.assertFalse(this.client.isRunning());
        this.client.halt();
    }

    @Test
    public void testClientDisconnect() throws Exception {
        this.server.start();
        this.client.start();
        Thread.sleep(100L);
        this.client.halt();
        Assert.assertFalse(this.client.isRunning());
        Thread.sleep(100L);
        Assert.assertEquals(0L, this.server.getConnectedClients());
        this.server.halt();
    }

    @Test
    public void testOutOfSyncClient() throws Exception {
        JournalWriter writer = this.factory.writer(Quote.class, "remote", 2 * 10000);
        this.server.publish(writer);
        this.server.start();
        final AtomicInteger atomicInteger = new AtomicInteger();
        this.client.subscribe(Quote.class, "remote", "local", 2 * 10000, new TxListener() { // from class: com.questdb.net.ha.IntegrationTest.2
            public void onCommit() {
                atomicInteger.incrementAndGet();
            }

            public void onError() {
            }
        });
        this.client.start();
        TestUtils.generateQuoteData(writer, 10000);
        TestUtils.assertCounter(atomicInteger, 1, 1L, TimeUnit.SECONDS);
        this.client.halt();
        TestUtils.assertDataEquals(writer, this.factory.reader(Quote.class, "local"));
        TestUtils.generateQuoteData((JournalWriter<Quote>) writer, 10000, writer.getMaxTimestamp());
        writer.commit();
        JournalWriter writer2 = this.factory.writer(Quote.class, "local");
        TestUtils.generateQuoteData((JournalWriter<Quote>) writer2, 10000, writer2.getMaxTimestamp());
        writer2.commit();
        TestUtils.generateQuoteData((JournalWriter<Quote>) writer2, 10000, writer2.getMaxTimestamp());
        writer2.commit();
        writer2.close();
        final AtomicInteger atomicInteger2 = new AtomicInteger();
        this.client = new JournalClient(new ClientConfig("localhost"), this.factory);
        this.client.subscribe(Quote.class, "remote", "local", 2 * 10000, new TxListener() { // from class: com.questdb.net.ha.IntegrationTest.3
            public void onCommit() {
                atomicInteger.incrementAndGet();
            }

            public void onError() {
                atomicInteger2.incrementAndGet();
            }
        });
        this.client.start();
        TestUtils.assertCounter(atomicInteger, 1, 1L, TimeUnit.SECONDS);
        TestUtils.assertCounter(atomicInteger2, 1, 1L, TimeUnit.SECONDS);
        this.client.halt();
        this.server.halt();
    }

    @Test
    public void testOutOfSyncServerSide() throws Exception {
        JournalWriter writer = this.factory.writer(Quote.class, "remote", 2 * 10000);
        this.server.publish(writer);
        this.server.start();
        final AtomicInteger atomicInteger = new AtomicInteger();
        this.client.subscribe(Quote.class, "remote", "local", 2 * 10000, new TxListener() { // from class: com.questdb.net.ha.IntegrationTest.4
            public void onCommit() {
                atomicInteger.incrementAndGet();
            }

            public void onError() {
            }
        });
        this.client.start();
        TestUtils.generateQuoteData(writer, 10000);
        TestUtils.assertCounter(atomicInteger, 1, 1L, TimeUnit.SECONDS);
        this.client.halt();
        TestUtils.assertDataEquals(writer, this.factory.reader(Quote.class, "local"));
        TestUtils.generateQuoteData((JournalWriter<Quote>) writer, 10000, writer.getMaxTimestamp());
        writer.commit();
        TestUtils.generateQuoteData((JournalWriter<Quote>) writer, 10000, writer.getMaxTimestamp());
        writer.commit();
        TestUtils.generateQuoteData((JournalWriter<Quote>) writer, 10000, writer.getMaxTimestamp());
        writer.commit();
        JournalWriter writer2 = this.factory.writer(Quote.class, "local");
        TestUtils.generateQuoteData((JournalWriter<Quote>) writer2, 10000, writer2.getMaxTimestamp());
        writer2.commit();
        TestUtils.generateQuoteData((JournalWriter<Quote>) writer2, 10000, writer2.getMaxTimestamp());
        writer2.commit();
        writer2.close();
        final AtomicInteger atomicInteger2 = new AtomicInteger();
        this.client = new JournalClient(new ClientConfig("localhost"), this.factory);
        this.client.subscribe(Quote.class, "remote", "local", 2 * 10000, new TxListener() { // from class: com.questdb.net.ha.IntegrationTest.5
            public void onCommit() {
                atomicInteger.incrementAndGet();
            }

            public void onError() {
                atomicInteger2.incrementAndGet();
            }
        });
        this.client.start();
        TestUtils.assertCounter(atomicInteger, 1, 1L, TimeUnit.SECONDS);
        TestUtils.assertCounter(atomicInteger2, 1, 1L, TimeUnit.SECONDS);
        this.client.halt();
        this.server.halt();
    }

    @Test
    public void testServerIdleStartStop() throws Exception {
        this.server.publish(this.factory.writer(Quote.class, "remote"));
        this.server.start();
        this.client.subscribe(Quote.class, "remote", "local");
        this.client.start();
        Thread.sleep(100L);
        this.server.halt();
        Assert.assertFalse(this.server.isRunning());
    }

    @Test
    public void testServerStartStop() throws Exception {
        this.server.start();
        this.server.halt();
        Assert.assertFalse(this.server.isRunning());
    }

    @Test
    public void testSingleJournalSync() throws Exception {
        JournalWriter writer = this.factory.writer(Quote.class, "remote", 2 * 100000);
        this.server.publish(writer);
        this.server.start();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.client.subscribe(Quote.class, "remote", "local", 2 * 100000, new TxListener() { // from class: com.questdb.net.ha.IntegrationTest.6
            public void onCommit() {
                countDownLatch.countDown();
            }

            public void onError() {
            }
        });
        this.client.start();
        TestUtils.generateQuoteData(writer, 100000);
        countDownLatch.await();
        this.client.halt();
        this.server.halt();
        TestUtils.assertDataEquals(writer, this.factory.reader(Quote.class, "local"));
    }

    @Test
    public void testTwoClientSync() throws Exception {
        JournalWriter writer = this.factory.writer(Quote.class, "origin");
        TestUtils.generateQuoteData(writer, 10000);
        JournalWriter writer2 = this.factory.writer(Quote.class, "remote");
        writer2.append(writer.query().all().asResultSet().subset(0, 1000));
        writer2.commit();
        this.server.publish(writer2);
        this.server.start();
        final AtomicInteger atomicInteger = new AtomicInteger();
        JournalClient journalClient = new JournalClient(new ClientConfig("localhost"), this.factory);
        journalClient.subscribe(Quote.class, "remote", "local1", new TxListener() { // from class: com.questdb.net.ha.IntegrationTest.7
            public void onCommit() {
                atomicInteger.incrementAndGet();
            }

            public void onError() {
            }
        });
        journalClient.start();
        JournalClient journalClient2 = new JournalClient(new ClientConfig("localhost"), this.factory);
        journalClient2.subscribe(Quote.class, "remote", "local2", new TxListener() { // from class: com.questdb.net.ha.IntegrationTest.8
            public void onCommit() {
                atomicInteger.incrementAndGet();
            }

            public void onError() {
            }
        });
        journalClient2.start();
        TestUtils.assertCounter(atomicInteger, 2, 2L, TimeUnit.SECONDS);
        journalClient.halt();
        writer2.append(writer.query().all().asResultSet().subset(1000, 1500));
        writer2.commit();
        TestUtils.assertCounter(atomicInteger, 3, 2L, TimeUnit.SECONDS);
        JournalClient journalClient3 = new JournalClient(new ClientConfig("localhost"), this.factory);
        journalClient3.subscribe(Quote.class, "remote", "local1", new TxListener() { // from class: com.questdb.net.ha.IntegrationTest.9
            public void onCommit() {
                atomicInteger.incrementAndGet();
            }

            public void onError() {
            }
        });
        journalClient3.start();
        writer2.append(writer.query().all().asResultSet().subset(1500, 10000));
        writer2.commit();
        TestUtils.assertCounter(atomicInteger, 6, 2L, TimeUnit.SECONDS);
        Journal reader = this.factory.reader(Quote.class, "local1");
        Journal reader2 = this.factory.reader(Quote.class, "local2");
        Assert.assertEquals(10000, reader.size());
        Assert.assertEquals(10000, reader2.size());
        journalClient3.halt();
        journalClient2.halt();
        this.server.halt();
    }

    @Test
    public void testTwoJournalsSync() throws Exception {
        JournalWriter writer = this.factory.writer(Quote.class, "remote1", 2 * 10000);
        JournalWriter writer2 = this.factory.writer(TestEntity.class, "remote2", 2 * 10000);
        this.server.publish(writer);
        this.server.publish(writer2);
        this.server.start();
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        this.client.subscribe(Quote.class, "remote1", "local1", 2 * 10000, new TxListener() { // from class: com.questdb.net.ha.IntegrationTest.10
            public void onCommit() {
                countDownLatch.countDown();
            }

            public void onError() {
            }
        });
        this.client.subscribe(TestEntity.class, "remote2", "local2", 2 * 10000, new TxListener() { // from class: com.questdb.net.ha.IntegrationTest.11
            public void onCommit() {
                countDownLatch.countDown();
            }

            public void onError() {
            }
        });
        this.client.start();
        TestUtils.generateQuoteData(writer, 10000);
        TestUtils.generateTestEntityData(writer2, 10000);
        countDownLatch.await();
        this.client.halt();
        this.server.halt();
        Assert.assertEquals("Local1 has wrong size", 10000, this.factory.reader(Quote.class, "local1").size());
        Journal reader = this.factory.reader(TestEntity.class, "local2");
        Assert.assertEquals("Remote2 has wrong size", 10000, writer2.size());
        Assert.assertEquals("Local2 has wrong size", 10000, reader.size());
    }

    @Test
    public void testWriterShutdown() throws Exception {
        JournalWriter writer = this.factory.writer(Quote.class, "remote", 2 * 10000);
        Throwable th = null;
        try {
            try {
                this.server.publish(writer);
                this.server.start();
                this.client.subscribe(Quote.class, "remote", "local", 2 * 10000);
                this.client.start();
                TestUtils.generateQuoteData((JournalWriter<Quote>) writer, 10000, 0L);
                if (writer != null) {
                    if (0 != 0) {
                        try {
                            writer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        writer.close();
                    }
                }
                this.client.halt();
                this.server.halt();
            } finally {
            }
        } catch (Throwable th3) {
            if (writer != null) {
                if (th != null) {
                    try {
                        writer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    writer.close();
                }
            }
            throw th3;
        }
    }
}
