package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.processing.bpmn.multiinstance.MultiInstanceSubProcessTest;
import io.camunda.zeebe.engine.processing.message.MessageObserver;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.state.ZbColumnFamilies;
import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.engine.util.client.PublishMessageClient;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.SoftAssertions;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:io/camunda/zeebe/engine/processing/streamprocessor/ReplayStateTest.class */
public final class ReplayStateTest {
    /** Id of the primary BPMN process deployed by the scenarios. */
    private static final String PROCESS_ID = "process";

    /** Id of the child process that the call-activity scenario invokes. */
    private static final String PROCESS_CHILD_ID = "child_process";

    /** Scenario injected by the {@link Parameterized} runner. */
    @Parameterized.Parameter
    public TestCase testCase;

    /** Position of the record most recently reported by the engine callbacks below; -1 until one is reported. */
    private long lastProcessedPosition = -1L;

    /**
     * Single-partition engine under test. Both the "processed" and the "skipped" callback record the
     * position they are handed, so the test can tell how far the engine has advanced.
     */
    @Rule
    public final EngineRule engine =
        EngineRule.singlePartition()
            .withOnProcessedCallback(typedRecord -> lastProcessedPosition = typedRecord.getPosition())
            .withOnSkippedCallback(loggedEvent -> lastProcessedPosition = loggedEvent.getPosition());

    /**
     * One replay scenario: a human-readable description, the processes to deploy, and an execution
     * that drives the engine and returns a record.
     *
     * <p>Unless {@link #withExecution(Function)} is called, the execution simply returns the first
     * record available from the {@link RecordingExporter}.
     */
    public static final class TestCase {
        private final String description;
        private final List<BpmnModelInstance> processes = new ArrayList<>();
        private Function<EngineRule, Record<?>> execution =
            engineRule -> RecordingExporter.records().getFirst();

        private TestCase(String str) {
            this.description = str;
        }

        /**
         * Adds a process model to this scenario.
         *
         * @param bpmnModelInstance the model to add
         * @return this scenario, for chaining
         */
        private TestCase withProcess(BpmnModelInstance bpmnModelInstance) {
            this.processes.add(bpmnModelInstance);
            return this;
        }

        /**
         * Replaces the execution of this scenario.
         *
         * @param function receives the engine rule and returns a record
         * @return this scenario, for chaining
         */
        private TestCase withExecution(Function<EngineRule, Record<?>> function) {
            this.execution = function;
            return this;
        }

        /** Renders the description; the parameterized runner uses this as the test name. */
        @Override
        public String toString() {
            return "TestCase{" + this.description + "}";
        }
    }

