package org.apache.james.queue.api;

import com.github.fge.lambdas.Throwing;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import java.io.Serializable;
import java.time.Duration;
import java.util.Date;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.mail.internet.MimeMessage;
import org.apache.james.core.MailAddress;
import org.apache.james.core.MaybeSender;
import org.apache.james.core.builder.MimeMessageBuilder;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.util.MimeMessageUtil;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.apache.mailet.Attribute;
import org.apache.mailet.DsnParameters;
import org.apache.mailet.Mail;
import org.apache.mailet.PerRecipientHeaders;
import org.apache.mailet.base.MailAddressFixture;
import org.apache.mailet.base.test.FakeMail;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/apache/james/queue/api/MailQueueContract.class */
public interface MailQueueContract {
    /** Cached thread pool backing {@link #SCHEDULER}; nothing in this interface shuts it down. */
    ExecutorService EXECUTOR = Executors.newCachedThreadPool();

    /** Reactor scheduler built on top of {@link #EXECUTOR}; several dequeue tests subscribe on it. */
    Scheduler SCHEDULER = Schedulers.fromExecutor(EXECUTOR);

    /* loaded from: input_file:org/apache/james/queue/api/MailQueueContract$SerializableAttribute.class */
    /**
     * Small {@link Serializable} value holder wrapping a single string.
     *
     * <p>Equality, hash code and the string form are all derived from the wrapped value only.
     */
    public static class SerializableAttribute implements Serializable {
        private final String value;

        SerializableAttribute(String value) {
            this.value = value;
        }

        /** Two attributes are equal when both are {@code SerializableAttribute}s with equal values. */
        @Override
        public final boolean equals(Object other) {
            if (!(other instanceof SerializableAttribute)) {
                return false;
            }
            SerializableAttribute that = (SerializableAttribute) other;
            return Objects.equals(value, that.value);
        }

        @Override
        public final int hashCode() {
            return Objects.hash(value);
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                .add("value", value)
                .toString();
        }
    }

    /**
     * Supplies the queue that every test of this contract enqueues to and dequeues from.
     *
     * @return the {@link MailQueue} under test
     */
    MailQueue getMailQueue();

    /**
     * Concurrency limit passed to {@code flatMap} when {@link #dequeueShouldBeConcurrent()}
     * enqueues its mails.
     *
     * @return {@code Integer.MAX_VALUE} unless an implementation overrides it
     */
    default int getMailQueueMaxConcurrency() {
        return Integer.MAX_VALUE;
    }

    /**
     * Enqueues {@code mail} on the queue returned by {@link #getMailQueue()}.
     *
     * @param mail the mail to enqueue
     * @throws MailQueue.MailQueueException if the underlying queue rejects the mail
     */
    default void enQueue(Mail mail) throws MailQueue.MailQueueException {
        MailQueue queue = getMailQueue();
        queue.enQueue(mail);
    }

    /**
     * Enqueues a mail carrying DSN parameters (envelope id, RET and three per-recipient entries)
     * and checks that the first dequeued mail exposes the same parameters.
     */
    @Test
    default void queueShouldPreserveDsnParameters() throws Exception {
        DsnParameters expected = DsnParameters.builder()
            .envId(DsnParameters.EnvId.of("434554-55445-33443"))
            .ret(DsnParameters.Ret.FULL)
            .addRcptParameter(
                new MailAddress("bob@apache.org"),
                DsnParameters.RecipientDsnParameters.of(new MailAddress("andy@apache.org")))
            .addRcptParameter(
                new MailAddress("cedric@apache.org"),
                DsnParameters.RecipientDsnParameters.of(EnumSet.of(DsnParameters.Notify.SUCCESS)))
            .addRcptParameter(
                new MailAddress("domi@apache.org"),
                DsnParameters.RecipientDsnParameters.of(EnumSet.of(DsnParameters.Notify.FAILURE), new MailAddress("eric@apache.org")))
            .build()
            .get();
        FakeMail mail = Mails.defaultMail().name("mail").build();
        mail.setDsnParameters(expected);
        enQueue(mail);

        MailQueue.MailQueueItem item = Flux.from(getMailQueue().deQueue()).blockFirst();

        Assertions.assertThat(item.getMail().dsnParameters()).contains(expected);
    }

