package org.apache.james.queue.rabbitmq;

import com.github.fge.lambdas.Throwing;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.james.backend.rabbitmq.RabbitMQExtension;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.cassandra.CassandraBlobModule;
import org.apache.james.blob.cassandra.CassandraBlobsDAO;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule;
import org.apache.james.metrics.api.NoopGaugeRegistry;
import org.apache.james.metrics.api.NoopMetricFactory;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.Mails;
import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
import org.apache.james.queue.rabbitmq.RabbitMQMailQueueFactory;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewTestFactory;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.util.streams.Iterators;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.class */
class RabbitMQMailQueueConfigurationChangeTest {
    private static final String SPOOL = "spool";
    private UpdatableTickingClock clock;
    private RabbitMQMailQueueManagement mqManagementApi;
    private RabbitClient rabbitClient;
    private ThreadLocalRandom random;
    private MimeMessageStore.Factory mimeMessageStoreFactory;
    private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
    private static final Duration ONE_HOUR_SLICE_WINDOW = Duration.ofHours(1);
    private static final int THREE_BUCKET_COUNT = 3;
    private static final int UPDATE_BROWSE_START_PACE = 2;
    private static final CassandraMailQueueViewConfiguration DEFAULT_CONFIGURATION = CassandraMailQueueViewConfiguration.builder().bucketCount(THREE_BUCKET_COUNT).updateBrowseStartPace(UPDATE_BROWSE_START_PACE).sliceWindow(ONE_HOUR_SLICE_WINDOW).build();
    private static final Instant IN_SLICE_1 = Instant.parse("2007-12-03T10:15:30.00Z");
    private static final Instant IN_SLICE_2 = IN_SLICE_1.plus(1L, (TemporalUnit) ChronoUnit.HOURS);
    private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2L, (TemporalUnit) ChronoUnit.HOURS);

    @RegisterExtension
    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules(new CassandraModule[]{CassandraSchemaVersionModule.MODULE, CassandraBlobModule.MODULE, CassandraMailQueueViewModule.MODULE, CassandraEventStoreModule.MODULE}));

    @RegisterExtension
    static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();

    RabbitMQMailQueueConfigurationChangeTest() {
    }

    @BeforeEach
    void setup(CassandraCluster cassandraCluster2) throws Exception {
        this.mimeMessageStoreFactory = MimeMessageStore.factory(new CassandraBlobsDAO(cassandraCluster2.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION, BLOB_ID_FACTORY));
        this.clock = new UpdatableTickingClock(IN_SLICE_1);
        this.random = ThreadLocalRandom.current();
        this.rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool());
        this.mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
    }

    @AfterEach
    void tearDown() {
        this.mqManagementApi.deleteAllQueues();
    }

    private RabbitMQMailQueue getRabbitMQMailQueue(CassandraCluster cassandraCluster2, CassandraMailQueueViewConfiguration cassandraMailQueueViewConfiguration) throws Exception {
        return new RabbitMQMailQueueFactory(this.rabbitClient, this.mqManagementApi, new RabbitMQMailQueueFactory.PrivateFactory(new NoopMetricFactory(), new NoopGaugeRegistry(), this.rabbitClient, this.mimeMessageStoreFactory, BLOB_ID_FACTORY, CassandraMailQueueViewTestFactory.factory(this.clock, this.random, cassandraCluster2.getConf(), cassandraCluster2.getTypesProvider(), cassandraMailQueueViewConfiguration, this.mimeMessageStoreFactory), this.clock, new RawMailQueueItemDecoratorFactory())).createQueue(SPOOL);
    }

    @Test
    void increasingBucketCountShouldAllowBrowsingAllQueueElements(CassandraCluster cassandraCluster2) throws Exception {
        enqueueSomeMails(getRabbitMQMailQueue(cassandraCluster2, DEFAULT_CONFIGURATION), namePatternForSlice(1), 10);
        RabbitMQMailQueue rabbitMQMailQueue = getRabbitMQMailQueue(cassandraCluster2, CassandraMailQueueViewConfiguration.builder().bucketCount(5).updateBrowseStartPace(UPDATE_BROWSE_START_PACE).sliceWindow(ONE_HOUR_SLICE_WINDOW).build());
        enqueueSomeMails(rabbitMQMailQueue, namePatternForSlice(UPDATE_BROWSE_START_PACE), 10);
        Assertions.assertThat(Iterators.toStream(rabbitMQMailQueue.browse()).map((v0) -> {
            return v0.getMail();
        }).map((v0) -> {
            return v0.getName();
        })).containsOnly(new String[]{"1-1", "1-2", "1-3", "1-4", "1-5", "1-6", "1-7", "1-8", "1-9", "1-10", "2-1", "2-2", "2-3", "2-4", "2-5", "2-6", "2-7", "2-8", "2-9", "2-10"});
    }

    @Test
    void decreasingBucketCountShouldBeRejected(CassandraCluster cassandraCluster2) throws Exception {
        getRabbitMQMailQueue(cassandraCluster2, CassandraMailQueueViewConfiguration.builder().bucketCount(5).updateBrowseStartPace(UPDATE_BROWSE_START_PACE).sliceWindow(ONE_HOUR_SLICE_WINDOW).build());
        Assertions.assertThatThrownBy(() -> {
            getRabbitMQMailQueue(cassandraCluster2, DEFAULT_CONFIGURATION);
        }).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void divideSliceWindowShouldAllowBrowsingAllQueueElements(CassandraCluster cassandraCluster2) throws Exception {
        RabbitMQMailQueue rabbitMQMailQueue = getRabbitMQMailQueue(cassandraCluster2, DEFAULT_CONFIGURATION);
        this.clock.setInstant(IN_SLICE_1);
        enqueueSomeMails(rabbitMQMailQueue, namePatternForSlice(1), 1);
        this.clock.setInstant(IN_SLICE_2);
        enqueueSomeMails(rabbitMQMailQueue, namePatternForSlice(UPDATE_BROWSE_START_PACE), 1);
        this.clock.setInstant(IN_SLICE_3);
        enqueueSomeMails(rabbitMQMailQueue, namePatternForSlice(THREE_BUCKET_COUNT), 1);
        RabbitMQMailQueue rabbitMQMailQueue2 = getRabbitMQMailQueue(cassandraCluster2, CassandraMailQueueViewConfiguration.builder().bucketCount(THREE_BUCKET_COUNT).updateBrowseStartPace(UPDATE_BROWSE_START_PACE).sliceWindow(Duration.ofMinutes(30L)).build());
        this.clock.setInstant(IN_SLICE_3.plus(35L, (TemporalUnit) ChronoUnit.MINUTES));
        enqueueSomeMails(rabbitMQMailQueue, namePatternForSlice(4), 1);
        this.clock.setInstant(IN_SLICE_3.plus(65L, (TemporalUnit) ChronoUnit.MINUTES));
        enqueueSomeMails(rabbitMQMailQueue, namePatternForSlice(5), 1);
        this.clock.setInstant(IN_SLICE_3.plus(95L, (TemporalUnit) ChronoUnit.MINUTES));
        enqueueSomeMails(rabbitMQMailQueue, namePatternForSlice(6), 1);
        Assertions.assertThat(Iterators.toStream(rabbitMQMailQueue2.browse()).map((v0) -> {
            return v0.getMail();
        }).map((v0) -> {
            return v0.getName();
        })).containsOnly(new String[]{"1-1", "2-1", "3-1", "4-1", "5-1", "6-1"});
    }

    @Test
    void decreaseArbitrarilySliceWindowShouldBeRejected(CassandraCluster cassandraCluster2) throws Exception {
        getRabbitMQMailQueue(cassandraCluster2, DEFAULT_CONFIGURATION);
        Assertions.assertThatThrownBy(() -> {
            getRabbitMQMailQueue(cassandraCluster2, CassandraMailQueueViewConfiguration.builder().bucketCount(THREE_BUCKET_COUNT).updateBrowseStartPace(UPDATE_BROWSE_START_PACE).sliceWindow(Duration.ofMinutes(25L)).build());
        }).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void increaseSliceWindowShouldBeRejected(CassandraCluster cassandraCluster2) throws Exception {
        getRabbitMQMailQueue(cassandraCluster2, DEFAULT_CONFIGURATION);
        Assertions.assertThatThrownBy(() -> {
            getRabbitMQMailQueue(cassandraCluster2, CassandraMailQueueViewConfiguration.builder().bucketCount(THREE_BUCKET_COUNT).updateBrowseStartPace(UPDATE_BROWSE_START_PACE).sliceWindow(Duration.ofHours(2L)).build());
        }).isInstanceOf(IllegalArgumentException.class);
    }

    private Function<Integer, String> namePatternForSlice(int i) {
        return num -> {
            return i + "-" + num;
        };
    }

    private void enqueueSomeMails(MailQueue mailQueue, Function<Integer, String> function, int i) {
        IntStream.rangeClosed(1, i).forEach(Throwing.intConsumer(i2 -> {
            mailQueue.enQueue(Mails.defaultMail().name((String) function.apply(Integer.valueOf(i2))).build());
        }));
    }
}