    /**
     * Builds the scenarios this test is parameterized with.
     *
     * <p>Each scenario names itself, lists the processes to deploy, and supplies an execution that
     * drives the engine and returns a record obtained from the {@link RecordingExporter}.
     *
     * @return the scenarios; the {@code {0}} name pattern renders each one via its {@code toString()}
     */
    @Parameterized.Parameters(name = "{0}")
    public static Collection<TestCase> testRecords() {
        return List.of(
            testCase("activated service task")
                .withProcess(
                    Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .serviceTask(MultiInstanceSubProcessTest.TASK_ELEMENT_ID, t -> t.zeebeJobType("test"))
                        .done())
                .withExecution(
                    engineRule -> {
                        engineRule.processInstance().ofBpmnProcessId(PROCESS_ID).create();
                        RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                            .withElementType(BpmnElementType.SERVICE_TASK)
                            .await();
                        return RecordingExporter.jobRecords(JobIntent.CREATED).getFirst();
                    }),
            testCase("expired buffered message")
                .withExecution(
                    engineRule -> {
                        final Duration timeToLive = Duration.ofMinutes(1L);
                        engineRule
                            .message()
                            .withName("test")
                            .withCorrelationKey("1")
                            .withTimeToLive(timeToLive)
                            .publish();
                        // advance the clock by the TTL plus one TTL-check interval
                        engineRule
                            .getClock()
                            .addTime(timeToLive.plus(MessageObserver.MESSAGE_TIME_TO_LIVE_CHECK_INTERVAL));
                        return RecordingExporter.messageRecords(MessageIntent.EXPIRED).getFirst();
                    }),
            testCase("timer start event")
                .withProcess(
                    Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent("timer")
                        .timerWithDateExpression("now()")
                        .endEvent()
                        .done())
                .withExecution(
                    engineRule ->
                        RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
                            .withBpmnProcessId(PROCESS_ID)
                            .withElementType(BpmnElementType.PROCESS)
                            .getFirst()),
            testCase("intermediate timer catch event")
                .withProcess(
                    Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .intermediateCatchEvent("timer")
                        .timerWithDuration("PT0S")
                        .endEvent()
                        .done())
                .withExecution(
                    engineRule -> {
                        final long processInstanceKey =
                            engineRule.processInstance().ofBpmnProcessId(PROCESS_ID).create();
                        return awaitProcessCompleted(processInstanceKey);
                    }),
            testCase("interrupting timer boundary event")
                .withProcess(
                    Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .serviceTask(MultiInstanceSubProcessTest.TASK_ELEMENT_ID, t -> t.zeebeJobType("type"))
                        .boundaryEvent("timer", b -> b.cancelActivity(true))
                        .timerWithDuration("PT0S")
                        .endEvent("end")
                        .done())
                .withExecution(
                    engineRule -> {
                        final long processInstanceKey =
                            engineRule.processInstance().ofBpmnProcessId(PROCESS_ID).create();
                        return awaitProcessCompleted(processInstanceKey);
                    }),
            testCase("interrupting timer boundary event on call activity")
                .withProcess(
                    Bpmn.createExecutableProcess(PROCESS_CHILD_ID)
                        .startEvent()
                        .serviceTask(MultiInstanceSubProcessTest.TASK_ELEMENT_ID, t -> t.zeebeJobType("type"))
                        .endEvent()
                        .done())
                .withProcess(
                    Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .callActivity("call-child", c -> c.zeebeProcessId(PROCESS_CHILD_ID))
                        .boundaryEvent("timer", b -> b.cancelActivity(true))
                        .timerWithDuration("PT1M")
                        .endEvent("end")
                        .done())
                .withExecution(
                    engineRule -> {
                        final long processInstanceKey =
                            engineRule.processInstance().ofBpmnProcessId(PROCESS_ID).create();
                        RecordingExporter.jobRecords(JobIntent.CREATED).await();
                        // matches the PT1M duration of the boundary timer
                        engineRule.getClock().addTime(Duration.ofMinutes(1L));
                        return awaitProcessCompleted(processInstanceKey);
                    }),
            testCase("non-interrupting timer boundary event")
                .withProcess(
                    Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .serviceTask(MultiInstanceSubProcessTest.TASK_ELEMENT_ID, t -> t.zeebeJobType("type"))
                        .boundaryEvent("timer", b -> b.cancelActivity(false))
                        .timerWithCycle("R1/PT0S")
                        .endEvent("end")
                        .done())
                .withExecution(
                    engineRule -> {
                        final long processInstanceKey =
                            engineRule.processInstance().ofBpmnProcessId(PROCESS_ID).create();
                        return awaitEndEventCompleted(processInstanceKey);
                    }),
            testCase("throw error end event")
                .withProcess(
                    Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .subProcess("subProcess")
                        .embeddedSubProcess()
                        .startEvent()
                        .endEvent("errorEndEvent", e -> e.error("error"))
                        .subProcessDone()
                        .boundaryEvent("errorCatchEvent", b -> b.error("error").cancelActivity(true))
                        .endEvent()
                        .moveToActivity("subProcess")
                        .intermediateCatchEvent("neverProcessed")
                        .message(m -> m.name("message").zeebeCorrelationKey("=\"key\""))
                        .done())
                .withExecution(
                    engineRule -> {
                        final long processInstanceKey =
                            engineRule.processInstance().ofBpmnProcessId(PROCESS_ID).create();
                        return RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
                            .withElementType(BpmnElementType.PROCESS)
                            .withRecordKey(processInstanceKey)
                            .getFirst();
                    }),
            testCase("interrupting message boundary event on receive task")
                .withProcess(
                    Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .receiveTask(
                            MultiInstanceSubProcessTest.TASK_ELEMENT_ID,
                            r ->
                                r.message(
                                    m ->
                                        m.name(MultiInstanceSubProcessTest.TASK_ELEMENT_ID)
                                            .zeebeCorrelationKeyExpression("1")))
                        .boundaryEvent(
                            "event",
                            b ->
                                b.cancelActivity(true)
                                    .message(m -> m.name("event").zeebeCorrelationKeyExpression("1")))
                        .endEvent("end")
                        .done())
                .withExecution(
                    engineRule -> {
                        final long processInstanceKey =
                            engineRule.processInstance().ofBpmnProcessId(PROCESS_ID).create();
                        engineRule.message().withName("event").withCorrelationKey("1").publish();
                        return awaitProcessCompleted(processInstanceKey);
                    }),
            testCase("non-interrupting timer boundary event on receive task")
                .withProcess(
                    Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .receiveTask(
                            MultiInstanceSubProcessTest.TASK_ELEMENT_ID,
                            r ->
                                r.message(
                                    m ->
                                        m.name(MultiInstanceSubProcessTest.TASK_ELEMENT_ID)
                                            .zeebeCorrelationKeyExpression("1")))
                        .boundaryEvent("event", b -> b.cancelActivity(false).timerWithDuration("PT0S"))
                        .endEvent("end")
                        .done())
                .withExecution(
                    engineRule -> {
                        final long processInstanceKey =
                            engineRule.processInstance().ofBpmnProcessId(PROCESS_ID).create();
                        return awaitEndEventCompleted(processInstanceKey);
                    }),
            testCase("parallel multi-instance service task")
                .withProcess(
                    Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .serviceTask(
                            MultiInstanceSubProcessTest.TASK_ELEMENT_ID,
                            t ->
                                t.zeebeJobType("type")
                                    .multiInstance(
                                        mi ->
                                            mi.parallel()
                                                .zeebeInputElement("item")
                                                .zeebeInputCollectionExpression("items")
                                                .zeebeOutputElementExpression("result")
                                                .zeebeOutputCollection("results")))
                        .endEvent()
                        .done())
                .withExecution(
                    engineRule -> {
                        final long processInstanceKey =
                            engineRule
                                .processInstance()
                                .ofBpmnProcessId(PROCESS_ID)
                                .withVariable("items", List.of(1, 2, 3))
                                .create();
                        awaitCreatedJobs(3);
                        completeJobs(engineRule, 3);
                        return awaitProcessCompleted(processInstanceKey);
                    }),
            testCase("sequential multi-instance service task")
                .withProcess(
                    Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .serviceTask(
                            MultiInstanceSubProcessTest.TASK_ELEMENT_ID,
                            t ->
                                t.zeebeJobType("type")
                                    .multiInstance(
                                        mi ->
                                            mi.sequential()
                                                .zeebeInputElement("item")
                                                .zeebeInputCollectionExpression("items")
                                                .zeebeOutputElementExpression("result")
                                                .zeebeOutputCollection("results")))
                        .endEvent()
                        .done())
                .withExecution(
                    engineRule -> {
                        final long processInstanceKey =
                            engineRule
                                .processInstance()
                                .ofBpmnProcessId(PROCESS_ID)
                                .withVariable("items", List.of(1, 2, 3))
                                .create();
                        // wait for one more created job per round, then complete a single job
                        for (int round = 1; round <= 3; round++) {
                            awaitCreatedJobs(round);
                            completeJobs(engineRule, 1);
                        }
                        return awaitProcessCompleted(processInstanceKey);
                    }),
            testCase("interrupting parallel multi-instance service task")
                .withProcess(
                    Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .serviceTask(
                            MultiInstanceSubProcessTest.TASK_ELEMENT_ID,
                            t ->
                                t.zeebeJobType("type")
                                    .multiInstance(
                                        mi ->
                                            mi.parallel()
                                                .zeebeInputElement("item")
                                                .zeebeInputCollectionExpression("items")
                                                .zeebeOutputElementExpression("result")
                                                .zeebeOutputCollection("results"))
                                    .boundaryEvent(
                                        "event",
                                        b ->
                                            b.cancelActivity(true)
                                                .message(
                                                    m -> m.name("message").zeebeCorrelationKey("=\"key\"")))
                                    .endEvent())
                        .endEvent()
                        .done())
                .withExecution(
                    engineRule -> {
                        final long processInstanceKey =
                            engineRule
                                .processInstance()
                                .ofBpmnProcessId(PROCESS_ID)
                                .withVariable("items", List.of(1, 2, 3))
                                .create();
                        // wait for all three jobs (as the await description states), complete only two,
                        // then interrupt the remaining one through the message boundary event
                        awaitCreatedJobs(3);
                        completeJobs(engineRule, 2);
                        engineRule.message().withName("message").withCorrelationKey("key").publish();
                        return awaitProcessCompleted(processInstanceKey);
                    }),
            testCase("correlate buffered message to start event")
                .withProcess(
                    Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .message("start")
                        .serviceTask(
                            MultiInstanceSubProcessTest.TASK_ELEMENT_ID,
                            t -> t.zeebeJobType(MultiInstanceSubProcessTest.TASK_ELEMENT_ID))
                        .endEvent()
                        .done())
                .withExecution(
                    engineRule -> {
                        final PublishMessageClient message =
                            engineRule
                                .message()
                                .withName("start")
                                .withCorrelationKey("go")
                                .withTimeToLive(Duration.ofMinutes(5L));
                        message.withVariables(Map.of("x", 1)).publish();
                        message.withVariables(Map.of("x", 2)).publish();

                        final long firstJobKey =
                            RecordingExporter.jobRecords(JobIntent.CREATED).getFirst().getKey();
                        engineRule.job().withKey(firstJobKey).complete();
                        final long secondJobKey =
                            RecordingExporter.jobRecords(JobIntent.CREATED).skip(1L).getFirst().getKey();
                        engineRule.job().withKey(secondJobKey).complete();

                        // the second completed process record, i.e. after one earlier completion
                        return RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
                            .withElementType(BpmnElementType.PROCESS)
                            .skip(1L)
                            .getFirst();
                    }));
    }

