package org.apache.james.queue.rabbitmq;

import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.github.fge.lambdas.Throwing;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.time.temporal.TemporalUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.Scenario;
import org.apache.james.backends.cassandra.StatementRecorder;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.cassandra.CassandraBlobModule;
import org.apache.james.blob.cassandra.CassandraBlobStoreFactory;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.core.builder.MimeMessageBuilder;
import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule;
import org.apache.james.lifecycle.api.LifecycleUtil;
import org.apache.james.metrics.api.Gauge;
import org.apache.james.metrics.api.GaugeRegistry;
import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.api.MailQueueMetricContract;
import org.apache.james.queue.api.MailQueueMetricExtension;
import org.apache.james.queue.api.MailQueueName;
import org.apache.james.queue.api.Mails;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.api.ManageableMailQueueContract;
import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
import org.apache.james.queue.rabbitmq.RabbitMQMailQueueFactory;
import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewTestFactory;
import org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDAO;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices;
import org.apache.james.util.streams.Iterators;
import org.apache.james.utils.UpdatableTickingClock;
import org.apache.mailet.Mail;
import org.apache.mailet.base.MailAddressFixture;
import org.apache.mailet.base.test.FakeMail;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.SoftAssertions;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.Sender;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.class */
public class RabbitMQMailQueueTest {
    // Passed to MailQueueFactory.prefetchCount(...) by the nested setups; the same constant is also
    // used below as slice number 3 and as a dequeue count of 3.
    private static final int THREE_BUCKET_COUNT = 3;
    // Not referenced in the visible part of this file.
    // NOTE(review): presumably the pace at which the browse start is updated - confirm.
    private static final int UPDATE_BROWSE_START_PACE = 25;
    // Per-test state read by the nested test classes. It is not assigned in the visible part of
    // this file; presumably setUp(...) populates it - confirm.
    private RabbitMQMailQueueFactory mailQueueFactory;
    private UpdatableTickingClock clock;
    private RabbitMQMailQueue mailQueue;
    private RabbitMQMailQueueManagement mqManagementApi;
    private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
    // Width used by currentSliceStartInstant(...) to round an instant down to its slice start.
    private static final Duration ONE_HOUR_SLICE_WINDOW = Duration.ofHours(1);
    private static final MailQueueName SPOOL = MailQueueName.of("spool");
    // Reference instants: IN_SLICE_1 is 60 days before class initialisation and IN_SLICE_n lies
    // (n - 1) hours after IN_SLICE_1. Declaration order matters because each one derives from IN_SLICE_1.
    private static final Instant IN_SLICE_1 = Instant.now().minus(60L, (TemporalUnit) ChronoUnit.DAYS);
    private static final Instant IN_SLICE_2 = IN_SLICE_1.plus(1L, (TemporalUnit) ChronoUnit.HOURS);
    private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2L, (TemporalUnit) ChronoUnit.HOURS);
    private static final Instant IN_SLICE_5 = IN_SLICE_1.plus(4L, (TemporalUnit) ChronoUnit.HOURS);
    private static final Instant IN_SLICE_7 = IN_SLICE_1.plus(6L, (TemporalUnit) ChronoUnit.HOURS);
    // Cached thread pool handed to Schedulers.fromExecutor(...) by the tests; it is not shut down
    // anywhere in the visible part of this file.
    public static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();

    // Cassandra test cluster provisioned with the blob, mail queue view, event store and schema version modules.
    @RegisterExtension
    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules(new CassandraModule[]{CassandraBlobModule.MODULE, CassandraMailQueueViewModule.MODULE, CassandraEventStoreModule.MODULE(), CassandraSchemaVersionModule.MODULE}));

    // Singleton RabbitMQ extension configured with the WEAK isolation policy.
    @RegisterExtension
    static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ().isolationPolicy(RabbitMQExtension.IsolationPolicy.WEAK);

    @Nested
    /* loaded from: input_file:org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest$DeDuplicationTest.class */
    class DeDuplicationTest {

        @RegisterExtension
        MailQueueMetricExtension mailQueueMetricExtension = new MailQueueMetricExtension();

        DeDuplicationTest() {
        }

        @BeforeEach
        void setup(CassandraCluster cassandraCluster, MailQueueMetricExtension.MailQueueMetricTestSystem mailQueueMetricTestSystem) throws Exception {
            RabbitMQMailQueueTest.this.setUp(cassandraCluster, mailQueueMetricTestSystem, RabbitMQMailQueueConfiguration.builder().sizeMetricsEnabled(true).build(), CassandraBlobStoreFactory.forTesting(cassandraCluster.getConf(), new RecordingMetricFactory()).deduplication(), MailQueueFactory.prefetchCount(RabbitMQMailQueueTest.THREE_BUCKET_COUNT));
        }

        @Test
        void dequeueShouldStillRetrieveAllBlobsWhenIdenticalContentAndDeduplication() throws Exception {
            String str = "identical content";
            RabbitMQMailQueueTest.this.mailQueue.enQueue(Mails.defaultMail().name("myMail1").mimeMessage(MimeMessageBuilder.mimeMessageBuilder().setSubject("identical subject").setText("identical content")).build());
            RabbitMQMailQueueTest.this.mailQueue.enQueue(Mails.defaultMail().name("myMail2").mimeMessage(MimeMessageBuilder.mimeMessageBuilder().setSubject("identical subject").setText("identical content")).build());
            Flux.from(RabbitMQMailQueueTest.this.mailQueue.deQueue()).take(2L).concatMap(mailQueueItem -> {
                return Mono.fromCallable(() -> {
                    Assertions.assertThat(mailQueueItem.getMail().getMessage().getContent()).isEqualTo(str);
                    mailQueueItem.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
                    return mailQueueItem;
                }).subscribeOn(Schedulers.fromExecutor(RabbitMQMailQueueTest.EXECUTOR));
            }).collectList().block(Duration.ofSeconds(10L));
        }
    }

    /**
     * Checks that building the queue with {@code sizeMetricsEnabled(false)} never registers a gauge
     * on the spied gauge registry.
     */
    @Nested
    /* loaded from: input_file:org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest$MailQueueSizeMetricsDisabled.class */
    class MailQueueSizeMetricsDisabled {

        @RegisterExtension
        MailQueueMetricExtension mailQueueMetricExtension = new MailQueueMetricExtension();

        /** Initialises the outer fixture with size metrics disabled and a passthrough blob store. */
        @BeforeEach
        void setup(CassandraCluster cassandraCluster, MailQueueMetricExtension.MailQueueMetricTestSystem mailQueueMetricTestSystem) throws Exception {
            RabbitMQMailQueueConfiguration sizeMetricsOff = RabbitMQMailQueueConfiguration.builder()
                .sizeMetricsEnabled(false)
                .build();
            RabbitMQMailQueueTest.this.setUp(
                cassandraCluster,
                mailQueueMetricTestSystem,
                sizeMetricsOff,
                CassandraBlobStoreFactory.forTesting(cassandraCluster.getConf(), new RecordingMetricFactory()).passthrough(),
                MailQueueFactory.prefetchCount(RabbitMQMailQueueTest.THREE_BUCKET_COUNT));
        }

        @Test
        void constructorShouldNotRegisterGetQueueSizeGaugeWhenSizeMetricsDisabled(MailQueueMetricExtension.MailQueueMetricTestSystem mailQueueMetricTestSystem) {
            ArgumentCaptor<Gauge> gaugeCaptor = ArgumentCaptor.forClass(Gauge.class);
            // verify(...) must run before the matchers are created, and the matchers must be created in argument order.
            GaugeRegistry neverInvoked = (GaugeRegistry) Mockito.verify(mailQueueMetricTestSystem.getSpyGaugeRegistry(), Mockito.never());
            neverInvoked.register((String) ArgumentMatchers.any(), (Gauge) gaugeCaptor.capture());
        }
    }

    @Nested
    /* loaded from: input_file:org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest$MailQueueSizeMetricsEnabled.class */
    class MailQueueSizeMetricsEnabled implements ManageableMailQueueContract, MailQueueMetricContract {
        // Explicit package-private no-arg constructor with an empty body; it performs no initialisation of its own.
        MailQueueSizeMetricsEnabled() {
        }

        /** Initialises the outer fixture with size metrics enabled and a passthrough blob store. */
        @BeforeEach
        void setup(CassandraCluster cassandraCluster, MailQueueMetricExtension.MailQueueMetricTestSystem mailQueueMetricTestSystem) throws Exception {
            RabbitMQMailQueueConfiguration sizeMetricsOn = RabbitMQMailQueueConfiguration.builder()
                .sizeMetricsEnabled(true)
                .build();
            RabbitMQMailQueueTest.this.setUp(
                cassandraCluster,
                mailQueueMetricTestSystem,
                sizeMetricsOn,
                CassandraBlobStoreFactory.forTesting(cassandraCluster.getConf(), new RecordingMetricFactory()).passthrough(),
                MailQueueFactory.prefetchCount(RabbitMQMailQueueTest.THREE_BUCKET_COUNT));
        }

        /**
         * Enqueues {@code mail} through the contract's default implementation, then ticks the shared
         * test clock (presumably so that consecutive mails get distinct enqueue instants - confirm).
         *
         * @param mail the mail to enqueue
         * @throws MailQueue.MailQueueException if the underlying enqueue fails
         */
        @Override
        public void enQueue(Mail mail) throws MailQueue.MailQueueException {
            // This class has no superclass other than Object, so a bare `super.enQueue(...)` cannot
            // resolve; an inherited interface default method has to be invoked through a direct superinterface.
            ManageableMailQueueContract.super.enQueue(mail);
            RabbitMQMailQueueTest.this.clock.tick();
        }

        /**
         * Returns the queue under test.
         *
         * <p>The decompiler renamed this accessor from {@code getMailQueue}; the name is kept because
         * the tests in this class call it.
         */
        /* renamed from: getMailQueue, reason: merged with bridge method [inline-methods] */
        public RabbitMQMailQueue m4getMailQueue() {
            return RabbitMQMailQueueTest.this.mailQueue;
        }

        /**
         * Restores the accessor under its original name (see the rename note above) so that code
         * looking for {@code getMailQueue()} finds it again. Delegates to {@link #m4getMailQueue()}.
         * NOTE(review): presumably this is the accessor the implemented contracts declare - confirm.
         */
        public RabbitMQMailQueue getMailQueue() {
            return m4getMailQueue();
        }

        /** Returns the outer test's queue, typed as {@link ManageableMailQueue}. */
        public ManageableMailQueue getManageableMailQueue() {
            return RabbitMQMailQueueTest.this.mailQueue;
        }

        /**
         * Enqueues five mails at each of the instants for slices 1, 2, 3 and 5, moves the clock to
         * the slice-7 instant and expects browsing to list all twenty names in enqueue order.
         */
        @Test
        void browseShouldReturnCurrentlyEnqueuedMailFromAllSlices() throws Exception {
            ManageableMailQueue browsedQueue = getManageableMailQueue();
            UpdatableTickingClock testClock = RabbitMQMailQueueTest.this.clock;

            testClock.setInstant(RabbitMQMailQueueTest.IN_SLICE_1);
            enqueueSomeMails(namePatternForSlice(1), 5);

            testClock.setInstant(RabbitMQMailQueueTest.IN_SLICE_2);
            enqueueSomeMails(namePatternForSlice(2), 5);

            testClock.setInstant(RabbitMQMailQueueTest.IN_SLICE_3);
            enqueueSomeMails(namePatternForSlice(3), 5);

            testClock.setInstant(RabbitMQMailQueueTest.IN_SLICE_5);
            enqueueSomeMails(namePatternForSlice(5), 5);

            testClock.setInstant(RabbitMQMailQueueTest.IN_SLICE_7);

            Assertions.assertThat(Iterators.toStream(browsedQueue.browse())
                    .map(view -> view.getMail())
                    .map(mail -> mail.getName()))
                .containsExactly(
                    "1-1", "1-2", "1-3", "1-4", "1-5",
                    "2-1", "2-2", "2-3", "2-4", "2-5",
                    "3-1", "3-2", "3-3", "3-4", "3-5",
                    "5-1", "5-2", "5-3", "5-4", "5-5");
        }

        /**
         * Re-initialises the fixture with a prefetch count of 1, then enqueues and dequeues 250 mails
         * in each of three consecutive slices while recording prepared statements that start with
         * "UPDATE browsestart"; between 2 and 12 such statements are expected.
         */
        @Test
        void browseStartShouldBeUpdated(CassandraCluster cassandraCluster, MailQueueMetricExtension.MailQueueMetricTestSystem mailQueueMetricTestSystem) throws Exception {
            RabbitMQMailQueueConfiguration sizeMetricsOn = RabbitMQMailQueueConfiguration.builder()
                .sizeMetricsEnabled(true)
                .build();
            RabbitMQMailQueueTest.this.setUp(
                cassandraCluster,
                mailQueueMetricTestSystem,
                sizeMetricsOn,
                CassandraBlobStoreFactory.forTesting(cassandraCluster.getConf(), new RecordingMetricFactory()).passthrough(),
                MailQueueFactory.prefetchCount(1));

            StatementRecorder.Selector browseStartUpdates = StatementRecorder.Selector.preparedStatementStartingWith("UPDATE browsestart");
            StatementRecorder recorder = cassandraCluster.getConf().recordStatements(browseStartUpdates);

            Instant[] sliceInstants = {RabbitMQMailQueueTest.IN_SLICE_1, RabbitMQMailQueueTest.IN_SLICE_2, RabbitMQMailQueueTest.IN_SLICE_3};
            for (int slice = 1; slice <= sliceInstants.length; slice++) {
                RabbitMQMailQueueTest.this.clock.setInstant(sliceInstants[slice - 1]);
                enqueueSomeMails(namePatternForSlice(slice), 250);
                dequeueMails(250);
            }

            Assertions.assertThat(recorder.listExecutedStatements(browseStartUpdates)).hasSizeBetween(2, 12);
        }

        /**
         * Re-initialises the fixture with a prefetch count of 1, then enqueues and dequeues 250 mails
         * in each of three consecutive slices while recording prepared statements that start with
         * "UPDATE contentstart"; between 2 and 12 such statements are expected.
         */
        @Test
        void contentStartShouldBeUpdated(CassandraCluster cassandraCluster, MailQueueMetricExtension.MailQueueMetricTestSystem mailQueueMetricTestSystem) throws Exception {
            RabbitMQMailQueueConfiguration sizeMetricsOn = RabbitMQMailQueueConfiguration.builder()
                .sizeMetricsEnabled(true)
                .build();
            RabbitMQMailQueueTest.this.setUp(
                cassandraCluster,
                mailQueueMetricTestSystem,
                sizeMetricsOn,
                CassandraBlobStoreFactory.forTesting(cassandraCluster.getConf(), new RecordingMetricFactory()).passthrough(),
                MailQueueFactory.prefetchCount(1));

            StatementRecorder.Selector contentStartUpdates = StatementRecorder.Selector.preparedStatementStartingWith("UPDATE contentstart");
            StatementRecorder recorder = cassandraCluster.getConf().recordStatements(contentStartUpdates);

            Instant[] sliceInstants = {RabbitMQMailQueueTest.IN_SLICE_1, RabbitMQMailQueueTest.IN_SLICE_2, RabbitMQMailQueueTest.IN_SLICE_3};
            for (int slice = 1; slice <= sliceInstants.length; slice++) {
                RabbitMQMailQueueTest.this.clock.setInstant(sliceInstants[slice - 1]);
                enqueueSomeMails(namePatternForSlice(slice), 250);
                dequeueMails(250);
            }

            Assertions.assertThat(recorder.listExecutedStatements(contentStartUpdates)).hasSizeBetween(2, 12);
        }

        /**
         * Enqueues one mail, dequeues it and acknowledges it with SUCCESS, then expects the
         * {@code blobs} table to be empty.
         */
        @Test
        void dequeueShouldDeleteBlobs(CassandraCluster cassandraCluster) throws Exception {
            // Parameterised on purpose: with a raw Flux the lambda parameter below would be an Object
            // and done(...) could not be resolved.
            Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(m4getMailQueue().deQueue());
            m4getMailQueue().enQueue(Mails.defaultMail().name("myMail1").build());

            dequeueFlux.take(1L)
                .flatMap(item -> Mono
                    .fromCallable(() -> {
                        item.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
                        return item;
                    })
                    .subscribeOn(Schedulers.fromExecutor(EXECUTOR)))
                .blockLast(Duration.ofSeconds(10L));

            Assertions.assertThat(cassandraCluster.getConf().execute(QueryBuilder.selectFrom("blobs").all().build())).isEmpty();
        }

        /** Enqueues one mail, clears the queue and expects the {@code blobs} table to be empty. */
        @Test
        void clearShouldDeleteBlobs(CassandraCluster cassandraCluster) throws Exception {
            RabbitMQMailQueue queue = m4getMailQueue();
            queue.enQueue(Mails.defaultMail().name("myMail1").build());

            getManageableMailQueue().clear();

            Assertions.assertThat(
                    cassandraCluster.getConf().execute(QueryBuilder.selectFrom("blobs").all().build()))
                .isEmpty();
        }

        /** Enqueues one mail, removes it by name and expects the {@code blobs} table to be empty. */
        @Test
        void removeByNameShouldDeleteBlobs(CassandraCluster cassandraCluster) throws Exception {
            RabbitMQMailQueue queue = m4getMailQueue();
            queue.enQueue(Mails.defaultMail().name("myMail1").build());

            getManageableMailQueue().remove(ManageableMailQueue.Type.Name, "myMail1");

            Assertions.assertThat(
                    cassandraCluster.getConf().execute(QueryBuilder.selectFrom("blobs").all().build()))
                .isEmpty();
        }

        /** Enqueues one mail for RECIPIENT1, removes by that recipient and expects the {@code blobs} table to be empty. */
        @Test
        void removeByRecipientShouldDeleteBlobs(CassandraCluster cassandraCluster) throws Exception {
            RabbitMQMailQueue queue = m4getMailQueue();
            queue.enQueue(Mails.defaultMailNoRecipient()
                .name("myMail1")
                .recipient(MailAddressFixture.RECIPIENT1)
                .build());

            getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, MailAddressFixture.RECIPIENT1.asString());

            Assertions.assertThat(
                    cassandraCluster.getConf().execute(QueryBuilder.selectFrom("blobs").all().build()))
                .isEmpty();
        }

        /** Enqueues one mail from SENDER, removes by that sender and expects the {@code blobs} table to be empty. */
        @Test
        void removeBySenderShouldDeleteBlobs(CassandraCluster cassandraCluster) throws Exception {
            RabbitMQMailQueue queue = m4getMailQueue();
            queue.enQueue(Mails.defaultMail()
                .name("myMail1")
                .sender(MailAddressFixture.SENDER)
                .build());

            getManageableMailQueue().remove(ManageableMailQueue.Type.Sender, MailAddressFixture.SENDER.asString());

            Assertions.assertThat(
                    cassandraCluster.getConf().execute(QueryBuilder.selectFrom("blobs").all().build()))
                .isEmpty();
        }

        /**
         * Enqueues five mails at each of the instants for slices 1, 2, 3 and 5, dequeues 5 + 5 + 3 of
         * them, and expects browsing to list exactly the seven remaining names in order.
         */
        @Test
        void browseAndDequeueShouldCombineWellWhenDifferentSlices() throws Exception {
            ManageableMailQueue browsedQueue = getManageableMailQueue();
            UpdatableTickingClock testClock = RabbitMQMailQueueTest.this.clock;

            testClock.setInstant(RabbitMQMailQueueTest.IN_SLICE_1);
            enqueueSomeMails(namePatternForSlice(1), 5);

            testClock.setInstant(RabbitMQMailQueueTest.IN_SLICE_2);
            enqueueSomeMails(namePatternForSlice(2), 5);

            testClock.setInstant(RabbitMQMailQueueTest.IN_SLICE_3);
            enqueueSomeMails(namePatternForSlice(3), 5);

            testClock.setInstant(RabbitMQMailQueueTest.IN_SLICE_5);
            enqueueSomeMails(namePatternForSlice(5), 5);

            testClock.setInstant(RabbitMQMailQueueTest.IN_SLICE_7);

            // Three separate dequeue rounds: all of slice 1, all of slice 2, then three mails of slice 3.
            dequeueMails(5);
            dequeueMails(5);
            dequeueMails(3);

            Assertions.assertThat(Iterators.toStream(browsedQueue.browse())
                    .map(view -> view.getMail())
                    .map(mail -> mail.getName()))
                .containsExactly("3-4", "3-5", "5-1", "5-2", "5-3", "5-4", "5-5");
        }

        /**
         * Enqueues 100 mails at each of the instants for slices 1, 2, 3 and 5, dequeues all 400 in
         * four rounds, then checks through {@link EnqueuedMailsDAO} that buckets 0, 1 and 2 of the
         * slice containing IN_SLICE_1 hold no enqueued mail any more.
         */
        @Test
        void enqueuedEmailsShouldEventuallyBeCleaned() {
            ManageableMailQueue inspectedQueue = getManageableMailQueue();
            UpdatableTickingClock testClock = RabbitMQMailQueueTest.this.clock;

            testClock.setInstant(RabbitMQMailQueueTest.IN_SLICE_1);
            enqueueSomeMails(namePatternForSlice(1), 100);

            testClock.setInstant(RabbitMQMailQueueTest.IN_SLICE_2);
            enqueueSomeMails(namePatternForSlice(2), 100);

            testClock.setInstant(RabbitMQMailQueueTest.IN_SLICE_3);
            enqueueSomeMails(namePatternForSlice(3), 100);

            testClock.setInstant(RabbitMQMailQueueTest.IN_SLICE_5);
            enqueueSomeMails(namePatternForSlice(5), 100);

            testClock.setInstant(RabbitMQMailQueueTest.IN_SLICE_7);

            for (int round = 0; round < 4; round++) {
                dequeueMails(100);
            }

            EnqueuedMailsDAO dao = new EnqueuedMailsDAO(RabbitMQMailQueueTest.cassandraCluster.getCassandraCluster().getConf(), new HashBlobId.Factory());
            MailQueueName queueName = MailQueueName.fromString(inspectedQueue.getName().asString());
            BucketedSlices.Slice firstSlice = BucketedSlices.Slice.of(currentSliceStartInstant(RabbitMQMailQueueTest.IN_SLICE_1));

            SoftAssertions.assertSoftly(softly -> {
                for (int bucket = 0; bucket < 3; bucket++) {
                    softly.assertThat(dao.selectEnqueuedMails(queueName, firstSlice, BucketedSlices.BucketId.of(bucket)).collectList().block())
                        .isEmpty();
                }
            });
        }

        /**
         * Rounds {@code instant} to a multiple of the slice window, using whole epoch seconds and
         * integer division.
         *
         * @param instant any instant
         * @return the instant at {@code (epochSecond / windowSeconds) * windowSeconds}
         */
        private Instant currentSliceStartInstant(Instant instant) {
            long windowSeconds = RabbitMQMailQueueTest.ONE_HOUR_SLICE_WINDOW.getSeconds();
            long sliceIndex = instant.getEpochSecond() / windowSeconds;
            return Instant.ofEpochSecond(sliceIndex * windowSeconds);
        }

        /**
         * Builds a naming function for mails of slice {@code i}.
         *
         * @param i the slice number used as prefix
         * @return a function mapping an index {@code n} to {@code "<i>-<n>"}
         */
        private Function<Integer, String> namePatternForSlice(int i) {
            return index -> i + "-" + index;
        }

        /** Creates a queue named "myQueue" through the factory and expects the Cassandra view to report it as initialized. */
        @Test
        void mailQueueShouldBeInitializedWhenCreating(CassandraCluster cassandraCluster) {
            MailQueueName queueName = MailQueueName.of("myQueue");

            RabbitMQMailQueueTest.this.mailQueueFactory.createQueue(queueName);

            Assertions.assertThat(
                    CassandraMailQueueViewTestFactory.isInitialized(
                        cassandraCluster.getConf(),
                        MailQueueName.fromString(queueName.asString())))
                .isTrue();
        }

        /** Enqueuing a mail with the name below must not throw. */
        @Test
        void enQueueShouldNotThrowOnMailNameWithNegativeHash() {
            // Keep this literal exactly as it is: per the test name its hash is meant to be negative,
            // and that property depends on the exact characters.
            String negativeHashName = "this sting will have a negative hash";

            Assertions.assertThatCode(() -> m4getMailQueue().enQueue(Mails.defaultMail().name(negativeHashName).build()))
                .doesNotThrowAnyException();
        }

        // Disabled test with an empty body; the reason (JAMES-2614, unstable) is given in the annotation.
        // NOTE(review): presumably this overrides a test inherited from one of the implemented contracts
        // so that it is skipped for this queue - confirm.
        @Disabled("JAMES-2614 RabbitMQMailQueueTest::concurrentEnqueueDequeueShouldNotFail is unstable.The related test is disabled, and need to be re-enabled after investigation and a fix.")
        @Test
        public void concurrentEnqueueDequeueShouldNotFail() {
        }

        /**
         * Starts a dequeue subscription, enqueues "myMail1", pauses RabbitMQ for two seconds, attempts
         * to enqueue "myMail2" during the pause (a failure is tolerated), unpauses, enqueues "myMail3"
         * and expects both "myMail1" and "myMail3" to be dequeued within ten seconds.
         */
        @Test
        void dequeueShouldWorkAfterNetworkOutage() throws Exception {
            String firstMailName = "myMail1";
            String thirdMailName = "myMail3";
            // Filled by the subscriber and polled by Awaitility, hence a concurrent collection.
            ConcurrentLinkedDeque<MailQueue.MailQueueItem> dequeuedItems = new ConcurrentLinkedDeque<>();
            Flux.from(m4getMailQueue().deQueue())
                .doOnNext(dequeuedItems::add)
                .subscribe();

            m4getMailQueue().enQueue(Mails.defaultMail().name(firstMailName).build());

            RabbitMQMailQueueTest.rabbitMQExtension.getRabbitMQ().pause();
            Thread.sleep(2000L);
            try {
                m4getMailQueue().enQueue(Mails.defaultMail().name("myMail2").build());
            } catch (Exception ignored) {
                // Enqueuing while the broker is paused is allowed to fail; only recovery afterwards is asserted.
            } finally {
                // Single cleanup point: the broker is resumed whether or not the enqueue above failed.
                RabbitMQMailQueueTest.rabbitMQExtension.getRabbitMQ().unpause();
                Thread.sleep(100L);
            }

            m4getMailQueue().enQueue(Mails.defaultMail().name(thirdMailName).build());

            Awaitility.await()
                .atMost(Duration.ofSeconds(10L))
                .untilAsserted(() -> Assertions.assertThat(dequeuedItems)
                    .extracting(item -> item.getMail().getName())
                    .contains(firstMailName, thirdMailName));
        }

        /**
         * Enqueues "myMail" (tolerating a failure), purges the RabbitMQ work queue of the spool,
         * republishes the mails not processed before {@code clock + 30 minutes} and expects "myMail"
         * to be dequeued within ten seconds.
         */
        @Test
        void enqueuedEmailsShouldNotBeLostDuringRabbitMQOutages() throws Exception {
            try {
                m4getMailQueue().enQueue(Mails.defaultMail().name("myMail").build());
            } catch (Exception ignored) {
                // Deliberately ignored: the assertion below is about the republish path, not about this call.
            }

            String vhost = (String) RabbitMQMailQueueTest.rabbitMQExtension.getRabbitMQ().getConfiguration().getVhost().orElse("/");
            RabbitMQMailQueueTest.rabbitMQExtension.managementAPI().purgeQueue(vhost, "JamesMailQueue-workqueue-spool");

            m4getMailQueue()
                .republishNotProcessedMails(RabbitMQMailQueueTest.this.clock.instant().plus(30L, ChronoUnit.MINUTES))
                .blockLast();

            // Typed list on purpose: a raw List would turn the extracting lambda parameter into an Object.
            List<MailQueue.MailQueueItem> dequeued = Flux.<MailQueue.MailQueueItem>from(m4getMailQueue().deQueue())
                .take(1L)
                .collectList()
                .block(Duration.ofSeconds(10L));
            Assertions.assertThat(dequeued)
                .extracting(item -> item.getMail().getName())
                .containsOnly("myMail");
        }

        /**
         * Enqueues three mails, restarts RabbitMQ and expects the three mails to be dequeued in
         * enqueue order within ten seconds.
         */
        @Test
        void messagesShouldSurviveRabbitMQRestart() throws Exception {
            // Parameterised on purpose: raw Flux/List types would turn the extracting lambda parameter into an Object.
            Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(m4getMailQueue().deQueue());
            m4getMailQueue().enQueue(Mails.defaultMail().name("myMail1").build());
            m4getMailQueue().enQueue(Mails.defaultMail().name("myMail2").build());
            m4getMailQueue().enQueue(Mails.defaultMail().name("myMail3").build());

            RabbitMQMailQueueTest.rabbitMQExtension.getRabbitMQ().restart();

            List<MailQueue.MailQueueItem> dequeued = dequeueFlux.take(3L)
                .collectList()
                .block(Duration.ofSeconds(10L));
            Assertions.assertThat(dequeued)
                .extracting(item -> item.getMail().getName())
                .containsExactly("myMail1", "myMail2", "myMail3");
        }

        /**
         * With the clock set two hours in the past, enqueues three mails between
         * {@code suspendDequeuing} and {@code resumeDequeuing}, republishes mails older than one hour
         * and expects all three names both from the republish result and from the dequeue flux.
         */
        @Test
        void messagesShouldBeProcessedAfterNotPublishedMailsHaveBeenReprocessed() throws Exception {
            RabbitMQMailQueueTest.this.clock.setInstant(Instant.now().minus(Duration.ofHours(2L)));
            // Parameterised on purpose: raw Flux/List types would turn the extracting lambda parameter into an Object.
            Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(m4getMailQueue().deQueue());
            Sender sender = RabbitMQMailQueueTest.rabbitMQExtension.getSender();

            suspendDequeuing(sender);
            m4getMailQueue().enQueue(Mails.defaultMail().name("myMail1").build());
            m4getMailQueue().enQueue(Mails.defaultMail().name("myMail2").build());
            m4getMailQueue().enQueue(Mails.defaultMail().name("myMail3").build());
            resumeDequeuing(sender);

            Assertions.assertThat(m4getMailQueue()
                    .republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1L)))
                    .collectList()
                    .block())
                .containsExactlyInAnyOrder("myMail1", "myMail2", "myMail3");

            List<MailQueue.MailQueueItem> dequeued = dequeueFlux.take(Duration.ofSeconds(10L))
                .collectList()
                .block();
            Assertions.assertThat(dequeued)
                .extracting(item -> item.getMail().getName())
                .containsExactlyInAnyOrder("myMail1", "myMail2", "myMail3");
        }

        /**
         * Enqueues two mails with the clock two hours in the past and a third with the clock at the
         * current time, all between {@code suspendDequeuing} and {@code resumeDequeuing}. Republishing
         * mails older than one hour must report, and lead to dequeuing of, only the two old mails.
         */
        @Test
        void onlyOldMessagesShouldBeProcessedAfterNotPublishedMailsHaveBeenReprocessed() throws Exception {
            RabbitMQMailQueueTest.this.clock.setInstant(Instant.now().minus(Duration.ofHours(2L)));
            // Parameterised on purpose: raw Flux/List types would turn the extracting lambda parameter into an Object.
            Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(m4getMailQueue().deQueue());
            Sender sender = RabbitMQMailQueueTest.rabbitMQExtension.getSender();

            suspendDequeuing(sender);
            m4getMailQueue().enQueue(Mails.defaultMail().name("myMail1").build());
            m4getMailQueue().enQueue(Mails.defaultMail().name("myMail2").build());
            // The third mail is enqueued "now", so it falls outside the one-hour republish threshold.
            RabbitMQMailQueueTest.this.clock.setInstant(Instant.now());
            m4getMailQueue().enQueue(Mails.defaultMail().name("myMail3").build());
            resumeDequeuing(sender);

            Assertions.assertThat(m4getMailQueue()
                    .republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1L)))
                    .collectList()
                    .block())
                .containsExactlyInAnyOrder("myMail1", "myMail2");

            List<MailQueue.MailQueueItem> dequeued = dequeueFlux.take(Duration.ofSeconds(10L))
                .collectList()
                .block();
            Assertions.assertThat(dequeued)
                .extracting(item -> item.getMail().getName())
                .containsExactlyInAnyOrder("myMail1", "myMail2");
        }

        /**
         * With the clock two hours in the past, enqueues three mails after {@code suspendDequeuing},
         * republishes mails older than one hour once before and once after {@code resumeDequeuing},
         * and expects each republish result and the dequeue flux to contain the three names.
         */
        @Test
        void messagesShouldBeProcessedAfterTwoMailsReprocessing() throws Exception {
            RabbitMQMailQueueTest.this.clock.setInstant(Instant.now().minus(Duration.ofHours(2L)));
            // Parameterised on purpose: raw Flux/List types would turn the extracting lambda parameter into an Object.
            Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(m4getMailQueue().deQueue());
            Sender sender = RabbitMQMailQueueTest.rabbitMQExtension.getSender();

            suspendDequeuing(sender);
            m4getMailQueue().enQueue(Mails.defaultMail().name("myMail1").build());
            m4getMailQueue().enQueue(Mails.defaultMail().name("myMail2").build());
            m4getMailQueue().enQueue(Mails.defaultMail().name("myMail3").build());

            Assertions.assertThat(m4getMailQueue()
                    .republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1L)))
                    .collectList()
                    .block())
                .containsExactlyInAnyOrder("myMail1", "myMail2", "myMail3");

            resumeDequeuing(sender);

            Assertions.assertThat(m4getMailQueue()
                    .republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1L)))
                    .collectList()
                    .block())
                .containsExactlyInAnyOrder("myMail1", "myMail2", "myMail3");

            List<MailQueue.MailQueueItem> dequeued = dequeueFlux.take(Duration.ofSeconds(10L))
                .collectList()
                .block();
            Assertions.assertThat(dequeued)
                .extracting(item -> item.getMail().getName())
                .containsExactlyInAnyOrder("myMail1", "myMail2", "myMail3");
        }

        /**
         * Enqueues "myMail1" two hours in the past between {@code suspendDequeuing} and
         * {@code resumeDequeuing}, enqueues "myMail3" at the current time, then concurrently enqueues
         * "myMail2" while republishing mails older than one hour (which must report only "myMail1").
         * All three names are finally expected from the dequeue flux.
         */
        @Test
        void messagesShouldBeProcessedAfterNotPublishedMailsHaveBeenReprocessedAndNewMessagesShouldNotBeLost() throws Exception {
            RabbitMQMailQueueTest.this.clock.setInstant(Instant.now().minus(Duration.ofHours(2L)));
            String oldMailName = "myMail1";
            String concurrentMailName = "myMail2";
            // Parameterised on purpose: raw Flux/List types would turn the extracting lambda parameter into an Object.
            Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(m4getMailQueue().deQueue());
            Sender sender = RabbitMQMailQueueTest.rabbitMQExtension.getSender();

            suspendDequeuing(sender);
            m4getMailQueue().enQueue(Mails.defaultMail().name(oldMailName).build());
            resumeDequeuing(sender);

            RabbitMQMailQueueTest.this.clock.setInstant(Instant.now());
            m4getMailQueue().enQueue(Mails.defaultMail().name("myMail3").build());

            // Run the new enqueue and the republish side by side, each on the shared executor.
            Flux.merge(
                    Mono.fromCallable(() -> {
                        m4getMailQueue().enQueue(Mails.defaultMail().name(concurrentMailName).build());
                        return true;
                    }).subscribeOn(Schedulers.fromExecutor(EXECUTOR)),
                    Mono.fromRunnable(() -> Assertions.assertThat(m4getMailQueue()
                            .republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1L)))
                            .collectList()
                            .block())
                        .containsOnly(oldMailName))
                        .subscribeOn(Schedulers.fromExecutor(EXECUTOR)))
                .then()
                .block(Duration.ofSeconds(10L));

            List<MailQueue.MailQueueItem> dequeued = dequeueFlux.take(Duration.ofSeconds(10L))
                .collectList()
                .block();
            Assertions.assertThat(dequeued)
                .extracting(item -> item.getMail().getName())
                .containsExactlyInAnyOrder("myMail1", "myMail2", "myMail3");
        }

        /**
         * Enqueues {@code i} default mails named {@code function.apply(1)} .. {@code function.apply(i)}
         * through this class's {@code enQueue}, disposing each mail right after it has been enqueued.
         *
         * @param function maps a 1-based index to the mail name
         * @param i number of mails to enqueue
         */
        private void enqueueSomeMails(Function<Integer, String> function, int i) {
            IntStream.rangeClosed(1, i)
                .forEach(Throwing.intConsumer(index -> {
                    FakeMail mail = Mails.defaultMail()
                        .name(function.apply(index))
                        .build();
                    enQueue(mail);
                    LifecycleUtil.dispose(mail);
                }));
        }

        /**
         * Subscribes to the dequeue flux and acknowledges the first {@code i} items with SUCCESS; any
         * further item received is answered with RETRY. Waits up to ten minutes until at least
         * {@code i} items have been handled, then disposes the subscription whether or not the wait succeeded.
         *
         * @param i number of mails to dequeue successfully
         */
        private void dequeueMails(int i) {
            AtomicInteger handledCount = new AtomicInteger(0);
            Disposable subscription = Flux.from(getManageableMailQueue().deQueue())
                .concatMap(item -> Mono
                    .fromCallable(() -> {
                        boolean beyondQuota = handledCount.getAndIncrement() >= i;
                        if (beyondQuota) {
                            item.done(MailQueue.MailQueueItem.CompletionStatus.RETRY);
                            // A null result makes this inner Mono complete empty.
                            return null;
                        }
                        item.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
                        return item;
                    })
                    .subscribeOn(Schedulers.fromExecutor(EXECUTOR)))
                .subscribe();
            try {
                Awaitility.await()
                    .atMost(Duration.ofMinutes(10L))
                    .untilAsserted(() -> Assertions.assertThat(handledCount.get()).isGreaterThanOrEqualTo(i));
            } finally {
                subscription.dispose();
            }
        }

        /**
         * Enqueues three mails, registers a Cassandra scenario that fails once for queries starting
         * with the blob lookup statement, and checks that all three mails are still dequeued within
         * ten seconds.
         */
        @Test
        void dequeueShouldRetryLoadingErrors(CassandraCluster cassandraCluster) throws Exception {
            String name1 = "myMail1";
            String name2 = "myMail2";
            String name3 = "myMail3";
            m4getMailQueue().enQueue(Mails.defaultMail().name(name1).build());
            m4getMailQueue().enQueue(Mails.defaultMail().name(name2).build());
            m4getMailQueue().enQueue(Mails.defaultMail().name(name3).build());

            cassandraCluster.getConf().registerScenario(
                Scenario.Builder.fail().times(1).whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;"));

            // No raw List cast here: the element type must stay visible so the extractor
            // lambda is typed against the queue item rather than Object.
            Assertions.assertThat(Flux.from(m4getMailQueue().deQueue())
                    .take(3L)
                    .collectList()
                    .block(Duration.ofSeconds(10L)))
                .extracting(item -> item.getMail().getName())
                .containsOnly(name1, name2, name3);
        }

        /**
         * Enqueues three mails while a Cassandra scenario returns an empty result for the blob
         * lookup query, starts dequeuing, then restores normal query execution after one second
         * and checks, one second later, that no mail name was recorded by the dequeuer.
         */
        @Test
        void dequeueShouldNotRetryWhenBlobIsMissing(CassandraCluster cassandraCluster) throws Exception {
            m4getMailQueue().enQueue(Mails.defaultMail().name("myMail1").build());
            m4getMailQueue().enQueue(Mails.defaultMail().name("myMail2").build());
            m4getMailQueue().enQueue(Mails.defaultMail().name("myMail3").build());

            cassandraCluster.getConf().registerScenario(
                Scenario.Builder.returnEmpty().forever().whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id"));

            ConcurrentLinkedDeque<String> dequeuedMailNames = new ConcurrentLinkedDeque<>();
            Disposable subscription = Flux.from(m4getMailQueue().deQueue())
                .take(3L)
                .doOnNext(item -> dequeuedMailNames.add(item.getMail().getName()))
                .flatMap(item -> Mono.fromRunnable(Throwing.runnable(() -> item.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS)))
                    .subscribeOn(Schedulers.fromExecutor(EXECUTOR)))
                .subscribeOn(Schedulers.fromExecutor(EXECUTOR))
                .subscribe();
            try {
                // Leave the dequeuer some time to hit the empty-result scenario.
                Thread.sleep(1000L);
                cassandraCluster.getConf().registerScenario(
                    Scenario.Builder.executeNormally().forever().whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id"));
                // Leave time for a retry to show up now that the query executes normally again.
                Thread.sleep(1000L);
                Assertions.assertThat(dequeuedMailNames).isEmpty();
            } finally {
                // Do not leave the dequeue subscription running once the test is over.
                subscription.dispose();
            }
        }

        /**
         * Publishes one raw {@code BAD_PAYLOAD!} message directly to the spool exchange, enqueues
         * three regular mails, and checks that exactly those three mails are dequeued, in order,
         * within ten seconds.
         */
        @Test
        void dequeueShouldNotAbortProcessingUponSerializationIssuesErrors() throws Exception {
            String name1 = "myMail1";
            String name2 = "myMail2";
            String name3 = "myMail3";

            RabbitMQMailQueueTest.rabbitMQExtension.getSender()
                .send(Mono.just(new OutboundMessage("JamesMailQueue-exchange-spool", "", "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
                .block();

            m4getMailQueue().enQueue(Mails.defaultMail().name(name1).build());
            m4getMailQueue().enQueue(Mails.defaultMail().name(name2).build());
            m4getMailQueue().enQueue(Mails.defaultMail().name(name3).build());

            ConcurrentLinkedDeque<String> dequeuedMailNames = new ConcurrentLinkedDeque<>();
            Disposable subscription = Flux.from(m4getMailQueue().deQueue())
                .doOnNext(item -> dequeuedMailNames.add(item.getMail().getName()))
                .flatMap(item -> Mono.fromRunnable(Throwing.runnable(() -> item.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS)))
                    .subscribeOn(Schedulers.fromExecutor(EXECUTOR)))
                .subscribe();
            try {
                Awaitility.await()
                    .atMost(Durations.TEN_SECONDS)
                    .untilAsserted(() -> Assertions.assertThat(dequeuedMailNames).containsExactly(name1, name2, name3));
            } finally {
                // Do not leave the dequeue subscription running once the test is over.
                subscription.dispose();
            }
        }

        /**
         * Publishes one hundred raw {@code BAD_PAYLOAD n} messages directly to the spool exchange,
         * enqueues three regular mails, and checks that exactly those three mails are dequeued, in
         * order, within ten seconds.
         */
        @Test
        void manyInvalidMessagesShouldNotAbortProcessing() throws Exception {
            String name1 = "myMail1";
            String name2 = "myMail2";
            String name3 = "myMail3";
            String emptyRoutingKey = "";

            IntStream.range(0, 100)
                .forEach(index -> RabbitMQMailQueueTest.rabbitMQExtension.getSender()
                    .send(Mono.just(new OutboundMessage("JamesMailQueue-exchange-spool", emptyRoutingKey, ("BAD_PAYLOAD " + index).getBytes(StandardCharsets.UTF_8))))
                    .block());

            m4getMailQueue().enQueue(Mails.defaultMail().name(name1).build());
            m4getMailQueue().enQueue(Mails.defaultMail().name(name2).build());
            m4getMailQueue().enQueue(Mails.defaultMail().name(name3).build());

            ConcurrentLinkedDeque<String> dequeuedMailNames = new ConcurrentLinkedDeque<>();
            Disposable subscription = Flux.from(m4getMailQueue().deQueue())
                .doOnNext(item -> dequeuedMailNames.add(item.getMail().getName()))
                .flatMap(item -> Mono.fromRunnable(Throwing.runnable(() -> item.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS)))
                    .subscribeOn(Schedulers.fromExecutor(EXECUTOR)))
                .subscribe();
            try {
                Awaitility.await()
                    .atMost(Durations.TEN_SECONDS)
                    .untilAsserted(() -> Assertions.assertThat(dequeuedMailNames).containsExactly(name1, name2, name3));
            } finally {
                // Do not leave the dequeue subscription running once the test is over.
                subscription.dispose();
            }
        }

        /**
         * Publishes one raw {@code BAD_PAYLOAD!} message directly to the spool exchange, starts a
         * consumer on the spool dead-letter queue and a dequeuer on the mail queue, and checks that
         * exactly one delivery reaches the dead-letter consumer within ten seconds.
         */
        @Test
        void invalidMessagesShouldBeDeadLettered() {
            RabbitMQMailQueueTest.rabbitMQExtension.getSender()
                .send(Mono.just(new OutboundMessage("JamesMailQueue-exchange-spool", "", "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
                .block();

            AtomicInteger deadLetteredCount = new AtomicInteger();
            Disposable deadLetterSubscription = RabbitMQMailQueueTest.rabbitMQExtension.getRabbitChannelPool()
                .createReceiver()
                .consumeAutoAck("JamesMailQueue-dead-letter-queue-spool")
                .doOnNext(delivery -> deadLetteredCount.incrementAndGet())
                .subscribeOn(Schedulers.fromExecutor(EXECUTOR))
                .subscribe();
            try {
                Disposable dequeueSubscription = Flux.from(m4getMailQueue().deQueue())
                    .doOnNext(Throwing.consumer(item -> item.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS)))
                    .subscribeOn(Schedulers.fromExecutor(EXECUTOR))
                    .subscribe();
                try {
                    Awaitility.await()
                        .atMost(Durations.TEN_SECONDS)
                        .untilAsserted(() -> Assertions.assertThat(deadLetteredCount.get()).isEqualTo(1));
                } finally {
                    // Stop dequeuing once the check is over.
                    dequeueSubscription.dispose();
                }
            } finally {
                // Stop the dead-letter consumer as well, even if the dequeuer could not be started.
                deadLetterSubscription.dispose();
            }
        }

        /**
         * Enqueues one mail, completes the first dequeued item with {@code REJECT}, then consumes
         * the spool dead-letter queue and checks that exactly one delivery arrives there within ten
         * seconds.
         */
        @Test
        void rejectedMessagesShouldBeDeadLettered() throws Exception {
            m4getMailQueue().enQueue(Mails.defaultMail().name("myMail1").build());

            // Block until the first item has been rejected before looking at the dead-letter queue.
            m4getMailQueue().deQueue()
                .flatMap(item -> Mono.fromRunnable(Throwing.runnable(() -> item.done(MailQueue.MailQueueItem.CompletionStatus.REJECT)))
                    .subscribeOn(Schedulers.boundedElastic())
                    .thenReturn(item))
                .blockFirst();

            AtomicInteger deadLetteredCount = new AtomicInteger();
            Disposable deadLetterSubscription = RabbitMQMailQueueTest.rabbitMQExtension.getRabbitChannelPool()
                .createReceiver()
                .consumeAutoAck("JamesMailQueue-dead-letter-queue-spool")
                .doOnNext(delivery -> deadLetteredCount.incrementAndGet())
                .subscribeOn(Schedulers.fromExecutor(EXECUTOR))
                .subscribe();
            try {
                Awaitility.await()
                    .atMost(Durations.TEN_SECONDS)
                    .untilAsserted(() -> Assertions.assertThat(deadLetteredCount.get()).isEqualTo(1));
            } finally {
                // Do not leave the dead-letter consumer running once the test is over.
                deadLetterSubscription.dispose();
            }
        }

        /**
         * Binds the queue described by {@link #getMailQueueBindingSpecification()} through the given
         * sender and blocks until the bind operation has completed.
         *
         * @param sender sender used to issue the bind request
         */
        private void resumeDequeuing(Sender sender) {
            BindingSpecification binding = getMailQueueBindingSpecification();
            sender.bindQueue(binding).block();
        }

        /**
         * Unbinds the queue described by {@link #getMailQueueBindingSpecification()} through the
         * given sender and blocks until the unbind operation has completed.
         *
         * @param sender sender used to issue the unbind request
         */
        private void suspendDequeuing(Sender sender) {
            BindingSpecification binding = getMailQueueBindingSpecification();
            sender.unbindQueue(binding).block();
        }

        /**
         * Builds the binding between the mail queue's exchange and its work queue, using an empty
         * routing key.
         *
         * @return the binding specification for the mail queue under test
         */
        private BindingSpecification getMailQueueBindingSpecification() {
            MailQueueName queueName = MailQueueName.fromString(m4getMailQueue().getName().asString());
            String exchange = queueName.toRabbitExchangeName().asString();
            String workQueue = queueName.toWorkQueueName().asString();
            return BindingSpecification.binding()
                .exchange(exchange)
                .queue(workQueue)
                .routingKey("");
        }
    }

    // Package-private no-argument constructor; it performs no initialisation of its own.
    RabbitMQMailQueueTest() {
    }

    /** Runs after each test and asks the management API to delete all queues. */
    @AfterEach
    void tearDown() {
        mqManagementApi.deleteAllQueues();
    }

    /**
     * Builds the mail queue under test and stores it, together with its clock, management API
     * and factory, in the fields of this test instance.
     *
     * @param cassandraCluster2              cluster whose configuration backs the mail queue view
     * @param mailQueueMetricTestSystem      supplies the metric factory and the spied gauge registry
     * @param rabbitMQMailQueueConfiguration configuration handed to the private queue factory
     * @param blobStore                      blob store used to create the mime message store factory
     * @param prefetchCount                  prefetch count used when creating the {@code SPOOL} queue
     */
    private void setUp(CassandraCluster cassandraCluster2,
                       MailQueueMetricExtension.MailQueueMetricTestSystem mailQueueMetricTestSystem,
                       RabbitMQMailQueueConfiguration rabbitMQMailQueueConfiguration,
                       BlobStore blobStore,
                       MailQueueFactory.PrefetchCount prefetchCount) throws Exception {
        MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore);
        this.clock = new UpdatableTickingClock(IN_SLICE_1);

        CassandraMailQueueViewConfiguration viewConfiguration = CassandraMailQueueViewConfiguration.builder()
            .bucketCount(THREE_BUCKET_COUNT)
            .updateBrowseStartPace(UPDATE_BROWSE_START_PACE)
            .sliceWindow(ONE_HOUR_SLICE_WINDOW)
            .build();

        RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory(
            mailQueueMetricTestSystem.getMetricFactory(),
            mailQueueMetricTestSystem.getSpyGaugeRegistry(),
            rabbitMQExtension.getSender(),
            rabbitMQExtension.getReceiverProvider(),
            mimeMessageStoreFactory,
            BLOB_ID_FACTORY,
            CassandraMailQueueViewTestFactory.factory(clock, cassandraCluster2.getConf(), viewConfiguration, mimeMessageStoreFactory),
            clock,
            new RawMailQueueItemDecoratorFactory(),
            rabbitMQMailQueueConfiguration);

        this.mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
        this.mailQueueFactory = new RabbitMQMailQueueFactory(
            rabbitMQExtension.getSender(),
            mqManagementApi,
            privateFactory,
            rabbitMQExtension.getRabbitMQ().getConfiguration());
        this.mailQueue = mailQueueFactory.createQueue(SPOOL, prefetchCount);
    }
}