    /**
     * Enqueues a mail whose text body is 1024 * 1024 repetitions of a 12-character line and
     * checks that it can be dequeued under the same name.
     */
    @Test
    default void queueShouldSupportBigMail() throws Exception {
        int repetitions = 1024 * 1024;
        String bigBody = Strings.repeat("0123456789\r\n", repetitions);
        MimeMessageBuilder bigMessage = MimeMessageBuilder.mimeMessageBuilder().setText(bigBody);
        enQueue(Mails.defaultMail()
            .name("name1")
            .mimeMessage(bigMessage)
            .build());

        MailQueue.MailQueueItem item = Flux.from(getMailQueue().deQueue()).blockFirst();

        Assertions.assertThat(item.getMail().getName()).isEqualTo("name1");
    }

    /** Enqueues a mail with two recipients and expects exactly those recipients on the dequeued mail. */
    @Test
    default void queueShouldPreserveMailRecipients() throws Exception {
        Mail mail = Mails.defaultMail()
            .name("mail")
            .recipients(MailAddressFixture.RECIPIENT1, MailAddressFixture.RECIPIENT2)
            .build();
        enQueue(mail);

        MailQueue.MailQueueItem item = Flux.from(getMailQueue().deQueue()).blockFirst();

        Assertions.assertThat(item.getMail().getRecipients())
            .containsOnly(MailAddressFixture.RECIPIENT1, MailAddressFixture.RECIPIENT2);
    }

    /**
     * Enqueues a mail whose sender is {@code MailAddress.nullSender()} and expects the dequeued
     * mail to report {@code MaybeSender.nullSender()}.
     */
    @Test
    default void queueShouldHandleSender() throws Exception {
        Mail mail = FakeMail.builder()
            .name("name")
            .mimeMessage(Mails.createMimeMessage())
            .recipients(MailAddressFixture.RECIPIENT1, MailAddressFixture.RECIPIENT2)
            .sender(MailAddress.nullSender())
            .lastUpdated(new Date())
            .build();
        enQueue(mail);

        MailQueue.MailQueueItem item = Flux.from(getMailQueue().deQueue()).blockFirst();

        Assertions.assertThat(item.getMail().getMaybeSender()).isEqualTo(MaybeSender.nullSender());
    }

    /** Enqueues the same mail twice and expects two dequeued items, both carrying its name. */
    @Test
    default void enQueueShouldAcceptMailWithDuplicatedNames() throws Exception {
        FakeMail mail = FakeMail.builder()
            .name("name")
            .mimeMessage(Mails.createMimeMessage())
            .recipients(MailAddressFixture.RECIPIENT1, MailAddressFixture.RECIPIENT2)
            .sender(MailAddress.nullSender())
            .lastUpdated(new Date())
            .build();
        enQueue(mail);
        enQueue(mail);

        Stream<String> dequeuedNames = Flux.from(getMailQueue().deQueue())
            .take(2L)
            .map(MailQueue.MailQueueItem::getMail)
            .map(Mail::getName)
            .toStream();

        Assertions.assertThat(dequeuedNames)
            .hasSize(2)
            .containsOnly("name");
    }

    /**
     * Enqueues a mail built without any {@code sender(...)} call and expects the dequeued mail to
     * report {@code MaybeSender.nullSender()}.
     */
    @Test
    default void queueShouldHandleNoSender() throws Exception {
        Mail mail = FakeMail.builder()
            .name("name")
            .mimeMessage(Mails.createMimeMessage())
            .recipients(MailAddressFixture.RECIPIENT1, MailAddressFixture.RECIPIENT2)
            .lastUpdated(new Date())
            .build();
        enQueue(mail);

        MailQueue.MailQueueItem item = Flux.from(getMailQueue().deQueue()).blockFirst();

        Assertions.assertThat(item.getMail().getMaybeSender()).isEqualTo(MaybeSender.nullSender());
    }

    /** Enqueues a mail with an explicit sender and expects the dequeued mail to carry that sender. */
    @Test
    default void queueShouldPreserveMailSender() throws Exception {
        Mail mail = Mails.defaultMail()
            .name("mail")
            .sender(MailAddressFixture.SENDER)
            .build();
        enQueue(mail);

        MailQueue.MailQueueItem item = Flux.from(getMailQueue().deQueue()).blockFirst();

        Assertions.assertThat(item.getMail().getMaybeSender())
            .isEqualTo(MaybeSender.of(MailAddressFixture.SENDER));
    }

