package org.apache.james.task.eventsourcing.distributed;

import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.EventSourcingSystem;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreExtension;
import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule;
import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTO;
import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
import org.apache.james.json.DTOConverter;
import org.apache.james.json.DTOModule;
import org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
import org.apache.james.server.task.json.dto.MemoryReferenceTaskStore;
import org.apache.james.server.task.json.dto.MemoryReferenceWithCounterTaskAdditionalInformationDTO;
import org.apache.james.server.task.json.dto.MemoryReferenceWithCounterTaskStore;
import org.apache.james.server.task.json.dto.TaskDTO;
import org.apache.james.server.task.json.dto.TaskDTOModule;
import org.apache.james.server.task.json.dto.TestTaskDTOModules;
import org.apache.james.task.CompletedTask;
import org.apache.james.task.CountDownLatchExtension;
import org.apache.james.task.Hostname;
import org.apache.james.task.MemoryReferenceTask;
import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskId;
import org.apache.james.task.TaskManager;
import org.apache.james.task.TaskManagerContract;
import org.apache.james.task.WorkQueue;
import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
import org.apache.james.task.eventsourcing.WorkQueueSupplier;
import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionDAO;
import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionModule;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.rabbitmq.Sender;

/* loaded from: input_file:org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.class */
class DistributedTaskManagerTest implements TaskManagerContract {
    // Serialization module for the additional information attached to the memory-reference-with-counter test task.
    public static final AdditionalInformationDTOModule<?, ?> ADDITIONAL_INFORMATION_MODULE = MemoryReferenceWithCounterTaskAdditionalInformationDTO.SERIALIZATION_MODULE;
    // Serializer and converter built from ADDITIONAL_INFORMATION_MODULE only.
    static final JsonTaskAdditionalInformationSerializer JSON_TASK_ADDITIONAL_INFORMATION_SERIALIZER = JsonTaskAdditionalInformationSerializer.of(new AdditionalInformationDTOModule[]{ADDITIONAL_INFORMATION_MODULE});
    static final DTOConverter<TaskExecutionDetails.AdditionalInformation, AdditionalInformationDTO> TASK_ADDITIONAL_INFORMATION_DTO_CONVERTER = DTOConverter.of(new DTOModule[]{ADDITIONAL_INFORMATION_MODULE});
    // Hostnames given to the two task managers the tests create, so their nodes can be told apart.
    static final Hostname HOSTNAME = new Hostname("foo");
    static final Hostname HOSTNAME_2 = new Hostname("bar");

    @RegisterExtension
    static final RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ();

