package io.reactiverse.pgclient;

import io.reactiverse.pgclient.ProxyServer;
import io.reactiverse.pgclient.pubsub.PgChannel;
import io.reactiverse.pgclient.pubsub.PgSubscriber;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/reactiverse/pgclient/PubSubTest.class */
public class PubSubTest extends PgTestBase {
    Vertx vertx;
    PgSubscriber subscriber;

    @Before
    public void setup() {
        this.vertx = Vertx.vertx();
    }

    @After
    public void teardown(TestContext testContext) {
        if (this.subscriber != null) {
            this.subscriber.close();
        }
        this.vertx.close(testContext.asyncAssertSuccess());
    }

    @Test
    public void testNotify(TestContext testContext) {
        Async async = testContext.async(2);
        PgClient.connect(this.vertx, options, testContext.asyncAssertSuccess(pgConnection -> {
            pgConnection.query("LISTEN the_channel", testContext.asyncAssertSuccess(pgResult -> {
                pgConnection.notificationHandler(pgNotification -> {
                    testContext.assertEquals("the_channel", pgNotification.getChannel());
                    testContext.assertEquals("the message", pgNotification.getPayload());
                    async.countDown();
                });
                pgConnection.query("NOTIFY the_channel, 'the message'", testContext.asyncAssertSuccess(pgResult -> {
                    async.countDown();
                }));
            }));
        }));
    }

    @Test
    public void testConnect(TestContext testContext) {
        this.subscriber = PgSubscriber.subscriber(this.vertx, options);
        Async async = testContext.async();
        PgChannel channel = this.subscriber.channel("channel1");
        PgChannel channel2 = this.subscriber.channel("channel2");
        channel.handler(str -> {
            testContext.assertEquals("msg1", str);
            async.countDown();
        });
        channel2.handler(str2 -> {
            testContext.assertEquals("msg2", str2);
            async.countDown();
        });
        Async async2 = testContext.async();
        this.subscriber.connect(testContext.asyncAssertSuccess(r3 -> {
            async2.complete();
        }));
        async2.awaitSuccess(10000L);
        this.subscriber.actualConnection().query("NOTIFY channel1, 'msg1'", testContext.asyncAssertSuccess());
        this.subscriber.actualConnection().query("NOTIFY channel2, 'msg2'", testContext.asyncAssertSuccess());
        async.awaitSuccess(10000L);
    }

    @Test
    public void testSubscribe(TestContext testContext) {
        this.subscriber = PgSubscriber.subscriber(this.vertx, options);
        Async async = testContext.async();
        this.subscriber.connect(testContext.asyncAssertSuccess(r3 -> {
            async.complete();
        }));
        async.awaitSuccess(10000L);
        PgChannel channel = this.subscriber.channel("the_channel");
        Async async2 = testContext.async();
        testContext.assertEquals(channel, channel.subscribeHandler(r32 -> {
            async2.complete();
        }));
        Async async3 = testContext.async();
        channel.handler(str -> {
            testContext.assertEquals("msg", str);
            async3.countDown();
        });
        async2.awaitSuccess(10000L);
        this.subscriber.actualConnection().query("NOTIFY the_channel, 'msg'", testContext.asyncAssertSuccess());
        async3.awaitSuccess(10000L);
    }

    @Test
    public void testUnsubscribe(TestContext testContext) {
        this.subscriber = PgSubscriber.subscriber(this.vertx, options);
        Async async = testContext.async();
        this.subscriber.connect(testContext.asyncAssertSuccess(r3 -> {
            async.complete();
        }));
        async.awaitSuccess(10000L);
        PgChannel channel = this.subscriber.channel("the_channel");
        Async async2 = testContext.async();
        channel.endHandler(r32 -> {
            async2.complete();
        });
        Async async3 = testContext.async();
        channel.subscribeHandler(r33 -> {
            async3.complete();
        });
        channel.handler(str -> {
        });
        async3.awaitSuccess(10000L);
        channel.handler((Handler) null);
        async2.awaitSuccess(10000L);
    }

    @Test
    public void testReconnect(TestContext testContext) {
        PgConnectOptions pgConnectOptions = new PgConnectOptions(PgTestBase.options);
        ProxyServer create = ProxyServer.create(this.vertx, pgConnectOptions.getPort(), pgConnectOptions.getHost());
        AtomicReference atomicReference = new AtomicReference();
        create.proxyHandler(connection -> {
            atomicReference.set(connection);
            connection.connect();
        });
        Async async = testContext.async();
        create.listen(8080, "localhost", testContext.asyncAssertSuccess(r5 -> {
            pgConnectOptions.setPort(8080).setHost("localhost");
            async.complete();
        }));
        async.awaitSuccess(10000L);
        this.subscriber = PgSubscriber.subscriber(this.vertx, pgConnectOptions);
        PgChannel channel = this.subscriber.channel("the_channel");
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        Async async4 = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        channel.subscribeHandler(r6 -> {
            switch (atomicInteger.getAndIncrement()) {
                case 0:
                    async2.complete();
                    return;
                case 1:
                    async3.complete();
                    return;
                case 2:
                    async4.complete();
                    return;
                default:
                    return;
            }
        });
        this.subscriber.connect(asyncResult -> {
        });
        channel.handler(str -> {
        });
        async2.awaitSuccess(10000L);
        AtomicInteger atomicInteger2 = new AtomicInteger();
        this.subscriber.reconnectPolicy(num -> {
            testContext.assertEquals(0, num);
            testContext.assertFalse(this.subscriber.closed());
            return atomicInteger2.getAndIncrement() < 2 ? 100L : -1L;
        });
        Async async5 = testContext.async();
        this.subscriber.closeHandler(r3 -> {
            async5.complete();
        });
        ((ProxyServer.Connection) atomicReference.get()).close();
        async3.awaitSuccess(10000L);
        ((ProxyServer.Connection) atomicReference.get()).close();
        async4.awaitSuccess(10000L);
        ((ProxyServer.Connection) atomicReference.get()).close();
        async5.awaitSuccess(10000L);
        testContext.assertEquals(3, Integer.valueOf(atomicInteger2.get()));
        testContext.assertTrue(this.subscriber.closed());
    }

    @Test
    public void testClose(TestContext testContext) {
        PgSubscriber subscriber = PgSubscriber.subscriber(this.vertx, options);
        PgChannel channel = subscriber.channel("the_channel");
        Async async = testContext.async();
        channel.endHandler(r3 -> {
            async.complete();
        });
        channel.handler(str -> {
        });
        Async async2 = testContext.async();
        subscriber.connect(testContext.asyncAssertSuccess(r32 -> {
            async2.complete();
        }));
        async2.awaitSuccess(10000L);
        Async async3 = testContext.async();
        subscriber.closeHandler(r33 -> {
            async3.complete();
        });
        subscriber.close();
        async.awaitSuccess(10000L);
        async3.awaitSuccess(10000L);
    }
}