    /**
     * Enqueues a mail with a given MIME message and compares the string form of the dequeued
     * message with the string form of the original.
     */
    @Test
    default void queueShouldPreserveMimeMessage() throws Exception {
        MimeMessage original = Mails.createMimeMessage();
        enQueue(Mails.defaultMail()
            .name("mail")
            .mimeMessage(original)
            .build());

        MailQueue.MailQueueItem item = Flux.from(getMailQueue().deQueue()).blockFirst();
        String dequeuedContent = MimeMessageUtil.asString(item.getMail().getMessage());

        Assertions.assertThat(dequeuedContent).isEqualTo(MimeMessageUtil.asString(original));
    }

    /** Enqueues a mail with one attribute and expects to find it by name on the dequeued mail. */
    @Test
    default void queueShouldPreserveMailAttribute() throws Exception {
        Attribute attribute = Attribute.convertToAttribute("any", "value");
        Mail mail = Mails.defaultMail()
            .name("mail")
            .attribute(attribute)
            .build();
        enQueue(mail);

        MailQueue.MailQueueItem item = Flux.from(getMailQueue().deQueue()).blockFirst();

        Assertions.assertThat(item.getMail().getAttribute(attribute.getName())).contains(attribute);
    }

    /** Enqueues a mail with an error message and expects the same message after dequeue. */
    @Test
    default void queueShouldPreserveErrorMessage() throws Exception {
        Mail mail = Mails.defaultMail()
            .name("mail")
            .errorMessage("ErrorMessage")
            .build();
        enQueue(mail);

        MailQueue.MailQueueItem item = Flux.from(getMailQueue().deQueue()).blockFirst();

        Assertions.assertThat(item.getMail().getErrorMessage()).isEqualTo("ErrorMessage");
    }

    /** Enqueues a mail with a state and expects the same state after dequeue. */
    @Test
    default void queueShouldPreserveState() throws Exception {
        Mail mail = Mails.defaultMail()
            .name("mail")
            .state("state")
            .build();
        enQueue(mail);

        MailQueue.MailQueueItem item = Flux.from(getMailQueue().deQueue()).blockFirst();

        Assertions.assertThat(item.getMail().getState()).isEqualTo("state");
    }

    /** Enqueues a mail with a remote address and expects the same address after dequeue. */
    @Test
    default void queueShouldPreserveRemoteAddress() throws Exception {
        Mail mail = Mails.defaultMail()
            .name("mail")
            .remoteAddr("remote")
            .build();
        enQueue(mail);

        MailQueue.MailQueueItem item = Flux.from(getMailQueue().deQueue()).blockFirst();

        Assertions.assertThat(item.getMail().getRemoteAddr()).isEqualTo("remote");
    }

    /** Enqueues a mail with a remote host and expects the same host after dequeue. */
    @Test
    default void queueShouldPreserveRemoteHost() throws Exception {
        Mail mail = Mails.defaultMail()
            .name("mail")
            .remoteHost("remote")
            .build();
        enQueue(mail);

        MailQueue.MailQueueItem item = Flux.from(getMailQueue().deQueue()).blockFirst();

        Assertions.assertThat(item.getMail().getRemoteHost()).isEqualTo("remote");
    }

    /** Enqueues a mail with a last-updated date and expects an equal date after dequeue. */
    @Test
    default void queueShouldPreserveLastUpdated() throws Exception {
        Date lastUpdated = new Date();
        Mail mail = Mails.defaultMail()
            .name("mail")
            .lastUpdated(lastUpdated)
            .build();
        enQueue(mail);

        MailQueue.MailQueueItem item = Flux.from(getMailQueue().deQueue()).blockFirst();

        Assertions.assertThat(item.getMail().getLastUpdated()).isEqualTo(lastUpdated);
    }

    /** Enqueues a mail named {@code "name"} and expects the same name after dequeue. */
    @Test
    default void queueShouldPreserveName() throws Exception {
        Mail mail = Mails.defaultMail()
            .name("name")
            .build();
        enQueue(mail);

        MailQueue.MailQueueItem item = Flux.from(getMailQueue().deQueue()).blockFirst();

        Assertions.assertThat(item.getMail().getName()).isEqualTo("name");
    }

