/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.randomized;

import io.camunda.zeebe.engine.processing.randomized.FailedPropertyBasedTestDataPrinter;
import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.engine.util.ProcessExecutor;
import io.camunda.zeebe.engine.util.client.DeploymentClient;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.stream.impl.StreamProcessor;
import io.camunda.zeebe.test.util.bpmn.random.ExecutionPath;
import io.camunda.zeebe.test.util.bpmn.random.ScheduledExecutionStep;
import io.camunda.zeebe.test.util.bpmn.random.TestDataGenerator;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.util.Collection;
import java.util.Map;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.MapAssert;
import org.assertj.core.api.SoftAssertions;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class ReplayStateRandomizedPropertyTest {
    private static final String PROCESS_COUNT = System.getProperty("processCount", "3");
    private static final String EXECUTION_PATH_COUNT = System.getProperty("replayExecutionCount", "1");
    private static final long GRACE_PERIOD = 50L;
    @Parameterized.Parameter
    public TestDataGenerator.TestDataRecord record;
    @Rule
    public TestWatcher failedTestDataPrinter = new FailedPropertyBasedTestDataPrinter(this::getDataRecord);
    @Rule
    public final EngineRule engineRule = EngineRule.singlePartition();
    private long lastProcessedPosition = -1L;
    private final ProcessExecutor processExecutor = new ProcessExecutor(this.engineRule);

    @Before
    public void init() {
        this.lastProcessedPosition = -1L;
    }

    public TestDataGenerator.TestDataRecord getDataRecord() {
        return this.record;
    }

    @Test
    public void shouldRestoreStateAtEachStepInExecution() {
        DeploymentClient deployment = this.engineRule.deployment();
        this.record.getBpmnModels().forEach(deployment::withXmlResource);
        deployment.deploy();
        ExecutionPath path = this.record.getExecutionPath();
        for (ScheduledExecutionStep scheduledStep : path.getSteps()) {
            this.record.setCurrentStep(scheduledStep);
            this.processExecutor.applyStep(scheduledStep.getStep());
            this.stopAndRestartEngineAndCompareStates();
        }
        Record result = (Record)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).withElementType(BpmnElementType.PROCESS).withBpmnProcessId(path.getProcessId()).getFirst();
        long position = result.getPosition();
        Awaitility.await((String)"await the last process record to be processed").untilAsserted(() -> Assertions.assertThat((long)this.engineRule.getLastProcessedPosition()).isGreaterThanOrEqualTo(position));
        this.stopAndRestartEngineAndCompareStates();
    }

    private void stopAndRestartEngineAndCompareStates() {
        Awaitility.await((String)"await the last written record to be processed, then wait a GRACE_PERIOD to make sure no new events are added").untilAsserted(() -> this.processingHasStoppedAndNoNewRecordsAreAddedDuringGracePeriod());
        this.engineRule.pauseProcessing(1);
        Map<ZbColumnFamilies, Map<Object, Object>> processingState = this.engineRule.collectState();
        this.engineRule.stop();
        this.engineRule.start();
        Awaitility.await().untilAsserted(() -> Assertions.assertThat((Comparable)((StreamProcessor.Phase)this.engineRule.getStreamProcessor(1).getCurrentPhase().join())).isEqualTo((Object)StreamProcessor.Phase.PROCESSING));
        Awaitility.await((String)"await that the replay state is equal to the processing state").untilAsserted(() -> {
            Map<ZbColumnFamilies, Map<Object, Object>> replayState = this.engineRule.collectState();
            this.assertIdenticalStates(processingState, replayState);
        });
    }

    private void assertIdenticalStates(Map<ZbColumnFamilies, Map<Object, Object>> expectedState, Map<ZbColumnFamilies, Map<Object, Object>> actualState) {
        SoftAssertions softly = new SoftAssertions();
        expectedState.entrySet().stream().filter(entry -> entry.getKey() != ZbColumnFamilies.DEFAULT).forEach(entry -> {
            ZbColumnFamilies column = (ZbColumnFamilies)entry.getKey();
            Map expectedEntries = (Map)entry.getValue();
            Map actualEntries = (Map)actualState.get(column);
            if (expectedEntries.isEmpty()) {
                ((MapAssert)softly.assertThat(actualEntries).describedAs("The state column '%s' should be empty", new Object[]{column})).isEmpty();
            } else {
                ((MapAssert)softly.assertThat(actualEntries).describedAs("The state column '%s' has different entries", new Object[]{column})).containsExactlyInAnyOrderEntriesOf(expectedEntries);
            }
        });
        softly.assertAll();
    }

    private void processingHasStoppedAndNoNewRecordsAreAddedDuringGracePeriod() throws InterruptedException {
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.engineRule.hasReachedEnd()).describedAs("Processing has reached end of the log.", new Object[0])).isTrue();
        Map<ZbColumnFamilies, Map<Object, Object>> stateBeforeGracePeriod = this.engineRule.collectState();
        Thread.sleep(50L);
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.engineRule.hasReachedEnd()).describedAs("Processing has reached end of the log.", new Object[0])).isTrue();
        Map<ZbColumnFamilies, Map<Object, Object>> stateAfterGracePeriod = this.engineRule.collectState();
        this.assertIdenticalStates(stateBeforeGracePeriod, stateAfterGracePeriod);
    }

    @Parameterized.Parameters(name="{0}")
    public static Collection<TestDataGenerator.TestDataRecord> getTestRecords() {
        return TestDataGenerator.generateTestRecords((int)Integer.parseInt(PROCESS_COUNT), (int)Integer.parseInt(EXECUTION_PATH_COUNT));
    }
}