    /** Returns the first ELEMENT_COMPLETED record of the process element for the given instance key. */
    private static Record<?> awaitProcessCompleted(final long processInstanceKey) {
        return RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
            .withProcessInstanceKey(processInstanceKey)
            .withElementType(BpmnElementType.PROCESS)
            .getFirst();
    }

    /** Returns the first ELEMENT_COMPLETED record of the end event "end" for the given instance key. */
    private static Record<?> awaitEndEventCompleted(final long processInstanceKey) {
        return RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
            .withProcessInstanceKey(processInstanceKey)
            .withElementType(BpmnElementType.END_EVENT)
            .withElementId("end")
            .getFirst();
    }

    /** Polls in the calling thread until at least {@code expectedJobs} job CREATED records were exported. */
    private static void awaitCreatedJobs(final int expectedJobs) {
        Awaitility.await("until there are " + expectedJobs + " jobs ready to be activated")
            .pollInSameThread()
            .until(
                () ->
                    RecordingExporter.jobRecords(JobIntent.CREATED).limit(expectedJobs).count()
                        >= expectedJobs);
    }

    /** Activates up to {@code maxJobsToActivate} jobs of type "type" and completes each with result 0. */
    private static void completeJobs(final EngineRule engineRule, final int maxJobsToActivate) {
        engineRule
            .jobs()
            .withMaxJobsToActivate(maxJobsToActivate)
            .withType("type")
            .activate()
            .getValue()
            .getJobKeys()
            .forEach(jobKey -> engineRule.job().withKey(jobKey).withVariable("result", 0).complete());
    }

