/*
 * Decompiled with CFR 0.152.
 */
package org.apache.james.queue.rabbitmq;

import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.github.fge.lambdas.Throwing;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.Scenario;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.cassandra.CassandraBlobModule;
import org.apache.james.blob.cassandra.CassandraBlobStoreFactory;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.core.builder.MimeMessageBuilder;
import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule;
import org.apache.james.metrics.api.Gauge;
import org.apache.james.metrics.api.GaugeRegistry;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
import org.apache.james.queue.api.MailQueueMetricContract;
import org.apache.james.queue.api.MailQueueMetricExtension;
import org.apache.james.queue.api.Mails;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.api.ManageableMailQueueContract;
import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
import org.apache.james.queue.rabbitmq.MailQueueName;
import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
import org.apache.james.queue.rabbitmq.RabbitMQMailQueueFactory;
import org.apache.james.queue.rabbitmq.RabbitMQMailQueueManagement;
import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueView;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewTestFactory;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.util.streams.Iterators;
import org.apache.james.utils.UpdatableTickingClock;
import org.apache.mailet.Mail;
import org.apache.mailet.base.MailAddressFixture;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.Sender;

class RabbitMQMailQueueTest {
    private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
    private static final int THREE_BUCKET_COUNT = 3;
    private static final int UPDATE_BROWSE_START_PACE = 2;
    private static final Duration ONE_HOUR_SLICE_WINDOW = Duration.ofHours(1L);
    private static final org.apache.james.queue.api.MailQueueName SPOOL = org.apache.james.queue.api.MailQueueName.of((String)"spool");
    private static final Instant IN_SLICE_1 = Instant.parse("2007-12-03T10:15:30.00Z");
    private static final Instant IN_SLICE_2 = IN_SLICE_1.plus(1L, ChronoUnit.HOURS);
    private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2L, ChronoUnit.HOURS);
    private static final Instant IN_SLICE_5 = IN_SLICE_1.plus(4L, ChronoUnit.HOURS);
    private static final Instant IN_SLICE_7 = IN_SLICE_1.plus(6L, ChronoUnit.HOURS);
    @RegisterExtension
    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules((CassandraModule[])new CassandraModule[]{CassandraBlobModule.MODULE, CassandraMailQueueViewModule.MODULE, CassandraEventStoreModule.MODULE(), CassandraSchemaVersionModule.MODULE}));
    @RegisterExtension
    static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ().isolationPolicy(RabbitMQExtension.IsolationPolicy.WEAK);
    private RabbitMQMailQueueFactory mailQueueFactory;
    private UpdatableTickingClock clock;
    private RabbitMQMailQueue mailQueue;
    private RabbitMQMailQueueManagement mqManagementApi;

    RabbitMQMailQueueTest() {
    }

    @AfterEach
    void tearDown() {
        this.mqManagementApi.deleteAllQueues();
    }

    private void setUp(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem, RabbitMQMailQueueConfiguration configuration, BlobStore blobStore) throws Exception {
        MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory((BlobStore)blobStore);
        this.clock = new UpdatableTickingClock(IN_SLICE_1);
        CassandraMailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory((Clock)this.clock, (Session)cassandra.getConf(), CassandraMailQueueViewConfiguration.builder().bucketCount(3).updateBrowseStartPace(2).sliceWindow(ONE_HOUR_SLICE_WINDOW).build(), mimeMessageStoreFactory);
        RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory((MetricFactory)metricTestSystem.getMetricFactory(), metricTestSystem.getSpyGaugeRegistry(), rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), mimeMessageStoreFactory, (BlobId.Factory)BLOB_ID_FACTORY, (MailQueueView.Factory)mailQueueViewFactory, (Clock)this.clock, (MailQueueItemDecoratorFactory)new RawMailQueueItemDecoratorFactory(), configuration);
        this.mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
        this.mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getSender(), this.mqManagementApi, factory);
        this.mailQueue = (RabbitMQMailQueue)this.mailQueueFactory.createQueue(SPOOL);
    }

    @Nested
    class DeDuplicationTest {
        @RegisterExtension
        MailQueueMetricExtension mailQueueMetricExtension = new MailQueueMetricExtension();

        DeDuplicationTest() {
        }

        @BeforeEach
        void setup(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception {
            RabbitMQMailQueueTest.this.setUp(cassandra, metricTestSystem, RabbitMQMailQueueConfiguration.builder().sizeMetricsEnabled(true).build(), CassandraBlobStoreFactory.forTesting((Session)cassandra.getConf()).deduplication());
        }

        @Test
        void dequeueShouldStillRetrieveAllBlobsWhenIdenticalContentAndDeduplication() throws Exception {
            Flux dequeueFlux = Flux.from((Publisher)RabbitMQMailQueueTest.this.mailQueue.deQueue());
            String identicalContent = "identical content";
            String identicalSubject = "identical subject";
            RabbitMQMailQueueTest.this.mailQueue.enQueue((Mail)Mails.defaultMail().name("myMail1").mimeMessage(MimeMessageBuilder.mimeMessageBuilder().setSubject(identicalSubject).setText(identicalContent)).build());
            RabbitMQMailQueueTest.this.mailQueue.enQueue((Mail)Mails.defaultMail().name("myMail2").mimeMessage(MimeMessageBuilder.mimeMessageBuilder().setSubject(identicalSubject).setText(identicalContent)).build());
            List items = (List)dequeueFlux.take(2L).concatMap(mailQueueItem -> Mono.fromCallable(() -> {
                mailQueueItem.done(true);
                return mailQueueItem;
            })).collectList().block(Duration.ofSeconds(10L));
            Assertions.assertThat((List)items).allSatisfy((Consumer)Throwing.consumer(item -> Assertions.assertThat((Object)item.getMail().getMessage().getContent()).isEqualTo((Object)identicalContent)));
        }
    }

    @Nested
    class MailQueueSizeMetricsDisabled {
        @RegisterExtension
        MailQueueMetricExtension mailQueueMetricExtension = new MailQueueMetricExtension();

        MailQueueSizeMetricsDisabled() {
        }

        @BeforeEach
        void setup(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception {
            RabbitMQMailQueueTest.this.setUp(cassandra, metricTestSystem, RabbitMQMailQueueConfiguration.builder().sizeMetricsEnabled(false).build(), CassandraBlobStoreFactory.forTesting((Session)cassandra.getConf()).passthrough());
        }

        @Test
        void constructorShouldNotRegisterGetQueueSizeGaugeWhenSizeMetricsDisabled(MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) {
            ArgumentCaptor gaugeCaptor = ArgumentCaptor.forClass(Gauge.class);
            ((GaugeRegistry)Mockito.verify((Object)metricTestSystem.getSpyGaugeRegistry(), (VerificationMode)Mockito.never())).register((String)ArgumentMatchers.any(), (Gauge)gaugeCaptor.capture());
        }
    }

    @Nested
    class MailQueueSizeMetricsEnabled
    implements ManageableMailQueueContract,
    MailQueueMetricContract {
        MailQueueSizeMetricsEnabled() {
        }

        @BeforeEach
        void setup(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception {
            RabbitMQMailQueueTest.this.setUp(cassandra, metricTestSystem, RabbitMQMailQueueConfiguration.builder().sizeMetricsEnabled(true).build(), CassandraBlobStoreFactory.forTesting((Session)cassandra.getConf()).passthrough());
        }

        public void enQueue(Mail mail) throws MailQueue.MailQueueException {
            super.enQueue(mail);
            RabbitMQMailQueueTest.this.clock.tick();
        }

        public RabbitMQMailQueue getMailQueue() {
            return RabbitMQMailQueueTest.this.mailQueue;
        }

        public ManageableMailQueue getManageableMailQueue() {
            return RabbitMQMailQueueTest.this.mailQueue;
        }

        @Test
        void browseShouldReturnCurrentlyEnqueuedMailFromAllSlices() throws Exception {
            ManageableMailQueue mailQueue = this.getManageableMailQueue();
            int emailCount = 5;
            RabbitMQMailQueueTest.this.clock.setInstant(IN_SLICE_1);
            this.enqueueSomeMails(this.namePatternForSlice(1), emailCount);
            RabbitMQMailQueueTest.this.clock.setInstant(IN_SLICE_2);
            this.enqueueSomeMails(this.namePatternForSlice(2), emailCount);
            RabbitMQMailQueueTest.this.clock.setInstant(IN_SLICE_3);
            this.enqueueSomeMails(this.namePatternForSlice(3), emailCount);
            RabbitMQMailQueueTest.this.clock.setInstant(IN_SLICE_5);
            this.enqueueSomeMails(this.namePatternForSlice(5), emailCount);
            RabbitMQMailQueueTest.this.clock.setInstant(IN_SLICE_7);
            Stream<String> names = Iterators.toStream((Iterator)mailQueue.browse()).map(ManageableMailQueue.MailQueueItemView::getMail).map(Mail::getName);
            Assertions.assertThat(names).containsExactly((Object[])new String[]{"1-1", "1-2", "1-3", "1-4", "1-5", "2-1", "2-2", "2-3", "2-4", "2-5", "3-1", "3-2", "3-3", "3-4", "3-5", "5-1", "5-2", "5-3", "5-4", "5-5"});
        }

        @Test
        void dequeueShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
            String name1 = "myMail1";
            Flux dequeueFlux = Flux.from((Publisher)this.getMailQueue().deQueue());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name1).build());
            dequeueFlux.take(1L).flatMap(mailQueueItem -> Mono.fromCallable(() -> {
                mailQueueItem.done(true);
                return mailQueueItem;
            })).blockLast(Duration.ofSeconds(10L));
            Assertions.assertThat((Iterable)cassandra.getConf().execute((Statement)QueryBuilder.select().from("blobs"))).isEmpty();
        }

        @Test
        void clearShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
            String name1 = "myMail1";
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name1).build());
            this.getManageableMailQueue().clear();
            Assertions.assertThat((Iterable)cassandra.getConf().execute((Statement)QueryBuilder.select().from("blobs"))).isEmpty();
        }

        @Test
        void removeByNameShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
            String name1 = "myMail1";
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name1).build());
            this.getManageableMailQueue().remove(ManageableMailQueue.Type.Name, name1);
            Assertions.assertThat((Iterable)cassandra.getConf().execute((Statement)QueryBuilder.select().from("blobs"))).isEmpty();
        }

        @Test
        void removeByRecipientShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
            String name1 = "myMail1";
            this.getMailQueue().enQueue((Mail)Mails.defaultMailNoRecipient().name(name1).recipient(MailAddressFixture.RECIPIENT1).build());
            this.getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, MailAddressFixture.RECIPIENT1.asString());
            Assertions.assertThat((Iterable)cassandra.getConf().execute((Statement)QueryBuilder.select().from("blobs"))).isEmpty();
        }

        @Test
        void removeBySenderShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
            String name1 = "myMail1";
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name1).sender(MailAddressFixture.SENDER).build());
            this.getManageableMailQueue().remove(ManageableMailQueue.Type.Sender, MailAddressFixture.SENDER.asString());
            Assertions.assertThat((Iterable)cassandra.getConf().execute((Statement)QueryBuilder.select().from("blobs"))).isEmpty();
        }

        @Test
        void browseAndDequeueShouldCombineWellWhenDifferentSlices() throws Exception {
            ManageableMailQueue mailQueue = this.getManageableMailQueue();
            int emailCount = 5;
            RabbitMQMailQueueTest.this.clock.setInstant(IN_SLICE_1);
            this.enqueueSomeMails(this.namePatternForSlice(1), emailCount);
            RabbitMQMailQueueTest.this.clock.setInstant(IN_SLICE_2);
            this.enqueueSomeMails(this.namePatternForSlice(2), emailCount);
            RabbitMQMailQueueTest.this.clock.setInstant(IN_SLICE_3);
            this.enqueueSomeMails(this.namePatternForSlice(3), emailCount);
            RabbitMQMailQueueTest.this.clock.setInstant(IN_SLICE_5);
            this.enqueueSomeMails(this.namePatternForSlice(5), emailCount);
            RabbitMQMailQueueTest.this.clock.setInstant(IN_SLICE_7);
            this.dequeueMails(5);
            this.dequeueMails(5);
            this.dequeueMails(3);
            Stream<String> names = Iterators.toStream((Iterator)mailQueue.browse()).map(ManageableMailQueue.MailQueueItemView::getMail).map(Mail::getName);
            Assertions.assertThat(names).containsExactly((Object[])new String[]{"3-4", "3-5", "5-1", "5-2", "5-3", "5-4", "5-5"});
        }

        private Function<Integer, String> namePatternForSlice(int sliceId) {
            return i -> sliceId + "-" + i;
        }

        @Test
        void mailQueueShouldBeInitializedWhenCreating(CassandraCluster cassandra) {
            org.apache.james.queue.api.MailQueueName name = org.apache.james.queue.api.MailQueueName.of((String)"myQueue");
            RabbitMQMailQueueTest.this.mailQueueFactory.createQueue(name);
            boolean initialized = CassandraMailQueueViewTestFactory.isInitialized((Session)cassandra.getConf(), MailQueueName.fromString((String)name.asString()));
            Assertions.assertThat((boolean)initialized).isTrue();
        }

        @Test
        void enQueueShouldNotThrowOnMailNameWithNegativeHash() {
            String negativehashedString = "this sting will have a negative hash";
            Assertions.assertThatCode(() -> this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(negativehashedString).build())).doesNotThrowAnyException();
        }

        @Disabled(value="JAMES-2614 RabbitMQMailQueueTest::concurrentEnqueueDequeueShouldNotFail is unstable.The related test is disabled, and need to be re-enabled after investigation and a fix.")
        @Test
        public void concurrentEnqueueDequeueShouldNotFail() {
        }

        @Test
        void dequeueShouldWorkAfterNetworkOutage() throws Exception {
            String name1 = "myMail1";
            String name2 = "myMail2";
            String name3 = "myMail3";
            Flux dequeueFlux = Flux.from((Publisher)this.getMailQueue().deQueue());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name1).build());
            rabbitMQExtension.getRabbitMQ().pause();
            Thread.sleep(2000L);
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name2).build());
            rabbitMQExtension.getRabbitMQ().unpause();
            Thread.sleep(100L);
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name3).build());
            List items = (List)dequeueFlux.take(3L).collectList().block(Duration.ofSeconds(10L));
            Assertions.assertThat((List)items).extracting(item -> item.getMail().getName()).containsExactly((Object[])new String[]{name1, name2, name3});
        }

        @Test
        void enqueuedEmailsShouldNotBeLostDuringRabbitMQOutages() throws Exception {
            String name = "myMail";
            rabbitMQExtension.getRabbitMQ().pause();
            Thread.sleep(2000L);
            try {
                this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name).build());
            }
            catch (Exception exception) {
                // empty catch block
            }
            rabbitMQExtension.getRabbitMQ().unpause();
            Thread.sleep(100L);
            this.getMailQueue().republishNotProcessedMails(RabbitMQMailQueueTest.this.clock.instant().plus(30L, ChronoUnit.MINUTES)).blockLast();
            Flux dequeueFlux = Flux.from((Publisher)this.getMailQueue().deQueue());
            List items = (List)dequeueFlux.take(1L).collectList().block(Duration.ofSeconds(10L));
            Assertions.assertThat((List)items).extracting(item -> item.getMail().getName()).containsOnly((Object[])new String[]{name});
        }

        @Test
        void messagesShouldSurviveRabbitMQRestart() throws Exception {
            String name1 = "myMail1";
            String name2 = "myMail2";
            String name3 = "myMail3";
            Flux dequeueFlux = Flux.from((Publisher)this.getMailQueue().deQueue());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name1).build());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name2).build());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name3).build());
            rabbitMQExtension.getRabbitMQ().restart();
            List items = (List)dequeueFlux.take(3L).collectList().block(Duration.ofSeconds(10L));
            Assertions.assertThat((List)items).extracting(item -> item.getMail().getName()).containsExactly((Object[])new String[]{name1, name2, name3});
        }

        @Test
        void messagesShouldBeProcessedAfterNotPublishedMailsHaveBeenReprocessed() throws Exception {
            RabbitMQMailQueueTest.this.clock.setInstant(Instant.now().minus(Duration.ofHours(2L)));
            String name1 = "myMail1";
            String name2 = "myMail2";
            String name3 = "myMail3";
            Flux dequeueFlux = Flux.from((Publisher)this.getMailQueue().deQueue());
            Sender sender = rabbitMQExtension.getSender();
            this.suspendDequeuing(sender);
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name1).build());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name2).build());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name3).build());
            this.resumeDequeuing(sender);
            Assertions.assertThat((List)((List)this.getMailQueue().republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1L))).collectList().block())).containsExactlyInAnyOrder((Object[])new String[]{name1, name2, name3});
            List items = (List)dequeueFlux.take(Duration.ofSeconds(10L)).collectList().block();
            Assertions.assertThat((List)items).extracting(item -> item.getMail().getName()).containsExactlyInAnyOrder((Object[])new String[]{name1, name2, name3});
        }

        @Test
        void onlyOldMessagesShouldBeProcessedAfterNotPublishedMailsHaveBeenReprocessed() throws Exception {
            RabbitMQMailQueueTest.this.clock.setInstant(Instant.now().minus(Duration.ofHours(2L)));
            String name1 = "myMail1";
            String name2 = "myMail2";
            String name3 = "myMail3";
            Flux dequeueFlux = Flux.from((Publisher)this.getMailQueue().deQueue());
            Sender sender = rabbitMQExtension.getSender();
            this.suspendDequeuing(sender);
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name1).build());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name2).build());
            RabbitMQMailQueueTest.this.clock.setInstant(Instant.now());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name3).build());
            this.resumeDequeuing(sender);
            Assertions.assertThat((List)((List)this.getMailQueue().republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1L))).collectList().block())).containsExactlyInAnyOrder((Object[])new String[]{name1, name2});
            List items = (List)dequeueFlux.take(Duration.ofSeconds(10L)).collectList().block();
            Assertions.assertThat((List)items).extracting(item -> item.getMail().getName()).containsExactlyInAnyOrder((Object[])new String[]{name1, name2});
        }

        @Test
        void messagesShouldBeProcessedAfterTwoMailsReprocessing() throws Exception {
            RabbitMQMailQueueTest.this.clock.setInstant(Instant.now().minus(Duration.ofHours(2L)));
            String name1 = "myMail1";
            String name2 = "myMail2";
            String name3 = "myMail3";
            Flux dequeueFlux = Flux.from((Publisher)this.getMailQueue().deQueue());
            Sender sender = rabbitMQExtension.getSender();
            this.suspendDequeuing(sender);
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name1).build());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name2).build());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name3).build());
            Assertions.assertThat((List)((List)this.getMailQueue().republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1L))).collectList().block())).containsExactlyInAnyOrder((Object[])new String[]{name1, name2, name3});
            this.resumeDequeuing(sender);
            Assertions.assertThat((List)((List)this.getMailQueue().republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1L))).collectList().block())).containsExactlyInAnyOrder((Object[])new String[]{name1, name2, name3});
            List items = (List)dequeueFlux.take(Duration.ofSeconds(10L)).collectList().block();
            Assertions.assertThat((List)items).extracting(item -> item.getMail().getName()).containsExactlyInAnyOrder((Object[])new String[]{name1, name2, name3});
        }

        @Test
        void messagesShouldBeProcessedAfterNotPublishedMailsHaveBeenReprocessedAndNewMessagesShouldNotBeLost() throws Exception {
            RabbitMQMailQueueTest.this.clock.setInstant(Instant.now().minus(Duration.ofHours(2L)));
            String name1 = "myMail1";
            String name2 = "myMail2";
            String name3 = "myMail3";
            Flux dequeueFlux = Flux.from((Publisher)this.getMailQueue().deQueue());
            Sender sender = rabbitMQExtension.getSender();
            this.suspendDequeuing(sender);
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name1).build());
            this.resumeDequeuing(sender);
            RabbitMQMailQueueTest.this.clock.setInstant(Instant.now());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name3).build());
            Flux.merge((Publisher[])new Publisher[]{Mono.fromCallable(() -> {
                this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name2).build());
                return true;
            }), Mono.fromRunnable(() -> Assertions.assertThat((List)((List)this.getMailQueue().republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1L))).collectList().block())).containsOnly((Object[])new String[]{name1}))}).then().block(Duration.ofSeconds(10L));
            List items = (List)dequeueFlux.take(Duration.ofSeconds(10L)).collectList().block();
            Assertions.assertThat((List)items).extracting(item -> item.getMail().getName()).containsExactlyInAnyOrder((Object[])new String[]{name1, name2, name3});
        }

        private void enqueueSomeMails(Function<Integer, String> namePattern, int emailCount) {
            IntStream.rangeClosed(1, emailCount).forEach((IntConsumer)Throwing.intConsumer(i -> this.enQueue((Mail)Mails.defaultMail().name((String)namePattern.apply(i)).build())));
        }

        private void dequeueMails(int times) {
            Flux.from((Publisher)this.getManageableMailQueue().deQueue()).take((long)times).flatMap(mailQueueItem -> Mono.fromCallable(() -> {
                mailQueueItem.done(true);
                return mailQueueItem;
            })).blockLast();
        }

        @Test
        void dequeueShouldRetryLoadingErrors(CassandraCluster cassandra) throws Exception {
            String name1 = "myMail1";
            String name2 = "myMail2";
            String name3 = "myMail3";
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name1).build());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name2).build());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name3).build());
            cassandra.getConf().registerScenario(new Scenario.ExecutionHook[]{Scenario.Builder.fail().times(1).whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;")});
            List items = (List)Flux.from((Publisher)this.getMailQueue().deQueue()).take(3L).collectList().block(Duration.ofSeconds(10L));
            Assertions.assertThat((List)items).extracting(item -> item.getMail().getName()).containsOnly((Object[])new String[]{name1, name2, name3});
        }

        @Test
        void dequeueShouldNotRetryWhenBlobIsMissing(CassandraCluster cassandra) throws Exception {
            String name1 = "myMail1";
            String name2 = "myMail2";
            String name3 = "myMail3";
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name1).build());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name2).build());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name3).build());
            cassandra.getConf().registerScenario(new Scenario.ExecutionHook[]{Scenario.Builder.returnEmpty().forever().whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;")});
            ConcurrentLinkedDeque dequeuedNames = new ConcurrentLinkedDeque();
            Flux.from((Publisher)this.getMailQueue().deQueue()).take(3L).doOnNext(item -> dequeuedNames.add(item.getMail().getName())).doOnNext((Consumer)Throwing.consumer(item -> item.done(true))).subscribeOn(Schedulers.elastic()).subscribe();
            Thread.sleep(1000L);
            cassandra.getConf().registerScenario(new Scenario.ExecutionHook[]{Scenario.Builder.executeNormally().forever().whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;")});
            Thread.sleep(1000L);
            Assertions.assertThat(dequeuedNames).isEmpty();
        }

        @Test
        void dequeueShouldNotAbortProcessingUponSerializationIssuesErrors() throws Exception {
            String name1 = "myMail1";
            String name2 = "myMail2";
            String name3 = "myMail3";
            String emptyRoutingKey = "";
            rabbitMQExtension.getSender().send((Publisher)Mono.just((Object)new OutboundMessage("JamesMailQueue-exchange-spool", emptyRoutingKey, "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8)))).block();
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name1).build());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name2).build());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name3).build());
            ConcurrentLinkedDeque dequeuedMailNames = new ConcurrentLinkedDeque();
            Flux.from((Publisher)this.getMailQueue().deQueue()).doOnNext(item -> dequeuedMailNames.add(item.getMail().getName())).doOnNext((Consumer)Throwing.consumer(item -> item.done(true))).subscribe();
            Awaitility.await().atMost(Durations.TEN_SECONDS).untilAsserted(() -> Assertions.assertThat((Iterable)dequeuedMailNames).containsExactly((Object[])new String[]{name1, name2, name3}));
        }

        @Test
        void manyInvalidMessagesShouldNotAbortProcessing() throws Exception {
            String name1 = "myMail1";
            String name2 = "myMail2";
            String name3 = "myMail3";
            String emptyRoutingKey = "";
            IntStream.range(0, 100).forEach(i -> rabbitMQExtension.getSender().send((Publisher)Mono.just((Object)new OutboundMessage("JamesMailQueue-exchange-spool", emptyRoutingKey, ("BAD_PAYLOAD " + i).getBytes(StandardCharsets.UTF_8)))).block());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name1).build());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name2).build());
            this.getMailQueue().enQueue((Mail)Mails.defaultMail().name(name3).build());
            ConcurrentLinkedDeque dequeuedMailNames = new ConcurrentLinkedDeque();
            Flux.from((Publisher)this.getMailQueue().deQueue()).doOnNext(item -> dequeuedMailNames.add(item.getMail().getName())).doOnNext((Consumer)Throwing.consumer(item -> item.done(true))).subscribe();
            Awaitility.await().atMost(Durations.TEN_SECONDS).untilAsserted(() -> Assertions.assertThat((Iterable)dequeuedMailNames).containsExactly((Object[])new String[]{name1, name2, name3}));
        }

        @Test
        void rejectedMessagesShouldBeDeadLettered() {
            String emptyRoutingKey = "";
            rabbitMQExtension.getSender().send((Publisher)Mono.just((Object)new OutboundMessage("JamesMailQueue-exchange-spool", emptyRoutingKey, "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8)))).block();
            AtomicInteger deadLetteredCount = new AtomicInteger();
            rabbitMQExtension.getRabbitChannelPool().createReceiver().consumeAutoAck("JamesMailQueue-dead-letter-queue-spool").doOnNext(next -> deadLetteredCount.incrementAndGet()).subscribeOn(Schedulers.elastic()).subscribe();
            Flux.from((Publisher)this.getMailQueue().deQueue()).doOnNext((Consumer)Throwing.consumer(item -> item.done(true))).subscribeOn(Schedulers.elastic()).subscribe();
            Awaitility.await().atMost(Durations.TEN_SECONDS).untilAsserted(() -> Assertions.assertThat((int)deadLetteredCount.get()).isEqualTo(1));
        }

        private void resumeDequeuing(Sender sender) {
            sender.bindQueue(this.getMailQueueBindingSpecification()).block();
        }

        private void suspendDequeuing(Sender sender) {
            sender.unbindQueue(this.getMailQueueBindingSpecification()).block();
        }

        private BindingSpecification getMailQueueBindingSpecification() {
            MailQueueName mailQueueName = MailQueueName.fromString((String)this.getMailQueue().getName().asString());
            return BindingSpecification.binding().exchange(mailQueueName.toRabbitExchangeName().asString()).queue(mailQueueName.toWorkQueueName().asString()).routingKey("");
        }
    }
}

