/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.deployment;

import io.camunda.zeebe.engine.processing.deployment.distribute.DeploymentRedistributor;
import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.DeploymentDistributionIntent;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.ProcessIntent;
import io.camunda.zeebe.protocol.record.value.DeploymentDistributionRecordValue;
import io.camunda.zeebe.protocol.record.value.DeploymentRecordValue;
import io.camunda.zeebe.test.util.record.DeploymentDistributionRecordStream;
import io.camunda.zeebe.test.util.record.DeploymentRecordStream;
import io.camunda.zeebe.test.util.record.RecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.assertj.core.api.AbstractListAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.groups.Tuple;
import org.awaitility.Awaitility;
import org.junit.Rule;
import org.junit.Test;

public class MultiPartitionDeploymentLifecycleTest {
    private static final int PARTITION_COUNT = 3;
    private static final String DMN_RESOURCE = "/dmn/decision-table.dmn";
    @Rule
    public final EngineRule engine = EngineRule.multiplePartition(3);
    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();

    @Test
    public void shouldTestLifecycle() {
        BpmnModelInstance modelInstance = Bpmn.createExecutableProcess((String)"shouldReDistributeAfterRecovery").startEvent().endEvent().done();
        this.engine.deployment().withXmlResource("process.bpmn", modelInstance).deploy();
        ((AbstractListAssert)((AbstractListAssert)((AbstractListAssert)Assertions.assertThat((Stream)((RecordStream)RecordingExporter.records().withPartitionId(1)).limit(r -> r.getIntent().equals(DeploymentIntent.FULLY_DISTRIBUTED))).extracting(new Function[]{Record::getIntent, Record::getRecordType, r -> r.getValue() instanceof DeploymentDistributionRecordValue ? ((DeploymentDistributionRecordValue)r.getValue()).getPartitionId() : r.getPartitionId()}).startsWith((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{DeploymentIntent.CREATE, RecordType.COMMAND, 1}), Assertions.tuple((Object[])new Object[]{ProcessIntent.CREATED, RecordType.EVENT, 1}), Assertions.tuple((Object[])new Object[]{DeploymentIntent.CREATED, RecordType.EVENT, 1})})).containsSubsequence((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{DeploymentDistributionIntent.DISTRIBUTING, RecordType.EVENT, 2}), Assertions.tuple((Object[])new Object[]{DeploymentDistributionIntent.COMPLETE, RecordType.COMMAND, 2}), Assertions.tuple((Object[])new Object[]{DeploymentDistributionIntent.COMPLETED, RecordType.EVENT, 2})})).containsSubsequence((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{DeploymentDistributionIntent.DISTRIBUTING, RecordType.EVENT, 3}), Assertions.tuple((Object[])new Object[]{DeploymentDistributionIntent.COMPLETE, RecordType.COMMAND, 3}), Assertions.tuple((Object[])new Object[]{DeploymentDistributionIntent.COMPLETED, RecordType.EVENT, 3})})).endsWith((Object)Assertions.tuple((Object[])new Object[]{DeploymentIntent.FULLY_DISTRIBUTED, RecordType.EVENT, 1}), (Object[])new Tuple[0]);
        Assertions.assertThat((List)((List)((RecordStream)((RecordStream)RecordingExporter.records().withPartitionId(2)).limit(r -> r.getIntent().equals(DeploymentIntent.DISTRIBUTED))).collect(Collectors.toList()))).extracting(Record::getIntent).containsExactly((Object[])new Intent[]{DeploymentIntent.DISTRIBUTE, DeploymentIntent.DISTRIBUTED});
        Assertions.assertThat((List)((List)((RecordStream)((RecordStream)RecordingExporter.records().withPartitionId(3)).limit(r -> r.getIntent().equals(DeploymentIntent.DISTRIBUTED))).collect(Collectors.toList()))).extracting(Record::getIntent).containsExactly((Object[])new Intent[]{DeploymentIntent.DISTRIBUTE, DeploymentIntent.DISTRIBUTED});
    }

    @Test
    public void shouldDistributeDmnResources() {
        this.engine.deployment().withXmlClasspathResource(DMN_RESOURCE).deploy();
        ((AbstractListAssert)Assertions.assertThat((Stream)((DeploymentRecordStream)RecordingExporter.deploymentRecords().withPartitionId(1)).limit(3L)).extracting(Record::getIntent).hasSize(3)).contains((Object[])new Intent[]{DeploymentIntent.FULLY_DISTRIBUTED});
        Assertions.assertThat((Stream)RecordingExporter.deploymentRecords((DeploymentIntent)DeploymentIntent.DISTRIBUTED).limit(2L)).extracting(Record::getPartitionId).contains((Object[])new Integer[]{2, 3});
        DeploymentRecordValue distributedEvent = (DeploymentRecordValue)((Record)RecordingExporter.deploymentRecords((DeploymentIntent)DeploymentIntent.DISTRIBUTED).getFirst()).getValue();
        ((ListAssert)Assertions.assertThat((List)distributedEvent.getDecisionRequirementsMetadata()).describedAs("Expect that decision requirements are distributed", new Object[0])).isNotEmpty();
        ((ListAssert)Assertions.assertThat((List)distributedEvent.getDecisionsMetadata()).describedAs("Expect that decisions are distributed", new Object[0])).isNotEmpty();
    }

    @Test
    public void shouldRejectCompleteDeploymentDistributionWhenAlreadyCompleted() {
        this.engine.pauseProcessing(2);
        this.engine.pauseProcessing(3);
        this.engine.deployment().withXmlResource(Bpmn.createExecutableProcess((String)"shouldReDistributeAfterRecovery").startEvent().endEvent().done()).expectCreated().deploy();
        ((RecordStream)((RecordStream)((RecordStream)RecordingExporter.records().withPartitionId(2)).withValueType(ValueType.DEPLOYMENT)).withIntent((Intent)DeploymentIntent.DISTRIBUTE)).await();
        this.engine.getClock().addTime(DeploymentRedistributor.DEPLOYMENT_REDISTRIBUTION_INTERVAL);
        Awaitility.await().untilAsserted(() -> {
            this.engine.getClock().addTime(DeploymentRedistributor.DEPLOYMENT_REDISTRIBUTION_INTERVAL);
            Assertions.assertThat((Stream)((RecordStream)((RecordStream)((RecordStream)RecordingExporter.records().withPartitionId(2)).withValueType(ValueType.DEPLOYMENT)).withIntent((Intent)DeploymentIntent.DISTRIBUTE)).limit(2L)).hasSize(2);
        });
        this.engine.resumeProcessing(2);
        Assertions.assertThat((Stream)((DeploymentDistributionRecordStream)RecordingExporter.deploymentDistributionRecords().withIntent((Intent)DeploymentDistributionIntent.COMPLETE)).withPartitionId(2).limit(3L)).extracting(Record::getRecordType).describedAs("Expect second command to be rejected", new Object[0]).containsExactlyInAnyOrder((Object[])new RecordType[]{RecordType.COMMAND, RecordType.COMMAND, RecordType.COMMAND_REJECTION});
    }
}

