package org.apache.james.task.eventsourcing.distributed;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTO;
import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
import org.apache.james.json.DTOConverter;
import org.apache.james.json.DTOModule;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.apache.james.server.task.json.dto.TaskDTOModule;
import org.apache.james.task.eventsourcing.TerminationSubscriber;
import org.apache.james.task.eventsourcing.TerminationSubscriberContract;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.SoftAssertions;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.OutboundMessage;

/* loaded from: input_file:org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.class */
class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract {
    private static final JsonTaskSerializer TASK_SERIALIZER = JsonTaskSerializer.of(new TaskDTOModule[0]);
    private static final Set<EventDTOModule<? extends Event, ? extends EventDTO>> MODULES = TasksSerializationModule.list(TASK_SERIALIZER, DTOConverter.of(new DTOModule[0]), DTOConverter.of(new DTOModule[0]));
    private static final JsonEventSerializer SERIALIZER = JsonEventSerializer.forModules(MODULES).withoutNestedType();

    @RegisterExtension
    static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ().isolationPolicy(RabbitMQExtension.IsolationPolicy.WEAK);

    RabbitMQTerminationSubscriberTest() {
    }

    public TerminationSubscriber subscriber() {
        RabbitMQTerminationSubscriber rabbitMQTerminationSubscriber = new RabbitMQTerminationSubscriber(TerminationQueueName.generate(), rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), SERIALIZER);
        rabbitMQTerminationSubscriber.start();
        return rabbitMQTerminationSubscriber;
    }

    @Test
    void givenTwoTerminationSubscribersWhenAnEventIsSentItShouldBeReceivedByBoth() {
        TerminationSubscriber subscriber = subscriber();
        TerminationSubscriber subscriber2 = subscriber();
        Flux from = Flux.from(subscriber.listenEvents());
        Flux from2 = Flux.from(subscriber2.listenEvents());
        sendEvents(subscriber, new Event[]{COMPLETED_EVENT});
        ArrayList arrayList = new ArrayList();
        Objects.requireNonNull(arrayList);
        from.subscribe((v1) -> {
            r1.add(v1);
        });
        ArrayList arrayList2 = new ArrayList();
        Objects.requireNonNull(arrayList2);
        from2.subscribe((v1) -> {
            r1.add(v1);
        });
        Awaitility.await().atMost(Durations.ONE_MINUTE).until(() -> {
            return Boolean.valueOf(arrayList.size() == 1 && arrayList2.size() == 1);
        });
        Assertions.assertThat(arrayList).containsExactly(new Event[]{COMPLETED_EVENT});
        Assertions.assertThat(arrayList2).containsExactly(new Event[]{COMPLETED_EVENT});
    }

    @Test
    void eventProcessingShouldNotCrashOnInvalidMessage() {
        TerminationSubscriber subscriber = subscriber();
        Flux from = Flux.from(subscriber.listenEvents());
        rabbitMQExtension.getSender().send(Mono.just(new OutboundMessage("terminationSubscriberExchange", "terminationSubscriberRoutingKey", "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8)))).block();
        sendEvents(subscriber, new Event[]{COMPLETED_EVENT});
        ArrayList arrayList = new ArrayList();
        Objects.requireNonNull(arrayList);
        from.subscribe((v1) -> {
            r1.add(v1);
        });
        Awaitility.await().timeout(Durations.TEN_SECONDS).untilAsserted(() -> {
            Assertions.assertThat(arrayList).hasSize(1);
        });
    }

    @Test
    void eventProcessingShouldNotCrashOnInvalidMessages() {
        TerminationSubscriber subscriber = subscriber();
        Flux from = Flux.from(subscriber.listenEvents());
        IntStream.range(0, 10).forEach(i -> {
            rabbitMQExtension.getSender().send(Mono.just(new OutboundMessage("terminationSubscriberExchange", "terminationSubscriberRoutingKey", "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8)))).block();
        });
        sendEvents(subscriber, new Event[]{COMPLETED_EVENT});
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        Objects.requireNonNull(concurrentLinkedQueue);
        from.subscribe((v1) -> {
            r1.add(v1);
        });
        Awaitility.await().atMost(Durations.ONE_MINUTE).untilAsserted(() -> {
            SoftAssertions.assertSoftly(softAssertions -> {
                Assertions.assertThat(concurrentLinkedQueue).containsExactly(new Event[]{COMPLETED_EVENT});
            });
        });
    }
}