    /** Resets the tracked last-processed position to {@code -1} before each test. */
    @Before
    public void init() {
        this.lastProcessedPosition = -1L;
    }

    /**
     * Verifies that the state after a restart matches the state captured before it.
     *
     * <p>The scenario's processes are deployed and its execution is run; once the engine callbacks
     * report a position at or beyond the returned record, the state is collected. The engine is then
     * stopped and started again, and after the stream processor of partition 1 reports the
     * PROCESSING phase the state is collected again and compared column family by column family.
     * The {@code DEFAULT} column family is excluded from the comparison.
     */
    @Test
    public void shouldRestoreState() {
        // given
        testCase.processes.forEach(process -> engine.deployment().withXmlResource(process).deploy());
        final Record<?> finalRecord = testCase.execution.apply(engine);

        Awaitility.await("await until the last record is processed")
            .untilAsserted(
                () ->
                    Assertions.assertThat(lastProcessedPosition)
                        .isGreaterThanOrEqualTo(finalRecord.getPosition()));

        final Map<ZbColumnFamilies, Map<Object, Object>> processingState = engine.collectState();

        // when
        engine.stop();
        engine.start();

        Awaitility.await()
            .untilAsserted(
                () ->
                    Assertions.assertThat(engine.getStreamProcessor(1).getCurrentPhase().join())
                        .isEqualTo(StreamProcessor.Phase.PROCESSING));

        // then
        Awaitility.await("await that the replay state is equal to the processing state")
            .untilAsserted(
                () -> {
                    final Map<ZbColumnFamilies, Map<Object, Object>> replayState = engine.collectState();
                    // soft assertions so every differing column family is reported, not just the first
                    final SoftAssertions softly = new SoftAssertions();

                    for (final Map.Entry<ZbColumnFamilies, Map<Object, Object>> entry :
                        processingState.entrySet()) {
                        final ZbColumnFamilies columnFamily = entry.getKey();
                        if (columnFamily == ZbColumnFamilies.DEFAULT) {
                            continue;
                        }

                        final Map<Object, Object> processingEntries = entry.getValue();
                        final Map<Object, Object> replayEntries = replayState.get(columnFamily);

                        if (processingEntries.isEmpty()) {
                            softly
                                .assertThat(replayEntries)
                                .describedAs("The state column '%s' should be empty after replay", columnFamily)
                                .isEmpty();
                        } else {
                            softly
                                .assertThat(replayEntries)
                                .describedAs(
                                    "The state column '%s' has different entries after replay", columnFamily)
                                .containsExactlyInAnyOrderEntriesOf(processingEntries);
                        }
                    }

                    softly.assertAll();
                });
    }

    /**
     * Creates a new scenario with the given description.
     *
     * @param str the description of the scenario
     * @return a new {@link TestCase}
     */
    private static TestCase testCase(String str) {
        return new TestCase(str);
    }
}
