package org.apache.james.queue.rabbitmq;

import com.github.fge.lambdas.Throwing;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.james.backend.rabbitmq.DockerRabbitMQ;
import org.apache.james.backend.rabbitmq.RabbitMQExtension;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.cassandra.CassandraBlobModule;
import org.apache.james.blob.cassandra.CassandraBlobsDAO;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueMetricContract;
import org.apache.james.queue.api.MailQueueMetricExtension;
import org.apache.james.queue.api.Mails;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.api.ManageableMailQueueContract;
import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
import org.apache.james.queue.rabbitmq.RabbitMQMailQueueFactory;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueView;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewTestFactory;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.util.streams.Iterators;
import org.apache.mailet.Mail;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.class */
public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQueueMetricContract {
    private static final int THREE_BUCKET_COUNT = 3;
    private static final int UPDATE_BROWSE_START_PACE = 2;
    private static final String SPOOL = "spool";
    private RabbitMQMailQueueFactory mailQueueFactory;
    private UpdatableTickingClock clock;
    private RabbitMQMailQueue mailQueue;
    private RabbitMQMailQueueManagement mqManagementApi;
    private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
    private static final Duration ONE_HOUR_SLICE_WINDOW = Duration.ofHours(1);
    private static final Instant IN_SLICE_1 = Instant.parse("2007-12-03T10:15:30.00Z");
    private static final Instant IN_SLICE_2 = IN_SLICE_1.plus(1L, (TemporalUnit) ChronoUnit.HOURS);
    private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2L, (TemporalUnit) ChronoUnit.HOURS);
    private static final Instant IN_SLICE_5 = IN_SLICE_1.plus(4L, (TemporalUnit) ChronoUnit.HOURS);
    private static final Instant IN_SLICE_7 = IN_SLICE_1.plus(6L, (TemporalUnit) ChronoUnit.HOURS);

    @RegisterExtension
    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules(new CassandraModule[]{CassandraBlobModule.MODULE, CassandraMailQueueViewModule.MODULE, CassandraEventStoreModule.MODULE, CassandraSchemaVersionModule.MODULE}));

    @RegisterExtension
    static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();

    public void enQueue(Mail mail) throws MailQueue.MailQueueException {
        super.enQueue(mail);
        this.clock.tick();
    }

    @BeforeEach
    void setup(DockerRabbitMQ dockerRabbitMQ, CassandraCluster cassandraCluster2, MailQueueMetricExtension.MailQueueMetricTestSystem mailQueueMetricTestSystem) throws Exception {
        MimeMessageStore.Factory factory = MimeMessageStore.factory(new CassandraBlobsDAO(cassandraCluster2.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION, BLOB_ID_FACTORY));
        this.clock = new UpdatableTickingClock(IN_SLICE_1);
        CassandraMailQueueView.Factory factory2 = CassandraMailQueueViewTestFactory.factory(this.clock, ThreadLocalRandom.current(), cassandraCluster2.getConf(), cassandraCluster2.getTypesProvider(), CassandraMailQueueViewConfiguration.builder().bucketCount(THREE_BUCKET_COUNT).updateBrowseStartPace(UPDATE_BROWSE_START_PACE).sliceWindow(ONE_HOUR_SLICE_WINDOW).build(), factory);
        RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool());
        RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory(mailQueueMetricTestSystem.getSpyMetricFactory(), mailQueueMetricTestSystem.getSpyGaugeRegistry(), rabbitClient, factory, BLOB_ID_FACTORY, factory2, this.clock, new RawMailQueueItemDecoratorFactory());
        this.mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
        this.mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, this.mqManagementApi, privateFactory);
        this.mailQueue = this.mailQueueFactory.createQueue(SPOOL);
    }

    @AfterEach
    void tearDown() {
        this.mqManagementApi.deleteAllQueues();
    }

    public MailQueue getMailQueue() {
        return this.mailQueue;
    }

    public ManageableMailQueue getManageableMailQueue() {
        return this.mailQueue;
    }

    @Test
    void browseShouldReturnCurrentlyEnqueuedMailFromAllSlices() throws Exception {
        ManageableMailQueue manageableMailQueue = getManageableMailQueue();
        this.clock.setInstant(IN_SLICE_1);
        enqueueSomeMails(namePatternForSlice(1), 5);
        this.clock.setInstant(IN_SLICE_2);
        enqueueSomeMails(namePatternForSlice(UPDATE_BROWSE_START_PACE), 5);
        this.clock.setInstant(IN_SLICE_3);
        enqueueSomeMails(namePatternForSlice(THREE_BUCKET_COUNT), 5);
        this.clock.setInstant(IN_SLICE_5);
        enqueueSomeMails(namePatternForSlice(5), 5);
        this.clock.setInstant(IN_SLICE_7);
        Assertions.assertThat(Iterators.toStream(manageableMailQueue.browse()).map((v0) -> {
            return v0.getMail();
        }).map((v0) -> {
            return v0.getName();
        })).containsExactly(new String[]{"1-1", "1-2", "1-3", "1-4", "1-5", "2-1", "2-2", "2-3", "2-4", "2-5", "3-1", "3-2", "3-3", "3-4", "3-5", "5-1", "5-2", "5-3", "5-4", "5-5"});
    }

    @Test
    void browseAndDequeueShouldCombineWellWhenDifferentSlices() throws Exception {
        ManageableMailQueue manageableMailQueue = getManageableMailQueue();
        this.clock.setInstant(IN_SLICE_1);
        enqueueSomeMails(namePatternForSlice(1), 5);
        this.clock.setInstant(IN_SLICE_2);
        enqueueSomeMails(namePatternForSlice(UPDATE_BROWSE_START_PACE), 5);
        this.clock.setInstant(IN_SLICE_3);
        enqueueSomeMails(namePatternForSlice(THREE_BUCKET_COUNT), 5);
        this.clock.setInstant(IN_SLICE_5);
        enqueueSomeMails(namePatternForSlice(5), 5);
        this.clock.setInstant(IN_SLICE_7);
        dequeueMails(5);
        dequeueMails(5);
        dequeueMails(THREE_BUCKET_COUNT);
        Assertions.assertThat(Iterators.toStream(manageableMailQueue.browse()).map((v0) -> {
            return v0.getMail();
        }).map((v0) -> {
            return v0.getName();
        })).containsExactly(new String[]{"3-4", "3-5", "5-1", "5-2", "5-3", "5-4", "5-5"});
    }

    private Function<Integer, String> namePatternForSlice(int i) {
        return num -> {
            return i + "-" + num;
        };
    }

    @Test
    void mailQueueShouldBeInitializedWhenCreating(CassandraCluster cassandraCluster2) {
        this.mailQueueFactory.createQueue("myQueue");
        Assertions.assertThat(CassandraMailQueueViewTestFactory.isInitialized(cassandraCluster2.getConf(), MailQueueName.fromString("myQueue"))).isTrue();
    }

    @Test
    void enQueueShouldNotThrowOnMailNameWithNegativeHash() {
        String str = "this sting will have a negative hash";
        Assertions.assertThatCode(() -> {
            getMailQueue().enQueue(Mails.defaultMail().name(str).build());
        }).doesNotThrowAnyException();
    }

    @Disabled("JAMES-2614 RabbitMQMailQueueTest::concurrentEnqueueDequeueShouldNotFail is unstable.The related test is disabled, and need to be re-enabled after investigation and a fix.")
    @Test
    public void concurrentEnqueueDequeueShouldNotFail() {
    }

    private void enqueueSomeMails(Function<Integer, String> function, int i) {
        IntStream.rangeClosed(1, i).forEach(Throwing.intConsumer(i2 -> {
            enQueue(Mails.defaultMail().name((String) function.apply(Integer.valueOf(i2))).build());
        }));
    }

    private void dequeueMails(int i) {
        ManageableMailQueue manageableMailQueue = getManageableMailQueue();
        IntStream.rangeClosed(1, i).forEach(Throwing.intConsumer(i2 -> {
            manageableMailQueue.deQueue().done(true);
        }));
    }
}
