package org.apache.james.task.eventsourcing.distributed;

import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.apache.james.server.task.json.dto.MemoryReferenceTaskStore;
import org.apache.james.server.task.json.dto.TaskDTOModule;
import org.apache.james.server.task.json.dto.TestTaskDTOModules;
import org.apache.james.task.MemoryReferenceTask;
import org.apache.james.task.Task;
import org.apache.james.task.TaskId;
import org.apache.james.task.TaskWithId;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.class */
class RabbitMQWorkQueuePersistenceTest {
    private static final TaskId TASK_ID = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd");

    @RegisterExtension
    static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.defaultRabbitMQ().restartPolicy(RabbitMQExtension.DockerRestartPolicy.PER_CLASS).isolationPolicy(RabbitMQExtension.IsolationPolicy.WEAK);
    private RabbitMQWorkQueue testee;
    private ImmediateWorker worker;
    private JsonTaskSerializer serializer;

    RabbitMQWorkQueuePersistenceTest() {
    }

    @BeforeEach
    void setUp() throws Exception {
        this.worker = (ImmediateWorker) Mockito.spy(new ImmediateWorker());
        this.serializer = JsonTaskSerializer.of(new TaskDTOModule[]{TestTaskDTOModules.COMPLETED_TASK_MODULE, (TaskDTOModule) TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore())});
        this.testee = new RabbitMQWorkQueue(this.worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), this.serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration());
        this.testee.declareQueue();
    }

    @AfterEach
    void tearDown() {
        this.testee.close();
    }

    @Test
    void submittedMessageShouldSurviveRabbitMQRestart() throws Exception {
        TaskWithId taskWithId = new TaskWithId(TASK_ID, new MemoryReferenceTask(() -> {
            return Task.Result.COMPLETED;
        }));
        this.testee.submit(taskWithId);
        Thread.sleep(500L);
        this.testee.close();
        rabbitMQExtension.getRabbitMQ().restart();
        startNewConsumingWorkqueue();
        Awaitility.await().atMost(Durations.ONE_MINUTE).until(() -> {
            return Boolean.valueOf(!this.worker.results.isEmpty());
        });
        Assertions.assertThat(this.worker.tasks).containsExactly(new TaskWithId[]{taskWithId});
        Assertions.assertThat(this.worker.results).containsExactly(new Task.Result[]{Task.Result.COMPLETED});
    }

    private void startNewConsumingWorkqueue() throws Exception {
        this.worker = (ImmediateWorker) Mockito.spy(new ImmediateWorker());
        this.testee = new RabbitMQWorkQueue(this.worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), this.serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration());
        this.testee.start();
    }
}