    /**
     * Enqueues a mail with one per-recipient header and expects exactly that header for the
     * recipient after dequeue.
     */
    @Test
    default void queueShouldPreservePerRecipientHeaders() throws Exception {
        PerRecipientHeaders.Header header = PerRecipientHeaders.Header.builder()
            .name("any")
            .value("any")
            .build();
        enQueue(Mails.defaultMail()
            .name("mail")
            .addHeaderForRecipient(header, MailAddressFixture.RECIPIENT1)
            .build());

        MailQueue.MailQueueItem item = Flux.from(getMailQueue().deQueue()).blockFirst();

        Assertions.assertThat(item.getMail()
                .getPerRecipientSpecificHeaders()
                .getHeadersForRecipient(MailAddressFixture.RECIPIENT1))
            .containsOnly(header);
    }

    /**
     * Enqueues a mail with two per-recipient headers for the same recipient and expects exactly
     * those two headers after dequeue.
     */
    @Test
    default void queueShouldPreserveMultiplePerRecipientHeaders() throws Exception {
        PerRecipientHeaders.Header firstHeader = PerRecipientHeaders.Header.builder()
            .name("any")
            .value("any")
            .build();
        PerRecipientHeaders.Header secondHeader = PerRecipientHeaders.Header.builder()
            .name("any2")
            .value("any")
            .build();
        enQueue(Mails.defaultMail()
            .name("mail")
            .addHeaderForRecipient(firstHeader, MailAddressFixture.RECIPIENT1)
            .addHeaderForRecipient(secondHeader, MailAddressFixture.RECIPIENT1)
            .build());

        MailQueue.MailQueueItem item = Flux.from(getMailQueue().deQueue()).blockFirst();

        Assertions.assertThat(item.getMail()
                .getPerRecipientSpecificHeaders()
                .getHeadersForRecipient(MailAddressFixture.RECIPIENT1))
            .containsOnly(firstHeader, secondHeader);
    }

