package org.apache.james.queue.rabbitmq;

import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.jayway.awaitility.Awaitility;
import com.jayway.awaitility.Duration;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

/**
 * Integration tests run against the RabbitMQ cluster provided by
 * {@link DockerClusterRabbitMQExtension}.
 *
 * <p>{@link ClusterSharing} checks that exchanges and queues declared through one node are usable
 * through another one. {@link ClusterNodesFailure} checks how clients behave when a node of the
 * cluster is stopped.
 */
@ExtendWith({DockerClusterRabbitMQExtension.class})
public class RabbitMQClusterTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQClusterTest.class);
    private static final String QUEUE = "queue";

    /**
     * Scenarios in which one node of the cluster is stopped while clients publish or consume.
     */
    @Nested
    class ClusterNodesFailure {
        private ConnectionFactory node1ConnectionFactory;
        private Connection resilientConnection;
        private Channel resilientChannel;
        private Connection node2Connection;
        private Channel node2Channel;

        /**
         * Opens one connection built from node 1's factory but given the address list exposed by the
         * cluster fixture (the "resilient" connection), and one connection dedicated to node 2.
         */
        @BeforeEach
        void setup(DockerClusterRabbitMQExtension.DockerRabbitMQCluster dockerRabbitMQCluster) throws IOException, TimeoutException {
            node1ConnectionFactory = dockerRabbitMQCluster.getRabbitMQ1().connectionFactory();
            resilientConnection = node1ConnectionFactory.newConnection(dockerRabbitMQCluster.getAddresses());
            resilientChannel = resilientConnection.createChannel();

            node2Connection = dockerRabbitMQCluster.getRabbitMQ2().connectionFactory().newConnection();
            node2Channel = node2Connection.createChannel();
        }

        /**
         * Releases every resource opened by {@link #setup}: channels first, then their connections.
         * Failures are ignored because a test may have stopped the node a resource was bound to.
         */
        @AfterEach
        void tearDown() {
            closeQuietly(resilientChannel, node2Channel, resilientConnection, node2Connection);
        }

        /**
         * Publishes half of the messages, stops node 1, publishes the other half with retries and
         * expects a consumer attached through node 2 to receive all of them.
         */
        @Disabled("JAMES-2334 For some reason, we are unable to recover topology when reconnectingSee https://github.com/rabbitmq/rabbitmq-server/issues/959")
        @Test
        void nodeKillingWhenProducing(DockerClusterRabbitMQExtension.DockerRabbitMQCluster dockerRabbitMQCluster) throws Exception {
            resilientChannel.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.DIRECT, true);
            resilientChannel.queueDeclare(QUEUE, true, false, false, ImmutableMap.of());
            resilientChannel.queueBind(QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);

            int nbMessages = 20;
            int firstBatchSize = nbMessages / 2;

            // First half: node 1 is still up, a failed publish fails the test immediately.
            IntStream.range(0, firstBatchSize)
                .mapToObj(number -> asBytes(String.valueOf(number)))
                .forEach(Throwing.consumer(bytes ->
                    resilientChannel.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, RabbitMQFixture.NO_PROPERTIES, bytes)));

            InMemoryConsumer consumer = new InMemoryConsumer(node2Channel);
            node2Channel.basicConsume(QUEUE, consumer);
            RabbitMQFixture.awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == firstBatchSize);

            dockerRabbitMQCluster.getRabbitMQ1().stop();

            // Second half: node 1 is down, so each publish is retried until it succeeds.
            IntStream.range(firstBatchSize, nbMessages)
                .mapToObj(number -> asBytes(String.valueOf(number)))
                .forEach(this::tryPublishWithRetry);

            RabbitMQFixture.awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);
            List<Integer> expectedMessages = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
            Assertions.assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedMessages);
        }

        /**
         * Retries {@link #tryPublish} once per second until it succeeds, for at most one minute.
         */
        private void tryPublishWithRetry(byte[] bytes) {
            Awaitility.waitAtMost(Duration.ONE_MINUTE)
                .pollInterval(Duration.ONE_SECOND)
                .until(() -> tryPublish(bytes));
        }

        /**
         * Publishes {@code bytes} through the resilient channel.
         *
         * @return {@code true} when the publish call returned normally, {@code false} (after logging
         *     the failure) when it threw
         */
        private boolean tryPublish(byte[] bytes) {
            try {
                resilientChannel.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, RabbitMQFixture.NO_PROPERTIES, bytes);
                return true;
            } catch (Exception e) {
                LOGGER.error("failed publish", e);
                return false;
            }
        }

        /**
         * Stops node 3, then connects using node 3's factory together with the cluster address list
         * and expects publishing and consuming to work through that connection.
         */
        @Test
        void connectingToAClusterWithAFailedRabbit(DockerClusterRabbitMQExtension.DockerRabbitMQCluster dockerRabbitMQCluster) throws Exception {
            ConnectionFactory node3ConnectionFactory = dockerRabbitMQCluster.getRabbitMQ3().connectionFactory();
            dockerRabbitMQCluster.getRabbitMQ3().stop();

            // Resources are closed in reverse order: the channel first, then its connection.
            try (Connection connection = node3ConnectionFactory.newConnection(dockerRabbitMQCluster.getAddresses());
                 Channel channel = connection.createChannel()) {

                channel.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.DIRECT, true);
                channel.queueDeclare(QUEUE, true, false, false, ImmutableMap.of());
                channel.queueBind(QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);

                int nbMessages = 10;
                IntStream.range(0, nbMessages)
                    .mapToObj(number -> asBytes(String.valueOf(number)))
                    .forEach(Throwing.consumer(bytes ->
                        channel.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, RabbitMQFixture.NO_PROPERTIES, bytes)));

                InMemoryConsumer consumer = new InMemoryConsumer(channel);
                channel.basicConsume(QUEUE, consumer);

                RabbitMQFixture.awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);
                List<Integer> expectedMessages = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
                Assertions.assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedMessages);
            }
        }

        /**
         * Consumes through the resilient channel and stops node 1 from the consumer callback once
         * half of the messages were handled; all messages are still expected to be consumed.
         */
        @Test
        void nodeKillingWhenConsuming(DockerClusterRabbitMQExtension.DockerRabbitMQCluster dockerRabbitMQCluster) throws Exception {
            node2Channel.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.DIRECT, true);
            node2Channel.queueDeclare(QUEUE, true, false, false, ImmutableMap.of());
            node2Channel.queueBind(QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);

            int nbMessages = 10;
            IntStream.range(0, nbMessages)
                .mapToObj(number -> asBytes(String.valueOf(number)))
                .forEach(Throwing.consumer(bytes ->
                    resilientChannel.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, RabbitMQFixture.NO_PROPERTIES, bytes)));

            AtomicInteger processedCount = new AtomicInteger(0);
            InMemoryConsumer consumer = new InMemoryConsumer(resilientChannel,
                () -> stopWhenHalfProcessed(dockerRabbitMQCluster, nbMessages, processedCount));
            resilientChannel.basicConsume(QUEUE, consumer);

            RabbitMQFixture.awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);
            List<Integer> expectedMessages = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
            Assertions.assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedMessages);
        }

        /**
         * Increments {@code atomicInteger} and stops node 1 exactly when the new value equals
         * {@code i / 2}.
         */
        private void stopWhenHalfProcessed(DockerClusterRabbitMQExtension.DockerRabbitMQCluster dockerRabbitMQCluster, int i, AtomicInteger atomicInteger) {
            if (atomicInteger.incrementAndGet() == i / 2) {
                dockerRabbitMQCluster.getRabbitMQ1().stop();
            }
        }
    }

    /**
     * Scenarios checking that the topology declared through one node is visible through another.
     */
    @Nested
    class ClusterSharing {
        private ConnectionFactory node1ConnectionFactory;
        private ConnectionFactory node2ConnectionFactory;
        private Connection node1Connection;
        private Connection node2Connection;
        private Channel node1Channel;
        private Channel node2Channel;

        /**
         * Opens one connection and one channel on node 1 and on node 2.
         */
        @BeforeEach
        void setup(DockerClusterRabbitMQExtension.DockerRabbitMQCluster dockerRabbitMQCluster) throws IOException, TimeoutException {
            node1ConnectionFactory = dockerRabbitMQCluster.getRabbitMQ1().connectionFactory();
            node2ConnectionFactory = dockerRabbitMQCluster.getRabbitMQ2().connectionFactory();
            node1Connection = node1ConnectionFactory.newConnection();
            node2Connection = node2ConnectionFactory.newConnection();
            node1Channel = node1Connection.createChannel();
            node2Channel = node2Connection.createChannel();
        }

        /**
         * Closes both channels, then both connections, ignoring failures.
         */
        @AfterEach
        void tearDown() {
            closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection);
        }

        /**
         * The output of {@code rabbitmqctl cluster_status} run inside node 1's container must
         * mention the three node names.
         */
        @Test
        void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerClusterRabbitMQExtension.DockerRabbitMQCluster dockerRabbitMQCluster) throws Exception {
            String stdout = dockerRabbitMQCluster.getRabbitMQ1().container()
                .execInContainer("rabbitmqctl", "cluster_status")
                .getStdout();

            Assertions.assertThat(stdout)
                .contains(
                    DockerClusterRabbitMQExtension.RABBIT_1,
                    DockerClusterRabbitMQExtension.RABBIT_2,
                    DockerClusterRabbitMQExtension.RABBIT_3);
        }

        /**
         * Messages published through node 1 to a queue declared on node 1 must be consumable
         * through node 2.
         */
        @Test
        void queuesShouldBeShared() throws Exception {
            node1Channel.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.DIRECT, true);
            node1Channel.queueDeclare(QUEUE, true, false, false, ImmutableMap.of());
            node1Channel.queueBind(QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);

            int nbMessages = 10;
            IntStream.range(0, nbMessages)
                .mapToObj(number -> asBytes(String.valueOf(number)))
                .forEach(Throwing.consumer(bytes ->
                    node1Channel.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, RabbitMQFixture.NO_PROPERTIES, bytes)));

            InMemoryConsumer consumer = new InMemoryConsumer(node2Channel);
            node2Channel.basicConsume(QUEUE, consumer);

            RabbitMQFixture.awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);
            List<Integer> expectedMessages = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
            Assertions.assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedMessages);
        }

        /**
         * A queue declared and bound through node 2 must receive messages published through node 1
         * to an exchange declared on node 1.
         */
        @Test
        void queuesShouldBeDeclarableOnAnotherNode() throws Exception {
            node1Channel.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.DIRECT, true);
            node2Channel.queueDeclare(QUEUE, true, false, false, ImmutableMap.of());
            node2Channel.queueBind(QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);

            int nbMessages = 10;
            IntStream.range(0, nbMessages)
                .mapToObj(number -> asBytes(String.valueOf(number)))
                .forEach(Throwing.consumer(bytes ->
                    node1Channel.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, RabbitMQFixture.NO_PROPERTIES, bytes)));

            InMemoryConsumer consumer = new InMemoryConsumer(node2Channel);
            node2Channel.basicConsume(QUEUE, consumer);

            RabbitMQFixture.awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);
            List<Integer> expectedMessages = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
            Assertions.assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedMessages);
        }
    }

    // Kept explicit and package-private so the constructor's visibility stays as it was.
    RabbitMQClusterTest() {
    }

    /**
     * Closes every given resource in the order supplied, on a best-effort basis: a failure on one
     * resource never prevents the following ones from being closed.
     *
     * @param autoCloseableArr resources to close; {@code null} elements are skipped
     */
    public void closeQuietly(AutoCloseable... autoCloseableArr) {
        Arrays.stream(autoCloseableArr).forEach(this::closeQuietly);
    }

    /**
     * Closes a single resource, never propagating an exception.
     *
     * @param autoCloseable resource to close, may be {@code null} (e.g. when a setup method failed
     *     before assigning it)
     */
    private void closeQuietly(AutoCloseable autoCloseable) {
        if (autoCloseable == null) {
            return;
        }
        try {
            autoCloseable.close();
        } catch (Exception e) {
            // Best effort by design, but keep a trace instead of swallowing silently.
            LOGGER.debug("Ignoring failure while closing resource", e);
        }
    }

    /**
     * Encodes {@code str} as UTF-8 bytes.
     */
    public byte[] asBytes(String str) {
        return str.getBytes(StandardCharsets.UTF_8);
    }
}
