/*
 * Decompiled with CFR 0.152.
 */
package org.apache.james.queue.rabbitmq;

import com.datastax.driver.core.Session;
import com.github.fge.lambdas.Throwing;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.cassandra.CassandraBlobModule;
import org.apache.james.blob.cassandra.CassandraBlobStoreFactory;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStore;
import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule;
import org.apache.james.eventsourcing.eventstore.cassandra.EventStoreDao;
import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
import org.apache.james.lifecycle.api.StartUpCheck;
import org.apache.james.metrics.api.GaugeRegistry;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.metrics.api.NoopGaugeRegistry;
import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
import org.apache.james.queue.api.MailQueueName;
import org.apache.james.queue.api.Mails;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
import org.apache.james.queue.rabbitmq.RabbitMQMailQueueFactory;
import org.apache.james.queue.rabbitmq.RabbitMQMailQueueManagement;
import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueView;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewStartUpCheck;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewTestFactory;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfigurationModule;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement;
import org.apache.james.util.streams.Iterators;
import org.apache.james.utils.UpdatableTickingClock;
import org.apache.mailet.Mail;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class RabbitMQMailQueueConfigurationChangeTest {
    private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
    private static final int THREE_BUCKET_COUNT = 3;
    private static final int UPDATE_BROWSE_START_PACE = 2;
    private static final Duration ONE_HOUR_SLICE_WINDOW = Duration.ofHours(1L);
    private static final CassandraMailQueueViewConfiguration DEFAULT_CONFIGURATION = CassandraMailQueueViewConfiguration.builder().bucketCount(3).updateBrowseStartPace(2).sliceWindow(ONE_HOUR_SLICE_WINDOW).build();
    private static final MailQueueName SPOOL = MailQueueName.of((String)"spool");
    private static final Instant IN_SLICE_1 = Instant.parse("2007-12-03T10:15:30.00Z");
    private static final Instant IN_SLICE_2 = IN_SLICE_1.plus(1L, ChronoUnit.HOURS);
    private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2L, ChronoUnit.HOURS);
    @RegisterExtension
    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules((CassandraModule[])new CassandraModule[]{CassandraSchemaVersionModule.MODULE, CassandraBlobModule.MODULE, CassandraMailQueueViewModule.MODULE, CassandraEventStoreModule.MODULE()}));
    @RegisterExtension
    static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ().isolationPolicy(RabbitMQExtension.IsolationPolicy.WEAK);
    private UpdatableTickingClock clock;
    private RabbitMQMailQueueManagement mqManagementApi;
    private MimeMessageStore.Factory mimeMessageStoreFactory;

    RabbitMQMailQueueConfigurationChangeTest() {
    }

    @BeforeEach
    void setup(CassandraCluster cassandra) throws Exception {
        BlobStore blobStore = CassandraBlobStoreFactory.forTesting((Session)cassandra.getConf()).passthrough();
        this.mimeMessageStoreFactory = MimeMessageStore.factory((BlobStore)blobStore);
        this.clock = new UpdatableTickingClock(IN_SLICE_1);
        this.mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
    }

    @AfterEach
    void tearDown() {
        this.mqManagementApi.deleteAllQueues();
    }

    private RabbitMQMailQueue getRabbitMQMailQueue(CassandraCluster cassandra, CassandraMailQueueViewConfiguration mailQueueViewConfiguration) {
        CassandraMailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory((Clock)this.clock, (Session)cassandra.getConf(), mailQueueViewConfiguration, this.mimeMessageStoreFactory);
        RabbitMQMailQueueConfiguration mailQueueSizeConfiguration = RabbitMQMailQueueConfiguration.builder().sizeMetricsEnabled(true).build();
        RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory((MetricFactory)new RecordingMetricFactory(), (GaugeRegistry)new NoopGaugeRegistry(), rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), this.mimeMessageStoreFactory, (BlobId.Factory)BLOB_ID_FACTORY, (MailQueueView.Factory)mailQueueViewFactory, (Clock)this.clock, (MailQueueItemDecoratorFactory)new RawMailQueueItemDecoratorFactory(), mailQueueSizeConfiguration);
        RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getSender(), this.mqManagementApi, privateFactory);
        Assertions.assertThat((Comparable)this.performStartUpCheck((Session)cassandra.getConf(), mailQueueViewConfiguration)).isEqualTo((Object)StartUpCheck.ResultType.GOOD);
        return (RabbitMQMailQueue)mailQueueFactory.createQueue(SPOOL);
    }

    private StartUpCheck.ResultType performStartUpCheck(Session session, CassandraMailQueueViewConfiguration configuration) {
        EventStoreDao eventStoreDao = new EventStoreDao(session, JsonEventSerializer.forModules((EventDTOModule[])new EventDTOModule[]{CassandraMailQueueViewConfigurationModule.MAIL_QUEUE_VIEW_CONFIGURATION}).withoutNestedType(), cassandraCluster.getCassandraConsistenciesConfiguration());
        EventsourcingConfigurationManagement eventsourcingConfigurationManagement = new EventsourcingConfigurationManagement((EventStore)new CassandraEventStore(eventStoreDao));
        CassandraMailQueueViewStartUpCheck check = new CassandraMailQueueViewStartUpCheck(eventsourcingConfigurationManagement, configuration);
        return check.check().getResultType();
    }

    @Test
    void increasingBucketCountShouldAllowBrowsingAllQueueElements(CassandraCluster cassandra) {
        RabbitMQMailQueue mailQueue = this.getRabbitMQMailQueue(cassandra, DEFAULT_CONFIGURATION);
        this.enqueueSomeMails((MailQueue)mailQueue, this.namePatternForSlice(1), 10);
        RabbitMQMailQueue mailQueueWithMoreBuckets = this.getRabbitMQMailQueue(cassandra, CassandraMailQueueViewConfiguration.builder().bucketCount(5).updateBrowseStartPace(2).sliceWindow(ONE_HOUR_SLICE_WINDOW).build());
        this.enqueueSomeMails((MailQueue)mailQueueWithMoreBuckets, this.namePatternForSlice(2), 10);
        Stream<String> names = Iterators.toStream((Iterator)mailQueueWithMoreBuckets.browse()).map(ManageableMailQueue.MailQueueItemView::getMail).map(Mail::getName);
        Assertions.assertThat(names).containsOnly((Object[])new String[]{"1-1", "1-2", "1-3", "1-4", "1-5", "1-6", "1-7", "1-8", "1-9", "1-10", "2-1", "2-2", "2-3", "2-4", "2-5", "2-6", "2-7", "2-8", "2-9", "2-10"});
    }

    @Test
    void decreasingBucketCountShouldBeRejected(CassandraCluster cassandra) {
        Assertions.assertThat((Comparable)this.performStartUpCheck((Session)cassandra.getConf(), CassandraMailQueueViewConfiguration.builder().bucketCount(5).updateBrowseStartPace(2).sliceWindow(ONE_HOUR_SLICE_WINDOW).build())).isEqualTo((Object)StartUpCheck.ResultType.GOOD);
        Assertions.assertThat((Comparable)this.performStartUpCheck((Session)cassandra.getConf(), DEFAULT_CONFIGURATION)).isEqualTo((Object)StartUpCheck.ResultType.BAD);
    }

    @Test
    void divideSliceWindowShouldAllowBrowsingAllQueueElements(CassandraCluster cassandra) {
        RabbitMQMailQueue mailQueue = this.getRabbitMQMailQueue(cassandra, DEFAULT_CONFIGURATION);
        this.clock.setInstant(IN_SLICE_1);
        this.enqueueSomeMails((MailQueue)mailQueue, this.namePatternForSlice(1), 1);
        this.clock.setInstant(IN_SLICE_2);
        this.enqueueSomeMails((MailQueue)mailQueue, this.namePatternForSlice(2), 1);
        this.clock.setInstant(IN_SLICE_3);
        this.enqueueSomeMails((MailQueue)mailQueue, this.namePatternForSlice(3), 1);
        RabbitMQMailQueue mailQueueWithSmallerSlices = this.getRabbitMQMailQueue(cassandra, CassandraMailQueueViewConfiguration.builder().bucketCount(3).updateBrowseStartPace(2).sliceWindow(Duration.ofMinutes(30L)).build());
        this.clock.setInstant(IN_SLICE_3.plus(35L, ChronoUnit.MINUTES));
        this.enqueueSomeMails((MailQueue)mailQueue, this.namePatternForSlice(4), 1);
        this.clock.setInstant(IN_SLICE_3.plus(65L, ChronoUnit.MINUTES));
        this.enqueueSomeMails((MailQueue)mailQueue, this.namePatternForSlice(5), 1);
        this.clock.setInstant(IN_SLICE_3.plus(95L, ChronoUnit.MINUTES));
        this.enqueueSomeMails((MailQueue)mailQueue, this.namePatternForSlice(6), 1);
        Stream<String> names = Iterators.toStream((Iterator)mailQueueWithSmallerSlices.browse()).map(ManageableMailQueue.MailQueueItemView::getMail).map(Mail::getName);
        Assertions.assertThat(names).containsOnly((Object[])new String[]{"1-1", "2-1", "3-1", "4-1", "5-1", "6-1"});
    }

    @Test
    void decreaseArbitrarilySliceWindowShouldBeRejected(CassandraCluster cassandra) {
        Assertions.assertThat((Comparable)this.performStartUpCheck((Session)cassandra.getConf(), DEFAULT_CONFIGURATION)).isEqualTo((Object)StartUpCheck.ResultType.GOOD);
        Assertions.assertThat((Comparable)this.performStartUpCheck((Session)cassandra.getConf(), CassandraMailQueueViewConfiguration.builder().bucketCount(3).updateBrowseStartPace(2).sliceWindow(Duration.ofMinutes(25L)).build())).isEqualTo((Object)StartUpCheck.ResultType.BAD);
    }

    @Test
    void increaseSliceWindowShouldBeRejected(CassandraCluster cassandra) {
        Assertions.assertThat((Comparable)this.performStartUpCheck((Session)cassandra.getConf(), DEFAULT_CONFIGURATION)).isEqualTo((Object)StartUpCheck.ResultType.GOOD);
        Assertions.assertThat((Comparable)this.performStartUpCheck((Session)cassandra.getConf(), CassandraMailQueueViewConfiguration.builder().bucketCount(3).updateBrowseStartPace(2).sliceWindow(Duration.ofHours(2L)).build())).isEqualTo((Object)StartUpCheck.ResultType.BAD);
    }

    private Function<Integer, String> namePatternForSlice(int sliceId) {
        return i -> sliceId + "-" + i;
    }

    private void enqueueSomeMails(MailQueue mailQueue, Function<Integer, String> namePattern, int emailCount) {
        IntStream.rangeClosed(1, emailCount).forEach((IntConsumer)Throwing.intConsumer(i -> mailQueue.enQueue((Mail)Mails.defaultMail().name((String)namePattern.apply(i)).build())));
    }
}