    /**
     * Enqueues two mails, then dequeues and acknowledges them one after the other, expecting
     * them to come out in enqueue order.
     */
    @Test
    default void dequeueShouldBeFifo() throws Exception {
        enQueue(Mails.defaultMail().name("name1").build());
        enQueue(Mails.defaultMail().name("name2").build());

        Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue())
            .subscribeOn(SCHEDULER)
            .toIterable()
            .iterator();
        MailQueue.MailQueueItem first = items.next();
        first.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
        MailQueue.MailQueueItem second = items.next();
        second.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);

        Assertions.assertThat(first.getMail().getName()).isEqualTo("name1");
        Assertions.assertThat(second.getMail().getName()).isEqualTo("name2");
    }

    /**
     * Enqueues two mails and dequeues both before acknowledging either, then acknowledges them
     * in dequeue order.
     */
    @Test
    default void dequeueCanBeChainedBeforeAck() throws Exception {
        enQueue(Mails.defaultMail().name("name1").build());
        enQueue(Mails.defaultMail().name("name2").build());

        Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue())
            .subscribeOn(SCHEDULER)
            .toIterable()
            .iterator();
        MailQueue.MailQueueItem first = items.next();
        MailQueue.MailQueueItem second = items.next();
        first.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
        second.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);

        Assertions.assertThat(first.getMail().getName()).isEqualTo("name1");
        Assertions.assertThat(second.getMail().getName()).isEqualTo("name2");
    }

    /**
     * Enqueues two mails, dequeues both, and acknowledges the second one before the first;
     * both must still carry the expected names.
     */
    @Test
    default void dequeueCouldBeInterleavingWithOutOfOrderAck() throws Exception {
        enQueue(Mails.defaultMail().name("name1").build());
        enQueue(Mails.defaultMail().name("name2").build());

        Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue())
            .subscribeOn(SCHEDULER)
            .toIterable()
            .iterator();
        MailQueue.MailQueueItem first = items.next();
        MailQueue.MailQueueItem second = items.next();
        second.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
        first.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);

        Assertions.assertThat(first.getMail().getName()).isEqualTo("name1");
        Assertions.assertThat(second.getMail().getName()).isEqualTo("name2");
    }

    /**
     * Enqueues one mail, completes the first dequeued item with {@code RETRY}, and expects the
     * next dequeued item to be the same mail again.
     */
    @Test
    default void dequeueShouldAllowRetrieveFailItems() throws Exception {
        enQueue(Mails.defaultMail().name("name1").build());

        Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue())
            .subscribeOn(SCHEDULER)
            .toIterable()
            .iterator();
        MailQueue.MailQueueItem firstAttempt = items.next();
        firstAttempt.done(MailQueue.MailQueueItem.CompletionStatus.RETRY);
        MailQueue.MailQueueItem secondAttempt = items.next();
        secondAttempt.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);

        Assertions.assertThat(firstAttempt.getMail().getName()).isEqualTo("name1");
        Assertions.assertThat(secondAttempt.getMail().getName()).isEqualTo("name1");
    }

    /**
     * Enqueues three mails, dequeues the first two, acknowledges the second with
     * {@code SUCCESS} and the first with {@code RETRY}, then expects the next two dequeued
     * items to be the retried first mail and the third mail, in any order.
     */
    @Test
    default void dequeueShouldAllowRetrieveFailItemsNackOutOfOrder() throws Exception {
        enQueue(Mails.defaultMail().name("name1").build());
        // NOTE(review): the 1 ms pauses presumably keep the enqueue timestamps distinct — confirm.
        Thread.sleep(1L);
        enQueue(Mails.defaultMail().name("name2").build());
        Thread.sleep(1L);
        enQueue(Mails.defaultMail().name("name3").build());

        Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue())
            .subscribeOn(SCHEDULER)
            .toIterable()
            .iterator();
        MailQueue.MailQueueItem first = items.next();
        MailQueue.MailQueueItem second = items.next();
        second.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
        first.done(MailQueue.MailQueueItem.CompletionStatus.RETRY);
        MailQueue.MailQueueItem third = items.next();
        MailQueue.MailQueueItem fourth = items.next();

        Assertions.assertThat(first.getMail().getName()).isEqualTo("name1");
        Assertions.assertThat(second.getMail().getName()).isEqualTo("name2");
        // A typed stream keeps the element type as MailQueueItem, so getMail() resolves.
        Assertions.assertThat(Stream.of(third, fourth).map(item -> item.getMail().getName()))
            .containsOnly("name1", "name3");
    }

    /**
     * Enqueues a single mail, receives it without acknowledging it, and expects that no further
     * item is delivered within two seconds.
     */
    @Test
    default void dequeueShouldNotReturnInProcessingEmails() throws Exception {
        enQueue(Mails.defaultMail().name("name").build());

        // Capacity 1: dequeued items are handed from the subscriber to the test thread one at a time.
        LinkedBlockingQueue<MailQueue.MailQueueItem> received = new LinkedBlockingQueue<>(1);
        Flux.from(getMailQueue().deQueue())
            .subscribeOn(SCHEDULER)
            .subscribe(Throwing.consumer(received::put));

        // The first item is taken but never completed via done(...).
        received.take();

        Assertions.assertThat(received.poll(2L, TimeUnit.SECONDS)).isNull();
    }

    /**
     * Without enqueuing anything, blocks for two seconds on the first dequeued item and expects
     * a {@link RuntimeException} to be thrown.
     */
    @Test
    default void deQueueShouldBlockWhenNoMail() {
        Mono<MailQueue.MailQueueItem> firstItem = Flux.from(getMailQueue().deQueue())
            .subscribeOn(SCHEDULER)
            .next();

        Assertions.assertThatThrownBy(() -> firstItem.block(Duration.ofSeconds(2L)))
            .isInstanceOf(RuntimeException.class);
    }

    /**
     * Builds the dequeue {@code Mono} before enqueuing a mail, then blocks on it for up to one
     * minute and expects to receive the enqueued mail.
     */
    @Test
    default void deQueueShouldWaitForAMailToBeEnqueued() throws Exception {
        MailQueue queue = getMailQueue();
        Mail mail = Mails.defaultMail()
            .name("name")
            .build();
        Mono<MailQueue.MailQueueItem> firstItem = Flux.from(queue.deQueue()).next();

        queue.enQueue(mail);

        MailQueue.MailQueueItem item = firstItem.block(Duration.ofMinutes(1L));
        Assertions.assertThat(item.getMail().getName()).isEqualTo("name");
    }

    /**
     * Runs 10 threads with 10 operations each: even steps enqueue a uniquely named mail, odd
     * steps take a dequeued item, record its mail and acknowledge it with {@code SUCCESS}.
     * Expects 50 distinct mail names to have been dequeued.
     */
    @Test
    default void concurrentEnqueueDequeueShouldNotFail() throws Exception {
        MailQueue queue = getMailQueue();
        ConcurrentLinkedDeque<Mail> dequeuedMails = new ConcurrentLinkedDeque<>();
        // Capacity 1: the subscriber blocks in put(...) until a worker thread takes the item.
        LinkedBlockingQueue<MailQueue.MailQueueItem> pendingItems = new LinkedBlockingQueue<>(1);
        Flux.from(queue.deQueue())
            .subscribeOn(SCHEDULER)
            .flatMap(item -> {
                try {
                    pendingItems.put(item);
                } catch (InterruptedException e) {
                    // Keep the pipeline running, but restore the flag so the interruption is not lost.
                    Thread.currentThread().interrupt();
                }
                return Mono.empty();
            })
            .subscribe();

        ConcurrentTestRunner.builder()
            .operation((threadNumber, step) -> {
                if (step % 2 == 0) {
                    queue.enQueue(Mails.defaultMail()
                        .name("name" + threadNumber + "-" + step)
                        .build());
                    return;
                }
                MailQueue.MailQueueItem item = pendingItems.take();
                dequeuedMails.add(item.getMail());
                item.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
            })
            .threadCount(10)
            .operationCount(10)
            .runSuccessfullyWithin(Duration.ofMinutes(5L));

        Assertions.assertThat(dequeuedMails.stream().map(Mail::getName).distinct())
            .hasSize(50);
    }

    /**
     * Runs 10 threads with 15 operations each. Depending on {@code step % 3} an operation
     * enqueues a uniquely named mail (0), takes a dequeued item and completes it with
     * {@code RETRY} (1), or takes a dequeued item, records its mail and completes it with
     * {@code SUCCESS} (2). Expects 50 distinct mail names to have been recorded.
     */
    @Test
    default void concurrentEnqueueDequeueWithAckNackShouldNotFail() throws Exception {
        MailQueue queue = getMailQueue();
        ConcurrentLinkedDeque<Mail> dequeuedMails = new ConcurrentLinkedDeque<>();
        // Items are pushed at the head and taken from the tail, so the oldest is handled first.
        LinkedBlockingDeque<MailQueue.MailQueueItem> pendingItems = new LinkedBlockingDeque<>();
        Flux.from(queue.deQueue())
            .subscribeOn(SCHEDULER)
            .doOnNext(pendingItems::addFirst)
            .subscribe();

        ConcurrentTestRunner.builder()
            .operation((threadNumber, step) -> {
                if (step % 3 == 0) {
                    queue.enQueue(Mails.defaultMail()
                        .name("name" + threadNumber + "-" + step)
                        .build());
                }
                if (step % 3 == 1) {
                    MailQueue.MailQueueItem retried = pendingItems.takeLast();
                    retried.done(MailQueue.MailQueueItem.CompletionStatus.RETRY);
                }
                if (step % 3 == 2) {
                    MailQueue.MailQueueItem succeeded = pendingItems.takeLast();
                    dequeuedMails.add(succeeded.getMail());
                    succeeded.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
                }
            })
            .threadCount(10)
            .operationCount(15)
            .runSuccessfullyWithin(Duration.ofMinutes(1L));

        Assertions.assertThat(dequeuedMails.stream().map(Mail::getName).distinct())
            .hasSize(50);
    }

    /**
     * Enqueues 1000 mails, then dequeues them with a {@code flatMap} concurrency of 1000 where
     * each item is recorded, delayed by 100 ms and then acknowledged. Waits up to one minute for
     * all 1000 mails to have been recorded.
     */
    @Test
    default void dequeueShouldBeConcurrent() {
        MailQueue queue = getMailQueue();
        int mailCount = 1000;
        Flux.range(0, mailCount)
            .flatMap(
                Throwing.function(index -> queue.enqueueReactive(Mails.defaultMail().name("name" + index).build())),
                getMailQueueMaxConcurrency())
            .blockLast();

        ConcurrentLinkedDeque<Mail> dequeuedMails = new ConcurrentLinkedDeque<>();
        Flux.from(queue.deQueue())
            .flatMap(item -> Mono.fromRunnable(() -> dequeuedMails.add(item.getMail()))
                .delayElement(Duration.ofMillis(100L))
                .then(Mono.fromRunnable(Throwing.runnable(() -> item.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS))))
                .subscribeOn(SCHEDULER), 1000)
            .subscribeOn(Schedulers.newSingle("foo"))
            .subscribe();

        Awaitility.await()
            .atMost(Durations.ONE_MINUTE)
            .until(() -> dequeuedMails.size() >= mailCount);
    }
}
