package fr.maif.eventsourcing.datastore;

import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventProcessorImpl;
import fr.maif.eventsourcing.ProcessingSuccess;
import fr.maif.eventsourcing.datastore.TestCommand;
import io.vavr.Tuple0;
import io.vavr.control.Either;
import io.vavr.control.Option;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.testng.annotations.Test;

/* loaded from: input_file:fr/maif/eventsourcing/datastore/DataStoreVerification.class */
public abstract class DataStoreVerification<TxCtx> implements DataStoreVerificationRules<TestState, TestEvent, Tuple0, Tuple0, TxCtx> {
    public abstract EventProcessorImpl<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor(String str);

    public abstract String kafkaBootstrapUrl();

    @Override // fr.maif.eventsourcing.datastore.DataStoreVerificationRules
    @Test
    public void required_submitValidSingleEventCommandMustWriteEventInDataStore() {
        EventProcessorImpl<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(randomKafkaTopic());
        submitValidCommand(eventProcessor, "1");
        List<EventEnvelope<TestEvent, Tuple0, Tuple0>> deduplicateOnId = deduplicateOnId(readFromDataStore(eventProcessor.eventStore()));
        cleanup(eventProcessor);
        Assertions.assertThat(deduplicateOnId).hasSize(1);
    }

    @Override // fr.maif.eventsourcing.datastore.DataStoreVerificationRules
    @Test
    public void required_submitInvalidCommandMustNotWriteEventsIntDataStore() {
        EventProcessorImpl<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(randomKafkaTopic());
        submitInvalidCommand(eventProcessor, "1");
        List<EventEnvelope<TestEvent, Tuple0, Tuple0>> deduplicateOnId = deduplicateOnId(readFromDataStore(eventProcessor.eventStore()));
        cleanup(eventProcessor);
        Assertions.assertThat(deduplicateOnId).isEmpty();
    }

    @Override // fr.maif.eventsourcing.datastore.DataStoreVerificationRules
    @Test
    public void required_submitMultiEventCommandMustWriteAllEventsInDataStore() {
        EventProcessorImpl<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(randomKafkaTopic());
        submitMultiEventsCommand(eventProcessor, "1");
        List<EventEnvelope<TestEvent, Tuple0, Tuple0>> deduplicateOnId = deduplicateOnId(readFromDataStore(eventProcessor.eventStore()));
        cleanup(eventProcessor);
        Assertions.assertThat(deduplicateOnId.size()).isGreaterThan(1);
        Assertions.assertThat((List) deduplicateOnId.stream().map(eventEnvelope -> {
            return eventEnvelope.id;
        }).collect(Collectors.toList())).doesNotHaveDuplicates();
    }

    @Override // fr.maif.eventsourcing.datastore.DataStoreVerificationRules
    @Test
    public void required_aggregateOfSingleEventStateShouldBeCorrect() {
        EventProcessorImpl<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(randomKafkaTopic());
        submitValidCommand(eventProcessor, "1");
        Option<TestState> readState = readState(eventProcessor, "1");
        cleanup(eventProcessor);
        Assertions.assertThat(readState.isDefined()).isTrue();
        Assertions.assertThat(((TestState) readState.get()).count).isEqualTo(1);
    }

    @Override // fr.maif.eventsourcing.datastore.DataStoreVerificationRules
    @Test
    public void required_aggregateOfDeleteEventStateShouldBeEmpty() {
        EventProcessorImpl<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(randomKafkaTopic());
        submitValidCommand(eventProcessor, "1");
        submitDeleteCommand(eventProcessor, "1");
        Option<TestState> readState = readState(eventProcessor, "1");
        cleanup(eventProcessor);
        Assertions.assertThat(readState.isEmpty()).isTrue();
    }

    @Override // fr.maif.eventsourcing.datastore.DataStoreVerificationRules
    @Test
    public void required_aggregateOfMultipleEventStateShouldBeCorrect() {
        EventProcessorImpl<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(randomKafkaTopic());
        submitMultiEventsCommand(eventProcessor, "1");
        Option<TestState> readState = readState(eventProcessor, "1");
        cleanup(eventProcessor);
        Assertions.assertThat(readState.isDefined()).isTrue();
        Assertions.assertThat(((TestState) readState.get()).count).isEqualTo(2);
    }

