package org.apache.james.rrt.cassandra.migration;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.utils.CassandraUtils;
import org.apache.james.core.Domain;
import org.apache.james.rrt.cassandra.CassandraMappingsSourcesDAO;
import org.apache.james.rrt.cassandra.CassandraRRTModule;
import org.apache.james.rrt.cassandra.CassandraRecipientRewriteTableDAO;
import org.apache.james.rrt.lib.Mapping;
import org.apache.james.rrt.lib.MappingSource;
import org.apache.james.task.Task;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import reactor.core.publisher.Flux;

/* loaded from: input_file:org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.class */
class MappingsSourcesMigrationTest {
    private static final int THREAD_COUNT = 10;
    private static final int OPERATION_COUNT = 10;
    private static final int MAPPING_COUNT = 100;
    private CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO;
    private CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO;
    private MappingsSourcesMigration migration;
    private static final String USER = "test";
    private static final MappingSource SOURCE = MappingSource.fromUser(USER, Domain.LOCALHOST);
    private static final String ADDRESS = "test@domain";
    private static final Mapping MAPPING = Mapping.alias(ADDRESS);

    @RegisterExtension
    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraRRTModule.MODULE);

    MappingsSourcesMigrationTest() {
    }

    @BeforeEach
    void setUp(CassandraCluster cassandraCluster2) {
        this.cassandraRecipientRewriteTableDAO = new CassandraRecipientRewriteTableDAO(cassandraCluster2.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
        this.cassandraMappingsSourcesDAO = new CassandraMappingsSourcesDAO(cassandraCluster2.getConf());
        this.migration = new MappingsSourcesMigration(this.cassandraRecipientRewriteTableDAO, this.cassandraMappingsSourcesDAO);
    }

    @Test
    void emptyMigrationShouldSucceed() {
        Assertions.assertThat(this.migration.run()).isEqualTo(Task.Result.COMPLETED);
    }

    @Test
    void migrationShouldSucceedWithData() {
        this.cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
        Assertions.assertThat(this.migration.run()).isEqualTo(Task.Result.COMPLETED);
    }

    @Test
    void migrationShouldCreateMappingSourceFromMapping() {
        this.cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
        this.migration.run();
        Assertions.assertThat((List) this.cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block()).containsExactly(new MappingSource[]{SOURCE});
    }

    @Test
    void migrationShouldCreateMultipleMappingSourcesFromMappings() {
        MappingSource fromUser = MappingSource.fromUser("bob", Domain.LOCALHOST);
        this.cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
        this.cassandraRecipientRewriteTableDAO.addMapping(fromUser, MAPPING).block();
        this.migration.run();
        Assertions.assertThat((List) this.cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block()).containsOnly(new MappingSource[]{SOURCE, fromUser});
    }

    @Test
    void migrationShouldReturnPartialWhenGetAllMappingsFromMappingsFail() {
        CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO = (CassandraRecipientRewriteTableDAO) Mockito.mock(CassandraRecipientRewriteTableDAO.class);
        this.migration = new MappingsSourcesMigration(cassandraRecipientRewriteTableDAO, (CassandraMappingsSourcesDAO) Mockito.mock(CassandraMappingsSourcesDAO.class));
        Mockito.when(cassandraRecipientRewriteTableDAO.getAllMappings()).thenReturn(Flux.error(new RuntimeException()));
        Assertions.assertThat(this.migration.run()).isEqualTo(Task.Result.PARTIAL);
    }

    @Test
    void migrationShouldReturnPartialAddMappingFails() {
        CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO = (CassandraRecipientRewriteTableDAO) Mockito.mock(CassandraRecipientRewriteTableDAO.class);
        CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO = (CassandraMappingsSourcesDAO) Mockito.mock(CassandraMappingsSourcesDAO.class);
        this.migration = new MappingsSourcesMigration(cassandraRecipientRewriteTableDAO, cassandraMappingsSourcesDAO);
        Mockito.when(cassandraRecipientRewriteTableDAO.getAllMappings()).thenReturn(Flux.just(Pair.of(SOURCE, MAPPING)));
        Mockito.when(cassandraMappingsSourcesDAO.addMapping((Mapping) ArgumentMatchers.any(Mapping.class), (MappingSource) ArgumentMatchers.any(MappingSource.class))).thenThrow(new Throwable[]{new RuntimeException()});
        Assertions.assertThat(this.migration.run()).isEqualTo(Task.Result.PARTIAL);
    }

    @Test
    void migrationShouldBeIdempotentWhenRunMultipleTimes() throws ExecutionException, InterruptedException {
        IntStream.range(0, MAPPING_COUNT).forEach(i -> {
        });
        ConcurrentTestRunner.builder().operation((i2, i3) -> {
            this.migration.run();
        }).threadCount(10).operationCount(10).runSuccessfullyWithin(Duration.ofMinutes(1L));
        Assertions.assertThat((List) this.cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block()).hasSize(MAPPING_COUNT);
    }
}