    // Cassandra cluster initialised with the schema-version, event-store, zoned-date-time and task-projection modules.
    @RegisterExtension
    static final CassandraClusterExtension CASSANDRA_CLUSTER = new CassandraClusterExtension(CassandraModule.aggregateModules(new CassandraModule[]{CassandraSchemaVersionModule.MODULE, CassandraEventStoreModule.MODULE(), CassandraZonedDateTimeModule.MODULE, CassandraTaskExecutionDetailsProjectionModule.MODULE()}));
    // In-memory stores passed to the memory-reference task DTO modules below.
    MemoryReferenceTaskStore memoryReferenceTaskStore = new MemoryReferenceTaskStore();
    MemoryReferenceWithCounterTaskStore memoryReferenceWithCounterTaskStore = new MemoryReferenceWithCounterTaskStore();
    // Task DTO modules known to the task serializer and DTO converter used by this test.
    ImmutableSet<TaskDTOModule<?, ?>> taskDTOModules = ImmutableSet.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.FAILED_TASK_MODULE, TestTaskDTOModules.THROWING_TASK_MODULE, (TaskDTOModule) TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(this.memoryReferenceTaskStore), (TaskDTOModule) TestTaskDTOModules.MEMORY_REFERENCE_WITH_COUNTER_TASK_MODULE.apply(this.memoryReferenceWithCounterTaskStore));
    JsonTaskSerializer taskSerializer = new JsonTaskSerializer(this.taskDTOModules);
    DTOConverter<Task, TaskDTO> taskDTOConverter = new DTOConverter<>(this.taskDTOModules);
    // Event DTO modules derived from the task serializer and converters above.
    Set<EventDTOModule<? extends Event, ? extends EventDTO>> eventDtoModule = TasksSerializationModule.list(this.taskSerializer, TASK_ADDITIONAL_INFORMATION_DTO_CONVERTER, this.taskDTOConverter);

    // Event store extension whose serializer also knows the additional-information and task DTO modules as nested types.
    @RegisterExtension
    CassandraEventStoreExtension eventStoreExtension = new CassandraEventStoreExtension(CASSANDRA_CLUSTER, JsonEventSerializer.forModules(this.eventDtoModule).withNestedTypeModules(Sets.union(ImmutableSet.of(ADDITIONAL_INFORMATION_MODULE), this.taskDTOModules)));

    @RegisterExtension
    CountDownLatchExtension countDownLatchExtension = new CountDownLatchExtension();
    // Per-test state; every field below is (re)assigned in setUp.
    TrackedRabbitMQWorkQueueSupplier workQueueSupplier;
    EventStore eventStore;
    // Subscribers created by taskManager(Hostname); closed in tearDown.
    List<RabbitMQTerminationSubscriber> terminationSubscribers;
    TaskExecutionDetailsProjection executionDetailsProjection;
    JsonEventSerializer eventSerializer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest$TrackedRabbitMQWorkQueueSupplier.class */
    /**
     * {@link WorkQueueSupplier} that delegates to a {@link RabbitMQWorkQueueSupplier} and keeps track
     * of every work queue it hands out, so that a test can close all of them with
     * {@link #stopWorkQueues()}.
     */
    public static class TrackedRabbitMQWorkQueueSupplier implements WorkQueueSupplier {
        /** Work queues created so far and not yet closed by {@link #stopWorkQueues()}. */
        private final List<RabbitMQWorkQueue> workQueues = new ArrayList<>();
        private final RabbitMQWorkQueueSupplier supplier;

        /**
         * @param sender             RabbitMQ sender forwarded to the wrapped supplier
         * @param receiverProvider   RabbitMQ receiver provider forwarded to the wrapped supplier
         * @param jsonTaskSerializer task serializer forwarded to the wrapped supplier
         */
        TrackedRabbitMQWorkQueueSupplier(Sender sender, ReceiverProvider receiverProvider, JsonTaskSerializer jsonTaskSerializer) {
            this.supplier = new RabbitMQWorkQueueSupplier(sender, receiverProvider, jsonTaskSerializer);
        }

        /**
         * Creates a work queue through the wrapped supplier (using the contract's polling interval),
         * starts it and records it for later shutdown.
         *
         * @param eventSourcingSystem event sourcing system the work queue is bound to
         * @return the started work queue
         */
        public WorkQueue apply(EventSourcingSystem eventSourcingSystem) {
            RabbitMQWorkQueue workQueue = this.supplier.apply(eventSourcingSystem, TaskManagerContract.UPDATE_INFORMATION_POLLING_INTERVAL);
            workQueue.start();
            this.workQueues.add(workQueue);
            return workQueue;
        }

        /** Closes every tracked work queue, then forgets them. */
        void stopWorkQueues() {
            this.workQueues.forEach(RabbitMQWorkQueue::close);
            this.workQueues.clear();
        }
    }

    // Nothing to initialise here: per-test state is assigned in setUp.
    DistributedTaskManagerTest() {
    }

    /**
     * Builds the per-test collaborators: the Cassandra-backed execution details projection, a
     * tracking work queue supplier on top of RabbitMQ, an empty list of termination subscribers
     * and the event serializer handed to those subscribers.
     *
     * @param eventStore event store injected into the test method by a registered extension
     */
    @BeforeEach
    void setUp(EventStore eventStore) {
        CassandraCluster cassandra = CASSANDRA_CLUSTER.getCassandraCluster();
        CassandraTaskExecutionDetailsProjectionDAO projectionDAO = new CassandraTaskExecutionDetailsProjectionDAO(
            cassandra.getConf(),
            cassandra.getTypesProvider(),
            JSON_TASK_ADDITIONAL_INFORMATION_SERIALIZER);
        this.executionDetailsProjection = new CassandraTaskExecutionDetailsProjection(projectionDAO);
        this.workQueueSupplier = new TrackedRabbitMQWorkQueueSupplier(
            rabbitMQExtension.getSender(),
            rabbitMQExtension.getReceiverProvider(),
            this.taskSerializer);
        this.eventStore = eventStore;
        // Diamond instead of the raw type so the list stays type-checked.
        this.terminationSubscribers = new ArrayList<>();
        this.eventSerializer = JsonEventSerializer.forModules(this.eventDtoModule).withoutNestedType();
    }

    /** Closes every termination subscriber created during the test, then stops the tracked work queues. */
    @AfterEach
    void tearDown() {
        for (RabbitMQTerminationSubscriber subscriber : this.terminationSubscribers) {
            subscriber.close();
        }
        this.workQueueSupplier.stopWorkQueues();
    }

    /**
     * Returns a task manager bound to {@link #HOSTNAME}.
     *
     * <p>The decompiler renamed this method to {@code m3taskManager} because it was merged with a
     * bridge method, which leaves the class without a no-arg {@code taskManager()}. Restored here
     * under its original name.
     */
    public EventSourcingTaskManager taskManager() {
        return taskManager(HOSTNAME);
    }

    /** Alias kept under the decompiler-generated name for the call sites that use it. */
    public EventSourcingTaskManager m3taskManager() {
        return taskManager();
    }

    /**
     * Creates a task manager for the given hostname, wired to the shared work queue supplier,
     * event store and execution details projection, with its own termination subscriber.
     *
     * @param hostname hostname handed to the new task manager
     * @return the new task manager
     */
    EventSourcingTaskManager taskManager(Hostname hostname) {
        RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(
            rabbitMQExtension.getSender(),
            rabbitMQExtension.getReceiverProvider(),
            this.eventSerializer);
        // Registered before being started so tearDown always gets to close it.
        this.terminationSubscribers.add(subscriber);
        subscriber.start();
        return new EventSourcingTaskManager(
            this.workQueueSupplier,
            this.eventStore,
            this.executionDetailsProjection,
            hostname,
            subscriber);
    }

    /**
     * Submits a task on one task manager and checks that a second task manager sharing the same
     * event store reports the same execution details for it.
     */
    @Test
    void givenOneEventStoreTwoEventTaskManagersShareTheSameEvents() {
        // try-with-resources closes both managers (second one first), including when the body throws.
        try (EventSourcingTaskManager taskManager1 = m3taskManager();
             EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
            TaskId taskId = taskManager1.submit(new CompletedTask());
            Awaitility.await()
                .atMost(Duration.FIVE_SECONDS)
                .pollInterval(100L, TimeUnit.MILLISECONDS)
                .until(() -> taskManager1.await(taskId, TIMEOUT).getStatus() == TaskManager.Status.COMPLETED);

            TaskExecutionDetails detailsFromTaskManager1 = taskManager1.getExecutionDetails(taskId);
            TaskExecutionDetails detailsFromTaskManager2 = taskManager2.getExecutionDetails(taskId);
            Assertions.assertThat(detailsFromTaskManager1).isEqualTo(detailsFromTaskManager2);
        }
    }

    /**
     * Submits a task that blocks on a latch followed by a second task, checks that the second
     * task is reported as WAITING while the first one is blocked, then releases the latch and
     * waits for the second task to complete.
     */
    @Test
    void givenTwoTaskManagersAndTwoTaskOnlyOneTaskShouldRunAtTheSameTime() {
        CountDownLatch waitingForFirstTaskLatch = new CountDownLatch(1);
        // try-with-resources closes both managers (second one first), including when the body throws.
        try (EventSourcingTaskManager taskManager1 = m3taskManager();
             EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
            taskManager1.submit(new MemoryReferenceTask(() -> {
                waitingForFirstTaskLatch.await();
                return Task.Result.COMPLETED;
            }));
            TaskId waitingTaskId = taskManager1.submit(new CompletedTask());

            awaitUntilTaskHasStatus(waitingTaskId, TaskManager.Status.WAITING, taskManager2);
            waitingForFirstTaskLatch.countDown();

            Awaitility.await()
                .atMost(Duration.ONE_SECOND)
                .pollInterval(100L, TimeUnit.MILLISECONDS)
                .until(() -> taskManager1.await(waitingTaskId, TIMEOUT).getStatus() == TaskManager.Status.COMPLETED);
        }
    }

    /**
     * Starts a first task manager, then, after a short pause, a second one on which a task is
     * submitted; asserts the task was submitted from {@link #HOSTNAME_2} and ran on
     * {@link #HOSTNAME}.
     *
     * @throws InterruptedException if the pause between the two start-ups is interrupted
     */
    @Test
    void givenTwoTaskManagerATaskSubmittedOnOneCouldBeRunOnTheOther() throws InterruptedException {
        // Nested try-with-resources keeps the pause between the two start-ups and closes both
        // managers (second one first), including when the body throws.
        try (EventSourcingTaskManager taskManager1 = m3taskManager()) {
            Thread.sleep(100L);
            try (EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
                TaskId taskId = taskManager2.submit(new CompletedTask());
                Awaitility.await()
                    .atMost(Duration.ONE_SECOND)
                    .pollInterval(100L, TimeUnit.MILLISECONDS)
                    .until(() -> taskManager1.await(taskId, TIMEOUT).getStatus() == TaskManager.Status.COMPLETED);

                TaskExecutionDetails executionDetails = taskManager2.getExecutionDetails(taskId);
                Assertions.assertThat(executionDetails.getSubmittedNode()).isEqualTo(HOSTNAME_2);
                Assertions.assertThat(executionDetails.getRanNode()).contains(HOSTNAME);
            }
        }
    }

    /**
     * Starts a latch-blocked task, cancels it from the task manager that is not running it, and
     * checks that it ends up CANCELLED with the cancelling node recorded as the requester.
     */
    @Test
    void givenTwoTaskManagerATaskRunningOnOneShouldBeCancellableFromTheOtherOne(CountDownLatch countDownLatch) {
        EventSourcingTaskManager firstManager = taskManager(HOSTNAME);
        EventSourcingTaskManager secondManager = taskManager(HOSTNAME_2);
        MemoryReferenceTask blockedTask = new MemoryReferenceTask(() -> {
            countDownLatch.await();
            return Task.Result.COMPLETED;
        });
        TaskId taskId = firstManager.submit(blockedTask);
        awaitUntilTaskHasStatus(taskId, TaskManager.Status.IN_PROGRESS, firstManager);

        // Pick the manager whose hostname differs from the node the task is running on.
        Hostname runningNode = (Hostname) firstManager.getExecutionDetails(taskId).getRanNode().get();
        Pair<Hostname, TaskManager> firstEntry = Pair.of(HOSTNAME, firstManager);
        Pair<Hostname, TaskManager> secondEntry = Pair.of(HOSTNAME_2, secondManager);
        Pair<Hostname, TaskManager> remote = getOtherTaskManager(runningNode, firstEntry, secondEntry);
        remote.getValue().cancel(taskId);

        awaitAtMostFiveSeconds.untilAsserted(() ->
            Assertions.assertThat(firstManager.getExecutionDetails(taskId).getStatus())
                .isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED));

        countDownLatch.countDown();
        awaitUntilTaskHasStatus(taskId, TaskManager.Status.CANCELLED, firstManager);

        Assertions.assertThat(firstManager.getExecutionDetails(taskId).getStatus())
            .isEqualTo(TaskManager.Status.CANCELLED);
        Assertions.assertThat(firstManager.getExecutionDetails(taskId).getCancelRequestedNode())
            .contains(remote.getKey());
    }

    /**
     * Starts a short sleeping task, awaits it from the task manager that is not running it, and
     * checks that the task is then reported as COMPLETED.
     */
    @Test
    void givenTwoTaskManagersATaskRunningOnOneShouldBeWaitableFromTheOtherOne() throws TaskManager.ReachedTimeoutException {
        EventSourcingTaskManager firstManager = taskManager(HOSTNAME);
        EventSourcingTaskManager secondManager = taskManager(HOSTNAME_2);
        MemoryReferenceTask sleepingTask = new MemoryReferenceTask(() -> {
            Thread.sleep(250L);
            return Task.Result.COMPLETED;
        });
        TaskId taskId = firstManager.submit(sleepingTask);
        awaitUntilTaskHasStatus(taskId, TaskManager.Status.IN_PROGRESS, firstManager);

        // Pick the manager whose hostname differs from the node the task is running on.
        Hostname runningNode = (Hostname) firstManager.getExecutionDetails(taskId).getRanNode().get();
        Pair<Hostname, TaskManager> firstEntry = Pair.of(HOSTNAME, firstManager);
        Pair<Hostname, TaskManager> secondEntry = Pair.of(HOSTNAME_2, secondManager);
        TaskManager remoteManager = getOtherTaskManager(runningNode, firstEntry, secondEntry).getValue();
        remoteManager.await(taskId, TIMEOUT);

        Assertions.assertThat(firstManager.getExecutionDetails(taskId).getStatus())
            .isEqualTo(TaskManager.Status.COMPLETED);
    }

    /**
     * Returns whichever of the two entries does not belong to the given node.
     *
     * @param hostname node to exclude
     * @param pair     first (hostname, task manager) entry
     * @param pair2    second (hostname, task manager) entry
     * @return {@code pair2} when {@code hostname} equals the key of {@code pair}, otherwise {@code pair}
     */
    private Pair<Hostname, TaskManager> getOtherTaskManager(Hostname hostname, Pair<Hostname, TaskManager> pair, Pair<Hostname, TaskManager> pair2) {
        if (hostname.equals(pair.getKey())) {
            return pair2;
        }
        return pair;
    }

    /**
     * Submits one task per task manager and waits until both managers list the same two tasks,
     * both in COMPLETED status.
     */
    @Test
    void givenTwoTaskManagerAndATaskRanPerTaskManagerListingThemOnEachShouldShowBothTasks() {
        // try-with-resources closes both managers (second one first), including when the body throws.
        try (EventSourcingTaskManager taskManager1 = m3taskManager();
             EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
            TaskId taskId1 = taskManager1.submit(new CompletedTask());
            TaskId taskId2 = taskManager2.submit(new CompletedTask());

            Awaitility.await()
                .atMost(Duration.ONE_SECOND)
                .pollInterval(100L, TimeUnit.MILLISECONDS)
                .untilAsserted(() ->
                    Assertions.assertThat(taskManager1.list())
                        .hasSize(2)
                        .isEqualTo(taskManager2.list())
                        .allSatisfy(details ->
                            Assertions.assertThat(details.getStatus()).isEqualTo(TaskManager.Status.COMPLETED))
                        .extracting(TaskExecutionDetails::getTaskId)
                        .containsExactlyInAnyOrder(taskId1, taskId2));
        }
    }

    /**
     * Blocks one node with a latch-bound task, submits a second task on that same node, closes
     * that node's task manager and checks that the second task is completed by the other node.
     *
     * @param countDownLatch latch injected into the test method, awaited by the blocking task
     */
    @Test
    void givenTwoTaskManagerIfTheFirstOneIsDownTheSecondOneShouldBeAbleToRunTheRemainingTasks(CountDownLatch countDownLatch) throws Exception {
        // try-with-resources closes both managers (second one first), including when the body throws.
        try (EventSourcingTaskManager taskManager1 = m3taskManager();
             EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
            ImmutableBiMap<EventSourcingTaskManager, Hostname> hostnameByTaskManager =
                ImmutableBiMap.of(taskManager1, HOSTNAME, taskManager2, HOSTNAME_2);

            TaskId blockingTaskId = taskManager1.submit(new MemoryReferenceTask(() -> {
                countDownLatch.await();
                return Task.Result.COMPLETED;
            }));
            awaitUntilTaskHasStatus(blockingTaskId, TaskManager.Status.IN_PROGRESS, taskManager1);

            Hostname runningNode = (Hostname) taskManager1.getExecutionDetails(blockingTaskId).getRanNode().get();
            Hostname otherNode = getOtherNode(hostnameByTaskManager, runningNode);
            EventSourcingTaskManager busyTaskManager = hostnameByTaskManager.inverse().get(runningNode);
            EventSourcingTaskManager otherTaskManager = hostnameByTaskManager.inverse().get(otherNode);

            TaskId remainingTaskId = busyTaskManager.submit(new CompletedTask());
            // Take the busy node down; its manager is closed once more when the try block exits.
            busyTaskManager.close();

            awaitAtMostFiveSeconds.untilAsserted(() ->
                Assertions.assertThat(otherTaskManager.getExecutionDetails(remainingTaskId).getStatus())
                    .isEqualTo(TaskManager.Status.COMPLETED));
            Assertions.assertThat(otherTaskManager.getExecutionDetails(remainingTaskId).getRanNode())
                .contains(otherNode);
        }
    }

    /**
     * Returns the first hostname in the map's values that differs from the given one.
     *
     * @param immutableBiMap task managers mapped to their hostnames
     * @param hostname       hostname to exclude
     * @return a hostname from the map other than {@code hostname}
     * @throws java.util.NoSuchElementException if no value differs from {@code hostname}
     */
    private Hostname getOtherNode(ImmutableBiMap<EventSourcingTaskManager, Hostname> immutableBiMap, Hostname hostname) {
        return immutableBiMap.values()
            .stream()
            .filter(candidate -> !candidate.equals(hostname))
            .findFirst()
            .get();
    }
}