    @Override // fr.maif.eventsourcing.datastore.DataStoreVerificationRules
    @Test
    public void required_singleEventShouldBePublished() {
        String randomKafkaTopic = randomKafkaTopic();
        EventProcessorImpl<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(randomKafkaTopic);
        submitValidCommand(eventProcessor, "1");
        List<EventEnvelope<TestEvent, Tuple0, Tuple0>> deduplicateOnId = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), randomKafkaTopic));
        cleanup(eventProcessor);
        Assertions.assertThat(deduplicateOnId).hasSize(1);
    }

    @Override // fr.maif.eventsourcing.datastore.DataStoreVerificationRules
    @Test
    public void required_multipleEventsShouldBePublished() {
        String randomKafkaTopic = randomKafkaTopic();
        EventProcessorImpl<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(randomKafkaTopic);
        submitMultiEventsCommand(eventProcessor, "1");
        List<EventEnvelope<TestEvent, Tuple0, Tuple0>> deduplicateOnId = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), randomKafkaTopic));
        cleanup(eventProcessor);
        Assertions.assertThat(deduplicateOnId).hasSize(2);
    }

    @Override // fr.maif.eventsourcing.datastore.DataStoreVerificationRules
    @Test
    public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() {
        String randomKafkaTopic = randomKafkaTopic();
        EventProcessorImpl<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(randomKafkaTopic);
        shutdownBroker();
        submitValidCommand(eventProcessor, "1");
        restartBroker();
        try {
            Thread.sleep(10000L);
            List<EventEnvelope<TestEvent, Tuple0, Tuple0>> deduplicateOnId = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), randomKafkaTopic));
            cleanup(eventProcessor);
            Assertions.assertThat(deduplicateOnId).hasSize(1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // fr.maif.eventsourcing.datastore.DataStoreVerificationRules
    @Test
    public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() {
        EventProcessorImpl<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(randomKafkaTopic());
        shutdownDatabase();
        try {
            Either<String, ProcessingSuccess<TestState, TestEvent, Tuple0, Tuple0, Tuple0>> submitValidCommand = submitValidCommand(eventProcessor, "1");
            cleanup(eventProcessor);
            Assertions.assertThat(submitValidCommand.isLeft()).isTrue();
            restartDatabase();
        } catch (Throwable th) {
            restartDatabase();
            throw th;
        }
    }

    @Override // fr.maif.eventsourcing.datastore.DataStoreVerificationRules
    public Either<String, ProcessingSuccess<TestState, TestEvent, Tuple0, Tuple0, Tuple0>> submitValidCommand(EventProcessorImpl<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessorImpl, String str) {
        return (Either) eventProcessorImpl.processCommand(new TestCommand.SimpleCommand(str)).toCompletableFuture().join();
    }

    @Override // fr.maif.eventsourcing.datastore.DataStoreVerificationRules
    public void submitInvalidCommand(EventProcessorImpl<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessorImpl, String str) {
        eventProcessorImpl.processCommand(new TestCommand.InvalidCommand(str)).toCompletableFuture().join();
    }

    @Override // fr.maif.eventsourcing.datastore.DataStoreVerificationRules
    public void submitMultiEventsCommand(EventProcessorImpl<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessorImpl, String str) {
        eventProcessorImpl.processCommand(new TestCommand.MultiEventCommand(str)).toCompletableFuture().join();
    }

    @Override // fr.maif.eventsourcing.datastore.DataStoreVerificationRules
    public void submitDeleteCommand(EventProcessorImpl<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessorImpl, String str) {
        eventProcessorImpl.processCommand(new TestCommand.DeleteCommand(str)).toCompletableFuture().join();
    }

    @Override // fr.maif.eventsourcing.datastore.DataStoreVerificationRules
    public Option<TestState> readState(EventProcessorImpl<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessorImpl, String str) {
        return (Option) eventProcessorImpl.getAggregate(str).toCompletableFuture().join();
    }

    public String randomKafkaTopic() {
        return "test-topic" + UUID.randomUUID();
    }

    private List<EventEnvelope<TestEvent, Tuple0, Tuple0>> deduplicateOnId(List<EventEnvelope<TestEvent, Tuple0, Tuple0>> list) {
        return io.vavr.collection.List.ofAll(list).distinctBy(eventEnvelope -> {
            return eventEnvelope.id;
        }).toJavaList();
    }
}
