package io.vertx.amqpbridge;

import io.vertx.amqpbridge.impl.AmqpBridgeImpl;
import io.vertx.amqpbridge.impl.BridgeMetaDataSupportImpl;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.impl.ProtonSenderImpl;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.message.Message;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/amqpbridge/AmqpBridgeTest.class */
public class AmqpBridgeTest extends ActiveMQTestBase {
    private static Logger LOG = LoggerFactory.getLogger(AmqpBridgeTest.class);
    private Vertx vertx;

    @Override // io.vertx.amqpbridge.ActiveMQTestBase
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.vertx = Vertx.vertx();
    }

    @Override // io.vertx.amqpbridge.ActiveMQTestBase
    @After
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            if (this.vertx != null) {
                this.vertx.close();
            }
        }
    }

    @Test(timeout = 20000)
    public void testBasicStartup(TestContext testContext) throws Exception {
        testContext.assertEquals(0L, Long.valueOf(getBrokerAdminView(testContext).getTotalConnectionsCount()), "unexpected total connection count before");
        testContext.assertEquals(0, Integer.valueOf(getBrokerAdminView(testContext).getCurrentConnectionsCount()), "unexpected current connection count before");
        Async async = testContext.async();
        AmqpBridge create = AmqpBridge.create(this.vertx);
        create.start("localhost", getBrokerAmqpConnectorPort(), asyncResult -> {
            LOG.trace("Startup complete");
            testContext.assertTrue(asyncResult.succeeded());
            testContext.assertEquals(1L, Long.valueOf(getBrokerAdminView(testContext).getTotalConnectionsCount()), "unexpected total connection count during");
            testContext.assertEquals(1, Integer.valueOf(getBrokerAdminView(testContext).getCurrentConnectionsCount()), "unexpected current connection count during");
            create.close(asyncResult -> {
                LOG.trace("Shutdown complete");
                testContext.assertTrue(asyncResult.succeeded());
                testContext.assertEquals(1L, Long.valueOf(getBrokerAdminView(testContext).getTotalConnectionsCount()), "unexpected total connection count after");
                testContext.assertEquals(0, Integer.valueOf(getBrokerAdminView(testContext).getCurrentConnectionsCount()), "unexpected current connection count after");
                async.complete();
            });
        });
        async.awaitSuccess();
    }

    private BrokerView getBrokerAdminView(TestContext testContext) {
        try {
            return getBrokerService().getAdminView();
        } catch (Exception e) {
            testContext.fail(e);
            return null;
        }
    }

    @Test(timeout = 20000)
    public void testConnectionMetaData(TestContext testContext) throws Exception {
        stopBroker();
        Async async = testContext.async();
        Async async2 = testContext.async();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.closeHandler(asyncResult -> {
                protonConnection.close();
            });
            protonConnection.openHandler(asyncResult2 -> {
                protonConnection.open();
                Map remoteProperties = protonConnection.getRemoteProperties();
                testContext.assertNotNull(remoteProperties, "connection properties not present");
                testContext.assertTrue(remoteProperties.containsKey(BridgeMetaDataSupportImpl.PRODUCT_KEY), "product property key not present");
                testContext.assertEquals("vertx-amqp-bridge", remoteProperties.get(BridgeMetaDataSupportImpl.PRODUCT_KEY), "unexpected product property value");
                testContext.assertTrue(remoteProperties.containsKey(BridgeMetaDataSupportImpl.VERSION_KEY), "version property key not present");
                testContext.assertEquals(BridgeMetaDataSupportImpl.VERSION, remoteProperties.get(BridgeMetaDataSupportImpl.VERSION_KEY), "unexpected version property value");
                async.complete();
            });
        });
        AmqpBridgeImpl create = AmqpBridge.create(this.vertx);
        create.setReplyHandlerSupported(false);
        create.start("localhost", mockServer.actualPort(), asyncResult -> {
            LOG.trace("Startup complete");
            async.awaitSuccess();
            LOG.trace("Shutting down");
            create.close(asyncResult -> {
                LOG.trace("Shutdown complete");
                testContext.assertTrue(asyncResult.succeeded());
                async2.complete();
            });
        });
        try {
            async2.awaitSuccess();
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testReceiveBasicMessage(TestContext testContext) throws Exception {
        String testName = getTestName();
        String str = "myMessageContent-" + testName;
        String str2 = "appPropKey";
        String str3 = "appPropValue";
        Async async = testContext.async();
        Async async2 = testContext.async();
        int brokerAmqpConnectorPort = getBrokerAmqpConnectorPort();
        AmqpBridge create = AmqpBridge.create(this.vertx);
        create.start("localhost", brokerAmqpConnectorPort, asyncResult -> {
            LOG.trace("Startup complete");
            testContext.assertEquals(testName, create.createConsumer(testName).handler(message -> {
                JsonObject jsonObject = (JsonObject) message.body();
                testContext.assertNotNull(jsonObject, "message jsonObject body was null");
                Object value = jsonObject.getValue("body");
                testContext.assertNotNull(value, "amqp message body content was null");
                testContext.assertEquals(str, value, "amqp message body was not as expected");
                testContext.assertTrue(jsonObject.containsKey("application_properties"), "application properties element not present");
                JsonObject jsonObject2 = jsonObject.getJsonObject("application_properties");
                testContext.assertTrue(jsonObject2.containsKey(str2), "expected property key element not present");
                testContext.assertEquals(str3, jsonObject2.getValue(str2), "app property valuenot as expected");
                LOG.trace("Shutting down");
                create.close(asyncResult -> {
                    LOG.trace("Shutdown complete");
                    testContext.assertTrue(asyncResult.succeeded());
                    async.complete();
                });
            }).address(), "address was not as expected");
        });
        ProtonClient.create(this.vertx).connect("localhost", brokerAmqpConnectorPort, asyncResult2 -> {
            testContext.assertTrue(asyncResult2.succeeded());
            Message message = Proton.message();
            message.setBody(new AmqpValue(str));
            HashMap hashMap = new HashMap();
            hashMap.put(str2, str3);
            message.setApplicationProperties(new ApplicationProperties(hashMap));
            ProtonConnection open = ((ProtonConnection) asyncResult2.result()).open();
            open.createSender(testName).open().send(message, protonDelivery -> {
                testContext.assertNotNull(protonDelivery.getRemoteState(), "message had no remote state");
                testContext.assertTrue(protonDelivery.getRemoteState() instanceof Accepted, "message was not accepted");
                testContext.assertTrue(protonDelivery.remotelySettled(), "message was not settled");
                open.closeHandler(asyncResult2 -> {
                    open.disconnect();
                }).close();
                async2.complete();
            });
        });
        async2.awaitSuccess();
        async.awaitSuccess();
    }

    @Test(timeout = 20000)
    public void testReceiveBasicMessageAsBodyStream(TestContext testContext) throws Exception {
        String testName = getTestName();
        String str = "myMessageContent-" + testName;
        Async async = testContext.async();
        Async async2 = testContext.async();
        int brokerAmqpConnectorPort = getBrokerAmqpConnectorPort();
        AmqpBridge create = AmqpBridge.create(this.vertx);
        create.start("localhost", brokerAmqpConnectorPort, asyncResult -> {
            LOG.trace("Startup complete");
            create.createConsumer(testName).bodyStream().handler(jsonObject -> {
                testContext.assertNotNull(jsonObject, "jsonObject was null");
                Object value = jsonObject.getValue("body");
                testContext.assertNotNull(value, "amqp message body content was null");
                testContext.assertEquals(str, value, "amqp message body was not as expected");
                LOG.trace("Shutting down");
                create.close(asyncResult -> {
                    LOG.trace("Shutdown complete");
                    testContext.assertTrue(asyncResult.succeeded());
                    async.complete();
                });
            });
        });
        ProtonClient.create(this.vertx).connect("localhost", brokerAmqpConnectorPort, asyncResult2 -> {
            testContext.assertTrue(asyncResult2.succeeded());
            Message message = Proton.message();
            message.setBody(new AmqpValue(str));
            ProtonConnection open = ((ProtonConnection) asyncResult2.result()).open();
            open.createSender(testName).open().send(message, protonDelivery -> {
                testContext.assertNotNull(protonDelivery.getRemoteState(), "message had no remote state");
                testContext.assertTrue(protonDelivery.getRemoteState() instanceof Accepted, "message was not accepted");
                testContext.assertTrue(protonDelivery.remotelySettled(), "message was not settled");
                open.closeHandler(asyncResult2 -> {
                    open.disconnect();
                }).close();
                async2.complete();
            });
        });
        async2.awaitSuccess();
        async.awaitSuccess();
    }

    @Test(timeout = 20000)
    public void testSendBasicMessage(TestContext testContext) throws Exception {
        String testName = getTestName();
        String str = "myMessageContent-" + testName;
        String str2 = "appPropKey";
        String str3 = "appPropValue";
        Async async = testContext.async();
        int brokerAmqpConnectorPort = getBrokerAmqpConnectorPort();
        AmqpBridge create = AmqpBridge.create(this.vertx);
        create.start("localhost", brokerAmqpConnectorPort, asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            MessageProducer createProducer = create.createProducer(testName);
            JsonObject jsonObject = new JsonObject();
            jsonObject.put(str2, str3);
            JsonObject jsonObject2 = new JsonObject();
            jsonObject2.put("body", str);
            jsonObject2.put("application_properties", jsonObject);
            createProducer.send(jsonObject2);
            testContext.assertEquals(testName, createProducer.address(), "address was not as expected");
        });
        ProtonClient.create(this.vertx).connect("localhost", brokerAmqpConnectorPort, asyncResult2 -> {
            testContext.assertTrue(asyncResult2.succeeded());
            Proton.message().setBody(new AmqpValue(str));
            ProtonConnection open = ((ProtonConnection) asyncResult2.result()).open();
            open.createReceiver(testName).handler((protonDelivery, message) -> {
                AmqpValue body = message.getBody();
                testContext.assertNotNull(body);
                testContext.assertTrue(body instanceof AmqpValue);
                testContext.assertEquals(str, body.getValue(), "Unexpected message body");
                ApplicationProperties applicationProperties = message.getApplicationProperties();
                testContext.assertNotNull(applicationProperties, "application properties section not present");
                testContext.assertTrue(applicationProperties.getValue().containsKey(str2), "property key not present");
                testContext.assertEquals(str3, applicationProperties.getValue().get(str2), "Unexpected property value");
                async.complete();
                open.closeHandler(asyncResult2 -> {
                    open.disconnect();
                }).close();
            }).open();
        });
        async.awaitSuccess();
    }

    @Test(timeout = 20000)
    public void testBasicRequestReply(TestContext testContext) {
        Async async = testContext.async();
        Async async2 = testContext.async();
        String testName = getTestName();
        String str = "myStringContent";
        String str2 = "myStringReply";
        AmqpBridge create = AmqpBridge.create(this.vertx);
        create.start("localhost", getBrokerAmqpConnectorPort(), asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            MessageProducer createProducer = create.createProducer(testName);
            JsonObject jsonObject = new JsonObject();
            jsonObject.put("body", str);
            createProducer.send(jsonObject, asyncResult -> {
                LOG.trace("Sender got reply");
                testContext.assertEquals(str2, ((JsonObject) ((io.vertx.core.eventbus.Message) asyncResult.result()).body()).getValue("body"), "unexpected reply msg content");
                testContext.assertNotNull(((io.vertx.core.eventbus.Message) asyncResult.result()).address(), "address was not set on reply");
                testContext.assertNull(((io.vertx.core.eventbus.Message) asyncResult.result()).replyAddress(), "reply address was unexpectedly set on the reply");
                LOG.trace("Shutting down");
                create.close(asyncResult -> {
                    LOG.trace("Shutdown complete");
                    testContext.assertTrue(asyncResult.succeeded());
                    async2.complete();
                });
            });
            LOG.trace("Client sent msg");
            create.createConsumer(testName).handler(message -> {
                JsonObject jsonObject2 = (JsonObject) message.body();
                LOG.trace("Consumer got request msg: " + jsonObject2);
                testContext.assertNotNull(jsonObject2, "expected msg body but none found");
                testContext.assertEquals(str, jsonObject2.getValue("body"), "unexpected msg content");
                testContext.assertNotNull(message.replyAddress(), "reply address was not set on the request");
                JsonObject jsonObject3 = new JsonObject();
                jsonObject3.put("body", str2);
                message.reply(jsonObject3);
                async.complete();
            });
        });
        async.awaitSuccess();
        async2.awaitSuccess();
    }

    @Test(timeout = 20000)
    public void testReplyToOriginalReply(TestContext testContext) {
        Async async = testContext.async();
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        String testName = getTestName();
        String str = "myStringContent";
        String str2 = "myStringReply";
        String str3 = "myStringReplyToReply";
        AmqpBridge create = AmqpBridge.create(this.vertx);
        create.start("localhost", getBrokerAmqpConnectorPort(), asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            MessageProducer createProducer = create.createProducer(testName);
            JsonObject jsonObject = new JsonObject();
            jsonObject.put("body", str);
            createProducer.send(jsonObject, asyncResult -> {
                LOG.trace("Sender got first reply");
                io.vertx.core.eventbus.Message message = (io.vertx.core.eventbus.Message) asyncResult.result();
                testContext.assertEquals(str2, ((JsonObject) message.body()).getValue("body"), "unexpected reply msg content");
                testContext.assertNotNull(message.address(), "address was not set on the reply");
                testContext.assertNotNull(message.replyAddress(), "reply address was not set on the reply");
                async2.complete();
                JsonObject jsonObject2 = new JsonObject();
                jsonObject2.put("body", str3);
                message.reply(jsonObject2);
            });
            LOG.trace("Client sent msg");
            create.createConsumer(testName).handler(message -> {
                JsonObject jsonObject2 = (JsonObject) message.body();
                LOG.trace("Receiver got request: " + jsonObject2);
                testContext.assertNotNull(jsonObject2, "expected msg body but none found");
                testContext.assertEquals(str, jsonObject2.getValue("body"), "unexpected msg content");
                testContext.assertNotNull(message.replyAddress(), "reply address was not set on the request");
                JsonObject jsonObject3 = new JsonObject();
                jsonObject3.put("body", str2);
                message.reply(jsonObject3, asyncResult2 -> {
                    LOG.trace("Receiver got reply to reply");
                    io.vertx.core.eventbus.Message message = (io.vertx.core.eventbus.Message) asyncResult2.result();
                    testContext.assertEquals(str3, ((JsonObject) message.body()).getValue("body"), "unexpected 2nd reply msg content");
                    testContext.assertNull(message.replyAddress(), "reply address was unexpectedly set on 2nd reply");
                    LOG.trace("Shutting down");
                    create.close(asyncResult2 -> {
                        LOG.trace("Shutdown complete");
                        testContext.assertTrue(asyncResult2.succeeded());
                        async3.complete();
                    });
                });
                async.complete();
            });
        });
        async.awaitSuccess();
        async2.awaitSuccess();
        async3.awaitSuccess();
    }

    @Test(timeout = 20000)
    public void testReceiveMultipleMessageAfterDelayedHandlerAddition(TestContext testContext) throws Exception {
        String testName = getTestName();
        String str = "myMessageContent-" + testName;
        Async async = testContext.async();
        Async async2 = testContext.async();
        int brokerAmqpConnectorPort = getBrokerAmqpConnectorPort();
        int i = 5;
        ProtonClient.create(this.vertx).connect("localhost", brokerAmqpConnectorPort, asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            Message message = Proton.message();
            message.setBody(new AmqpValue(str));
            ProtonConnection open = ((ProtonConnection) asyncResult.result()).open();
            ProtonSender open2 = open.createSender(testName).open();
            for (int i2 = 1; i2 <= i; i2++) {
                int i3 = i2;
                open2.send(message, protonDelivery -> {
                    LOG.trace("Running onUpdated for sent message " + i3);
                    testContext.assertNotNull(protonDelivery.getRemoteState(), "message " + i3 + " had no remote state");
                    testContext.assertTrue(protonDelivery.getRemoteState() instanceof Accepted, "message " + i3 + " was not accepted");
                    testContext.assertTrue(protonDelivery.remotelySettled(), "message " + i3 + " was not settled");
                    if (i3 == i) {
                        open.closeHandler(asyncResult -> {
                            open.disconnect();
                        }).close();
                        async2.complete();
                    }
                });
            }
        });
        AmqpBridge create = AmqpBridge.create(this.vertx);
        create.start("localhost", brokerAmqpConnectorPort, asyncResult2 -> {
            LOG.trace("Startup complete");
            MessageConsumer createConsumer = create.createConsumer(testName);
            this.vertx.setTimer(500L, l -> {
                AtomicInteger atomicInteger = new AtomicInteger();
                createConsumer.handler(message -> {
                    int incrementAndGet = atomicInteger.incrementAndGet();
                    LOG.trace("Received message " + incrementAndGet);
                    JsonObject jsonObject = (JsonObject) message.body();
                    testContext.assertNotNull(jsonObject, "message " + incrementAndGet + " jsonObject body was null");
                    Object value = jsonObject.getValue("body");
                    testContext.assertNotNull(value, "amqp message " + incrementAndGet + " body content was null");
                    testContext.assertEquals(str, value, "amqp message " + incrementAndGet + " body not as expected");
                    if (incrementAndGet == i) {
                        LOG.trace("Shutting down");
                        create.close(asyncResult2 -> {
                            LOG.trace("Shutdown complete");
                            testContext.assertTrue(asyncResult2.succeeded());
                            async.complete();
                        });
                    }
                });
            });
        });
        async2.awaitSuccess();
        async.awaitSuccess();
    }

    @Test(timeout = 20000)
    public void testReceiveMultipleMessageAfterPause(TestContext testContext) throws Exception {
        String testName = getTestName();
        String str = "myMessageContent-" + testName;
        Async async = testContext.async();
        Async async2 = testContext.async();
        int brokerAmqpConnectorPort = getBrokerAmqpConnectorPort();
        ProtonClient.create(this.vertx).connect("localhost", brokerAmqpConnectorPort, asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            Message message = Proton.message();
            message.setBody(new AmqpValue(str));
            ProtonConnection open = ((ProtonConnection) asyncResult.result()).open();
            ProtonSender open2 = open.createSender(testName).open();
            for (int i = 1; i <= 5; i++) {
                int i2 = i;
                open2.send(message, protonDelivery -> {
                    LOG.trace("Running onUpdated for sent message " + i2);
                    testContext.assertNotNull(protonDelivery.getRemoteState(), "message " + i2 + " had no remote state");
                    testContext.assertTrue(protonDelivery.getRemoteState() instanceof Accepted, "message " + i2 + " was not accepted");
                    testContext.assertTrue(protonDelivery.remotelySettled(), "message " + i2 + " was not settled");
                    if (i2 == 5) {
                        open.closeHandler(asyncResult -> {
                            open.disconnect();
                        }).close();
                        async2.complete();
                    }
                });
            }
        });
        AmqpBridge create = AmqpBridge.create(this.vertx);
        create.start("localhost", brokerAmqpConnectorPort, asyncResult2 -> {
            LOG.trace("Startup complete");
            AtomicInteger atomicInteger = new AtomicInteger();
            AtomicLong atomicLong = new AtomicLong();
            MessageConsumer createConsumer = create.createConsumer(testName);
            createConsumer.handler(message -> {
                int incrementAndGet = atomicInteger.incrementAndGet();
                LOG.trace("Received message " + incrementAndGet);
                JsonObject jsonObject = (JsonObject) message.body();
                testContext.assertNotNull(jsonObject, "message " + incrementAndGet + " jsonObject body was null");
                Object value = jsonObject.getValue("body");
                testContext.assertNotNull(value, "amqp message " + incrementAndGet + " body content was null");
                testContext.assertEquals(str, value, "amqp message " + incrementAndGet + " body not as expected");
                if (incrementAndGet == 2) {
                    LOG.trace("Pausing");
                    createConsumer.pause();
                    atomicLong.set(System.currentTimeMillis());
                    this.vertx.setTimer(500L, l -> {
                        LOG.trace("Resuming");
                        createConsumer.resume();
                    });
                }
                if (incrementAndGet > 2) {
                    testContext.assertTrue(atomicLong.get() > 0, "pause start not initialised before receiving msg" + incrementAndGet);
                    testContext.assertTrue(System.currentTimeMillis() + 500 > atomicLong.get(), "delivery occurred before expected");
                }
                if (incrementAndGet == 5) {
                    LOG.trace("Shutting down");
                    create.close(asyncResult2 -> {
                        LOG.trace("Shutdown complete");
                        testContext.assertTrue(asyncResult2.succeeded());
                        async.complete();
                    });
                }
            });
        });
        async2.awaitSuccess();
        async.awaitSuccess();
    }

    @Test(timeout = 20000)
    public void testProducerClose(TestContext testContext) throws Exception {
        doProducerCloseTestImpl(testContext, false);
    }

    @Test(timeout = 20000)
    public void testProducerEnd(TestContext testContext) throws Exception {
        doProducerCloseTestImpl(testContext, true);
    }

    private void doProducerCloseTestImpl(TestContext testContext, boolean z) throws Exception {
        stopBroker();
        String testName = getTestName();
        String str = "myMessageContent-" + testName;
        Async async = testContext.async();
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                LOG.trace("Server connection open");
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler(protonSession -> {
                LOG.trace("Server session open");
                protonSession.open();
            });
            protonConnection.receiverOpenHandler(protonReceiver -> {
                LOG.trace("Server receiver open");
                Target remoteTarget = protonReceiver.getRemoteTarget();
                testContext.assertNotNull(remoteTarget, "target should not be null");
                testContext.assertEquals(testName, remoteTarget.getAddress(), "expected given address");
                protonReceiver.setTarget(remoteTarget.copy());
                protonReceiver.handler((protonDelivery, message) -> {
                    AmqpValue body = message.getBody();
                    testContext.assertNotNull(body, "received body was null");
                    testContext.assertTrue(body instanceof AmqpValue, "unexpected body section type: " + body.getClass());
                    testContext.assertEquals(str, body.getValue(), "Unexpected message body content");
                    async.complete();
                });
                protonReceiver.closeHandler(asyncResult2 -> {
                    protonReceiver.close();
                    async2.complete();
                });
                protonReceiver.open();
            });
        });
        AmqpBridgeImpl create = AmqpBridge.create(this.vertx);
        create.setReplyHandlerSupported(false);
        create.start("localhost", mockServer.actualPort(), asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            MessageProducer createProducer = create.createProducer(testName);
            createProducer.exceptionHandler(th -> {
                atomicBoolean.set(true);
            });
            JsonObject jsonObject = new JsonObject();
            jsonObject.put("body", str);
            createProducer.send(jsonObject);
            if (z) {
                createProducer.end();
            } else {
                createProducer.close();
            }
            create.close(asyncResult -> {
                LOG.trace("Shutdown complete");
                testContext.assertTrue(asyncResult.succeeded());
                async3.complete();
            });
        });
        try {
            async.awaitSuccess();
            async2.awaitSuccess();
            async3.awaitSuccess();
            testContext.assertFalse(atomicBoolean.get(), "exception handler unexpectedly called");
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testConsumerUnregisterCompletionNotification(TestContext testContext) throws Exception {
        stopBroker();
        String testName = getTestName();
        String str = "myMessageContent-" + testName;
        Async async = testContext.async();
        Async async2 = testContext.async();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            handleReceiverOpenSendMessageThenClose(protonConnection, testName, str, testContext);
        });
        AmqpBridgeImpl create = AmqpBridge.create(this.vertx);
        create.setReplyHandlerSupported(false);
        create.start("localhost", mockServer.actualPort(), asyncResult -> {
            LOG.trace("Startup complete");
            MessageConsumer createConsumer = create.createConsumer(testName);
            testContext.assertFalse(createConsumer.isRegistered(), "expected registered to be false");
            createConsumer.exceptionHandler(th -> {
                atomicBoolean.set(true);
            });
            createConsumer.handler(message -> {
                JsonObject jsonObject = (JsonObject) message.body();
                testContext.assertNotNull(jsonObject, "message jsonObject body was null");
                Object value = jsonObject.getValue("body");
                testContext.assertNotNull(value, "amqp message body content was null");
                testContext.assertEquals(str, value, "amqp message body not as expected");
                createConsumer.unregister(asyncResult -> {
                    LOG.trace("Unregister completed");
                    testContext.assertTrue(asyncResult.succeeded(), "Expected unregistration to succeed");
                    async.complete();
                    LOG.trace("Shutting down");
                    create.close(asyncResult -> {
                        LOG.trace("Shutdown complete");
                        testContext.assertTrue(asyncResult.succeeded());
                        async2.complete();
                    });
                });
            });
            testContext.assertTrue(createConsumer.isRegistered(), "expected registered to be true");
        });
        try {
            testContext.assertFalse(atomicBoolean.get(), "exception handler unexpectedly called");
            async.awaitSuccess();
            async2.awaitSuccess();
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    private void handleReceiverOpenSendMessageThenClose(ProtonConnection protonConnection, String str, String str2, TestContext testContext) {
        protonConnection.openHandler(asyncResult -> {
            LOG.trace("Server connection open");
            protonConnection.closeHandler(asyncResult -> {
                protonConnection.close();
            });
            protonConnection.open();
        });
        protonConnection.sessionOpenHandler(protonSession -> {
            LOG.trace("Server session open");
            protonSession.open();
        });
        protonConnection.senderOpenHandler(protonSender -> {
            LOG.trace("Server sender open");
            Source remoteSource = protonSender.getRemoteSource();
            testContext.assertNotNull(remoteSource, "source should not be null");
            testContext.assertEquals(str, remoteSource.getAddress(), "expected given address");
            protonSender.setSource(remoteSource.copy());
            Message message = Proton.message();
            message.setBody(new AmqpValue(str2));
            protonSender.send(message);
            protonSender.closeHandler(asyncResult2 -> {
                protonSender.close();
            });
            protonSender.open();
        });
    }

    @Test(timeout = 20000)
    public void testSenderFlowControlMechanisms(TestContext testContext) throws Exception {
        stopBroker();
        String testName = getTestName();
        String str = "myMessageContent-" + testName;
        Async async = testContext.async();
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                LOG.trace("Server connection open");
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler(protonSession -> {
                LOG.trace("Server session open");
                protonSession.open();
            });
            protonConnection.receiverOpenHandler(protonReceiver -> {
                LOG.trace("Server receiver open");
                Target remoteTarget = protonReceiver.getRemoteTarget();
                testContext.assertNotNull(remoteTarget, "target should not be null");
                testContext.assertEquals(testName, remoteTarget.getAddress(), "expected given address");
                protonReceiver.setTarget(remoteTarget.copy());
                protonReceiver.setAutoAccept(false);
                protonReceiver.setPrefetch(0);
                protonReceiver.handler((protonDelivery, message) -> {
                    AmqpValue body = message.getBody();
                    testContext.assertNotNull(body, "received body was null");
                    testContext.assertTrue(body instanceof AmqpValue, "unexpected body section type: " + body.getClass());
                    testContext.assertEquals(str, body.getValue(), "Unexpected message body content");
                    this.vertx.setTimer(250L, l -> {
                        async2.awaitSuccess();
                        LOG.trace("Sending subsequent credit after delay");
                        protonReceiver.flow(1);
                    });
                });
                protonReceiver.closeHandler(asyncResult2 -> {
                    protonReceiver.close();
                });
                protonReceiver.open();
                this.vertx.setTimer(250L, l -> {
                    async.awaitSuccess();
                    LOG.trace("Sending credit after delay");
                    protonReceiver.flow(1);
                });
            });
        });
        AmqpBridgeImpl create = AmqpBridge.create(this.vertx);
        create.setReplyHandlerSupported(false);
        create.start("localhost", mockServer.actualPort(), asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            MessageProducer createProducer = create.createProducer(testName);
            testContext.assertTrue(createProducer.writeQueueFull(), "expected write queue to be full, we have not yet granted credit");
            createProducer.drainHandler(r14 -> {
                testContext.assertTrue(async.isSucceeded(), "should have been called after initial credit delay");
                testContext.assertFalse(createProducer.writeQueueFull(), "expected write queue not to be full, we just granted credit");
                JsonObject jsonObject = new JsonObject();
                jsonObject.put("body", str);
                createProducer.send(jsonObject);
                testContext.assertTrue(createProducer.writeQueueFull(), "expected write queue to be full, we just used all the credit");
                createProducer.drainHandler(r9 -> {
                    testContext.assertTrue(async2.isSucceeded(), "should have been called after 2nd credit delay");
                    testContext.assertFalse(createProducer.writeQueueFull(), "expected write queue not to be full, we just granted credit");
                    LOG.trace("Shutting down");
                    create.close(asyncResult -> {
                        LOG.trace("Shutdown complete");
                        testContext.assertTrue(asyncResult.succeeded());
                        async3.complete();
                    });
                });
                async2.complete();
            });
            async.complete();
        });
        try {
            async3.awaitSuccess();
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testSenderClosedRemotelyCallsExceptionHandler(TestContext testContext) throws Exception {
        doSenderClosedRemotelyCallsExceptionHandlerTestImpl(testContext, false);
    }

    @Test(timeout = 20000)
    public void testSenderClosedRemotelyWithErrorCallsExceptionHandler(TestContext testContext) throws Exception {
        doSenderClosedRemotelyCallsExceptionHandlerTestImpl(testContext, true);
    }

    private void doSenderClosedRemotelyCallsExceptionHandlerTestImpl(TestContext testContext, boolean z) throws Exception {
        stopBroker();
        String testName = getTestName();
        String str = "myMessageContent-" + testName;
        Async async = testContext.async();
        Async async2 = testContext.async();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                LOG.trace("Server connection open");
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler(protonSession -> {
                LOG.trace("Server session open");
                protonSession.open();
            });
            protonConnection.receiverOpenHandler(protonReceiver -> {
                LOG.trace("Server receiver open");
                Target remoteTarget = protonReceiver.getRemoteTarget();
                testContext.assertNotNull(remoteTarget, "target should not be null");
                testContext.assertEquals(testName, remoteTarget.getAddress(), "expected given address");
                protonReceiver.setTarget(remoteTarget.copy());
                protonReceiver.handler((protonDelivery, message) -> {
                    AmqpValue body = message.getBody();
                    testContext.assertNotNull(body, "received body was null");
                    testContext.assertTrue(body instanceof AmqpValue, "unexpected body section type: " + body.getClass());
                    testContext.assertEquals(str, body.getValue(), "Unexpected message body content");
                    if (z) {
                        protonReceiver.setCondition(ProtonHelper.condition(AmqpError.INTERNAL_ERROR, "testing-error"));
                    }
                    protonReceiver.close();
                });
                protonReceiver.closeHandler(asyncResult2 -> {
                    protonReceiver.close();
                });
                protonReceiver.open();
            });
        });
        AmqpBridgeImpl create = AmqpBridge.create(this.vertx);
        create.setReplyHandlerSupported(false);
        create.start("localhost", mockServer.actualPort(), asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            MessageProducer createProducer = create.createProducer(testName);
            createProducer.exceptionHandler(th -> {
                testContext.assertNotNull(th, "expected exception");
                testContext.assertTrue(th instanceof VertxException, "expected vertx exception");
                if (z) {
                    testContext.assertNotNull(th.getCause(), "expected cause");
                } else {
                    testContext.assertNull(th.getCause(), "expected no cause");
                }
                LOG.trace("Producer exception handler called:", th);
                async2.complete();
                LOG.trace("Shutting down");
                create.close(asyncResult -> {
                    LOG.trace("Shutdown complete");
                    testContext.assertTrue(asyncResult.succeeded());
                    async.complete();
                });
            });
            JsonObject jsonObject = new JsonObject();
            jsonObject.put("body", str);
            createProducer.send(jsonObject);
        });
        try {
            async2.awaitSuccess();
            async.awaitSuccess();
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testConsumerClosedRemotelyCallsExceptionHandler(TestContext testContext) throws Exception {
        doConsumerClosedRemotelyCallsExceptionHandlerTestImpl(testContext, false);
    }

    @Test(timeout = 20000)
    public void testConsumerClosedRemotelyWithErrorCallsExceptionHandler(TestContext testContext) throws Exception {
        doConsumerClosedRemotelyCallsExceptionHandlerTestImpl(testContext, true);
    }

    private void doConsumerClosedRemotelyCallsExceptionHandlerTestImpl(TestContext testContext, boolean z) throws Exception {
        stopBroker();
        String testName = getTestName();
        String str = "myMessageContent-" + testName;
        Async async = testContext.async();
        Async async2 = testContext.async();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                LOG.trace("Server connection open");
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler(protonSession -> {
                LOG.trace("Server session open");
                protonSession.open();
            });
            protonConnection.senderOpenHandler(protonSender -> {
                LOG.trace("Server sender open");
                Source remoteSource = protonSender.getRemoteSource();
                testContext.assertNotNull(remoteSource, "source should not be null");
                testContext.assertEquals(testName, remoteSource.getAddress(), "expected given address");
                protonSender.setSource(remoteSource.copy());
                protonSender.open();
                Message message = Proton.message();
                message.setBody(new AmqpValue(str));
                protonSender.send(message);
                if (z) {
                    protonSender.setCondition(ProtonHelper.condition(AmqpError.INTERNAL_ERROR, "testing-error"));
                }
                protonSender.close();
            });
        });
        AmqpBridgeImpl create = AmqpBridge.create(this.vertx);
        create.setReplyHandlerSupported(false);
        create.start("localhost", mockServer.actualPort(), asyncResult -> {
            LOG.trace("Startup complete");
            MessageConsumer createConsumer = create.createConsumer(testName);
            createConsumer.exceptionHandler(th -> {
                testContext.assertNotNull(th, "expected exception");
                testContext.assertTrue(th instanceof VertxException, "expected vertx exception");
                if (z) {
                    testContext.assertNotNull(th.getCause(), "expected cause");
                } else {
                    testContext.assertNull(th.getCause(), "expected no cause");
                }
                LOG.trace("Producer exception handler called:", th);
                testContext.assertTrue(atomicBoolean.get(), "expected msg to be received first");
                async2.complete();
                LOG.trace("Shutting down");
                create.close(asyncResult -> {
                    LOG.trace("Shutdown complete");
                    testContext.assertTrue(asyncResult.succeeded());
                    async.complete();
                });
            });
            createConsumer.handler(message -> {
                JsonObject jsonObject = (JsonObject) message.body();
                testContext.assertNotNull(jsonObject, "message jsonObject body was null");
                Object value = jsonObject.getValue("body");
                testContext.assertNotNull(value, "amqp message body content was null");
                testContext.assertEquals(str, value, "amqp message body not as expected");
                atomicBoolean.set(true);
            });
        });
        try {
            async2.awaitSuccess();
            async.awaitSuccess();
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testConsumerClosedRemotelyCallsEndHandler(TestContext testContext) throws Exception {
        stopBroker();
        String testName = getTestName();
        String str = "myMessageContent-" + testName;
        Async async = testContext.async();
        Async async2 = testContext.async();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                LOG.trace("Server connection open");
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler(protonSession -> {
                LOG.trace("Server session open");
                protonSession.open();
            });
            protonConnection.senderOpenHandler(protonSender -> {
                LOG.trace("Server sender open");
                Source remoteSource = protonSender.getRemoteSource();
                testContext.assertNotNull(remoteSource, "source should not be null");
                testContext.assertEquals(testName, remoteSource.getAddress(), "expected given address");
                protonSender.setSource(remoteSource.copy());
                protonSender.open();
                Message message = Proton.message();
                message.setBody(new AmqpValue(str));
                protonSender.send(message);
                protonSender.setCondition(ProtonHelper.condition(AmqpError.INTERNAL_ERROR, "testing-error"));
                protonSender.close();
            });
        });
        AmqpBridgeImpl create = AmqpBridge.create(this.vertx);
        create.setReplyHandlerSupported(false);
        create.start("localhost", mockServer.actualPort(), asyncResult -> {
            LOG.trace("Startup complete");
            MessageConsumer createConsumer = create.createConsumer(testName);
            createConsumer.endHandler(r9 -> {
                testContext.assertTrue(atomicBoolean.get(), "expected msg to be received first");
                async2.complete();
                LOG.trace("Shutting down");
                create.close(asyncResult -> {
                    LOG.trace("Shutdown complete");
                    testContext.assertTrue(asyncResult.succeeded());
                    async.complete();
                });
            });
            createConsumer.handler(message -> {
                JsonObject jsonObject = (JsonObject) message.body();
                testContext.assertNotNull(jsonObject, "message jsonObject body was null");
                Object value = jsonObject.getValue("body");
                testContext.assertNotNull(value, "amqp message body content was null");
                testContext.assertEquals(str, value, "amqp message body not as expected");
                atomicBoolean.set(true);
            });
        });
        try {
            async2.awaitSuccess();
            async.awaitSuccess();
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testConsumerUnregisteredLocallyDoesNotCallEndHandler(TestContext testContext) throws Exception {
        stopBroker();
        String testName = getTestName();
        String str = "myMessageContent-" + testName;
        Async async = testContext.async();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                LOG.trace("Server connection open");
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler(protonSession -> {
                LOG.trace("Server session open");
                protonSession.open();
            });
            protonConnection.senderOpenHandler(protonSender -> {
                LOG.trace("Server sender open");
                Source remoteSource = protonSender.getRemoteSource();
                testContext.assertNotNull(remoteSource, "source should not be null");
                testContext.assertEquals(testName, remoteSource.getAddress(), "expected given address");
                protonSender.setSource(remoteSource.copy());
                Message message = Proton.message();
                message.setBody(new AmqpValue(str));
                protonSender.send(message);
                protonSender.closeHandler(asyncResult2 -> {
                    protonSender.close();
                });
                protonSender.open();
            });
        });
        AmqpBridgeImpl create = AmqpBridge.create(this.vertx);
        create.setReplyHandlerSupported(false);
        create.start("localhost", mockServer.actualPort(), asyncResult -> {
            LOG.trace("Startup complete");
            MessageConsumer createConsumer = create.createConsumer(testName);
            createConsumer.endHandler(r4 -> {
                testContext.fail("should not call end handler");
            });
            createConsumer.handler(message -> {
                JsonObject jsonObject = (JsonObject) message.body();
                testContext.assertNotNull(jsonObject, "message jsonObject body was null");
                Object value = jsonObject.getValue("body");
                testContext.assertNotNull(value, "amqp message body content was null");
                testContext.assertEquals(str, value, "amqp message body not as expected");
                createConsumer.unregister(asyncResult -> {
                    this.vertx.setTimer(50L, l -> {
                        LOG.trace("Shutting down");
                        create.close(asyncResult -> {
                            LOG.trace("Shutdown complete");
                            testContext.assertTrue(asyncResult.succeeded());
                            async.complete();
                        });
                    });
                });
            });
        });
        try {
            async.awaitSuccess();
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testInitialCredit(TestContext testContext) throws Exception {
        doConsumerInitialCreditTestImpl(testContext, false, 1000);
    }

    @Test(timeout = 20000)
    public void testInitialCreditInfluencedByConsumerBufferSize(TestContext testContext) throws Exception {
        doConsumerInitialCreditTestImpl(testContext, true, 42);
    }

    private void doConsumerInitialCreditTestImpl(TestContext testContext, boolean z, int i) throws Exception {
        stopBroker();
        String testName = getTestName();
        String str = "myMessageContent-" + testName;
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        Async async = testContext.async();
        Async async2 = testContext.async();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                LOG.trace("Server connection open");
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler(protonSession -> {
                LOG.trace("Server session open");
                protonSession.open();
            });
            protonConnection.senderOpenHandler(protonSender -> {
                LOG.trace("Server sender open");
                Source remoteSource = protonSender.getRemoteSource();
                testContext.assertNotNull(remoteSource, "source should not be null");
                testContext.assertEquals(testName, remoteSource.getAddress(), "expected given address");
                protonSender.setSource(remoteSource.copy());
                protonSender.sendQueueDrainHandler(protonSender -> {
                    if (atomicBoolean.compareAndSet(false, true)) {
                        testContext.assertEquals(Integer.valueOf(i), Integer.valueOf(((ProtonSenderImpl) protonSender).getCredit()), "unexpected initial credit");
                        testContext.assertFalse(protonSender.sendQueueFull(), "expected send queue not to be full");
                        async.complete();
                        Message message = Proton.message();
                        message.setBody(new AmqpValue(str));
                        protonSender.send(message);
                    }
                });
                protonSender.open();
            });
        });
        AmqpBridgeImpl create = AmqpBridge.create(this.vertx);
        create.setReplyHandlerSupported(false);
        create.start("localhost", mockServer.actualPort(), asyncResult -> {
            LOG.trace("Startup complete");
            MessageConsumer createConsumer = create.createConsumer(testName);
            if (z) {
                createConsumer.setMaxBufferedMessages(i);
            }
            createConsumer.handler(message -> {
                JsonObject jsonObject = (JsonObject) message.body();
                testContext.assertNotNull(jsonObject, "message jsonObject body was null");
                Object value = jsonObject.getValue("body");
                testContext.assertNotNull(value, "amqp message body content was null");
                testContext.assertEquals(str, value, "amqp message body not as expected");
                LOG.trace("Shutting down");
                create.close(asyncResult -> {
                    LOG.trace("Shutdown complete");
                    testContext.assertTrue(asyncResult.succeeded());
                    async2.complete();
                });
            });
        });
        try {
            async.awaitSuccess();
            async2.awaitSuccess();
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }
}
