package io.camunda.zeebe.engine.state.migration.to_1_1;

import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.state.immutable.PendingMessageSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.PendingProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.message.MessageSubscription;
import io.camunda.zeebe.engine.state.message.ProcessMessageSubscription;
import io.camunda.zeebe.engine.state.mutable.MutableMessageSubscriptionState;
import io.camunda.zeebe.engine.state.mutable.MutableProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.mutable.MutableProcessingState;
import io.camunda.zeebe.engine.util.ProcessingStateExtension;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord;
import io.camunda.zeebe.protocol.impl.record.value.message.ProcessMessageSubscriptionRecord;
import java.util.ArrayList;
import java.util.List;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({ProcessingStateExtension.class})
/* loaded from: input_file:io/camunda/zeebe/engine/state/migration/to_1_1/DbMigrationStateTest.class */
public class DbMigrationStateTest {
    private static final long TEST_SENT_TIME = 1000;
    private ZeebeDb<ZbColumnFamilies> zeebeDb;
    private MutableProcessingState processingState;
    private TransactionContext transactionContext;

    @Test
    public void testMigrateMessageSubscriptionSentTime() throws Exception {
        LegacyDbMessageSubscriptionState legacyDbMessageSubscriptionState = new LegacyDbMessageSubscriptionState(this.zeebeDb, this.transactionContext);
        LegacyMessageSubscription createLegacyMessageSubscription = TestUtilities.createLegacyMessageSubscription(100L, 1L);
        legacyDbMessageSubscriptionState.put(createLegacyMessageSubscription.getKey(), createLegacyMessageSubscription.getRecord());
        legacyDbMessageSubscriptionState.updateSentTime(createLegacyMessageSubscription, TEST_SENT_TIME);
        LegacyMessageSubscription createLegacyMessageSubscription2 = TestUtilities.createLegacyMessageSubscription(101L, 2L);
        legacyDbMessageSubscriptionState.put(createLegacyMessageSubscription2.getKey(), createLegacyMessageSubscription2.getRecord());
        this.transactionContext.getCurrentTransaction().commit();
        this.processingState.getMigrationState().migrateMessageSubscriptionSentTime(this.processingState.getMessageSubscriptionState(), this.processingState.getPendingMessageSubscriptionState());
        ((AbstractBooleanAssert) Assertions.assertThat(this.zeebeDb.isEmpty(ZbColumnFamilies.MESSAGE_SUBSCRIPTION_BY_SENT_TIME, this.transactionContext)).describedAs("Column family MESSAGE_SUBSCRIPTION_BY_SENT_TIME is empty", new Object[0])).isTrue();
        MutableMessageSubscriptionState messageSubscriptionState = this.processingState.getMessageSubscriptionState();
        MessageSubscription lookupMigratedMessageSubscription = lookupMigratedMessageSubscription(createLegacyMessageSubscription, messageSubscriptionState);
        ((AbstractBooleanAssert) Assertions.assertThat(lookupMigratedMessageSubscription.isCorrelating()).describedAs("Correlating flag", new Object[0])).isTrue();
        ((AbstractBooleanAssert) Assertions.assertThat(lookupMigratedMessageSubscription(createLegacyMessageSubscription2, messageSubscriptionState).isCorrelating()).describedAs("Correlating flag", new Object[0])).isFalse();
        assertThaRecordIsPresentInTransientState(lookupMigratedMessageSubscription.getRecord());
    }

    private MessageSubscription lookupMigratedMessageSubscription(LegacyMessageSubscription legacyMessageSubscription, MutableMessageSubscriptionState mutableMessageSubscriptionState) {
        return mutableMessageSubscriptionState.get(legacyMessageSubscription.getRecord().getElementInstanceKey(), legacyMessageSubscription.getRecord().getMessageNameBuffer());
    }

    private void assertThaRecordIsPresentInTransientState(MessageSubscriptionRecord messageSubscriptionRecord) {
        PendingMessageSubscriptionState pendingMessageSubscriptionState = this.processingState.getPendingMessageSubscriptionState();
        ArrayList arrayList = new ArrayList();
        pendingMessageSubscriptionState.visitPending(1001L, messageSubscription -> {
            arrayList.add(messageSubscription.getRecord());
            return true;
        });
        Assertions.assertThat(arrayList).hasSize(1).containsExactly(new MessageSubscriptionRecord[]{messageSubscriptionRecord});
        pendingMessageSubscriptionState.visitPending(TEST_SENT_TIME, messageSubscription2 -> {
            return ((Boolean) org.junit.jupiter.api.Assertions.fail("Found unexpected subscription " + messageSubscription2)).booleanValue();
        });
    }

    @Test
    public void testMigrateProcessMessageSubscriptionSentTime() {
        LegacyDbProcessMessageSubscriptionState legacyDbProcessMessageSubscriptionState = new LegacyDbProcessMessageSubscriptionState(this.zeebeDb, this.transactionContext);
        MutableProcessMessageSubscriptionState processMessageSubscriptionState = this.processingState.getProcessMessageSubscriptionState();
        LegacyProcessMessageSubscription createLegacyProcessMessageSubscription = TestUtilities.createLegacyProcessMessageSubscription(100L, 1L);
        legacyDbProcessMessageSubscriptionState.put(createLegacyProcessMessageSubscription.getKey(), createLegacyProcessMessageSubscription.getRecord(), TEST_SENT_TIME);
        LegacyProcessMessageSubscription createLegacyProcessMessageSubscription2 = TestUtilities.createLegacyProcessMessageSubscription(101L, 2L);
        legacyDbProcessMessageSubscriptionState.put(createLegacyProcessMessageSubscription2.getKey(), createLegacyProcessMessageSubscription2.getRecord(), TEST_SENT_TIME);
        legacyDbProcessMessageSubscriptionState.updateToOpenedState(createLegacyProcessMessageSubscription2.getRecord());
        LegacyProcessMessageSubscription createLegacyProcessMessageSubscription3 = TestUtilities.createLegacyProcessMessageSubscription(102L, 3L);
        legacyDbProcessMessageSubscriptionState.put(createLegacyProcessMessageSubscription3.getKey(), createLegacyProcessMessageSubscription3.getRecord(), TEST_SENT_TIME);
        legacyDbProcessMessageSubscriptionState.updateToClosingState(createLegacyProcessMessageSubscription3.getRecord(), TEST_SENT_TIME);
        this.processingState.getMigrationState().migrateProcessMessageSubscriptionSentTime(this.processingState.getProcessMessageSubscriptionState(), this.processingState.getPendingProcessMessageSubscriptionState());
        ((AbstractBooleanAssert) Assertions.assertThat(this.zeebeDb.isEmpty(ZbColumnFamilies.PROCESS_SUBSCRIPTION_BY_SENT_TIME, this.transactionContext)).describedAs("Column family PROCESS_SUBSCRIPTION_BY_SENT_TIME is empty", new Object[0])).isTrue();
        ProcessMessageSubscription lookupMigratedProcessMessageSubscription = lookupMigratedProcessMessageSubscription(createLegacyProcessMessageSubscription, processMessageSubscriptionState);
        ((AbstractBooleanAssert) Assertions.assertThat(lookupMigratedProcessMessageSubscription.isOpening()).describedAs("Opening subscription - opening flag", new Object[0])).isTrue();
        ((AbstractBooleanAssert) Assertions.assertThat(lookupMigratedProcessMessageSubscription.isClosing()).describedAs("Opening subscription - closing flag", new Object[0])).isFalse();
        ProcessMessageSubscription lookupMigratedProcessMessageSubscription2 = lookupMigratedProcessMessageSubscription(createLegacyProcessMessageSubscription2, processMessageSubscriptionState);
        ((AbstractBooleanAssert) Assertions.assertThat(lookupMigratedProcessMessageSubscription2.isOpening()).describedAs("Opened subscription - opening flag", new Object[0])).isFalse();
        ((AbstractBooleanAssert) Assertions.assertThat(lookupMigratedProcessMessageSubscription2.isClosing()).describedAs("Opened subscription - closing flag", new Object[0])).isFalse();
        ProcessMessageSubscription lookupMigratedProcessMessageSubscription3 = lookupMigratedProcessMessageSubscription(createLegacyProcessMessageSubscription3, processMessageSubscriptionState);
        ((AbstractBooleanAssert) Assertions.assertThat(lookupMigratedProcessMessageSubscription3.isOpening()).describedAs("Closing subscription - opening flag", new Object[0])).isFalse();
        ((AbstractBooleanAssert) Assertions.assertThat(lookupMigratedProcessMessageSubscription3.isClosing()).describedAs("Closing subscription - closing flag", new Object[0])).isTrue();
        assertThatRecordsArePresentInTransientState(createLegacyProcessMessageSubscription.getRecord(), createLegacyProcessMessageSubscription3.getRecord());
    }

    private void assertThatRecordsArePresentInTransientState(ProcessMessageSubscriptionRecord... processMessageSubscriptionRecordArr) {
        PendingProcessMessageSubscriptionState pendingProcessMessageSubscriptionState = this.processingState.getPendingProcessMessageSubscriptionState();
        ArrayList arrayList = new ArrayList();
        pendingProcessMessageSubscriptionState.visitPending(1001L, processMessageSubscription -> {
            ProcessMessageSubscriptionRecord processMessageSubscriptionRecord = new ProcessMessageSubscriptionRecord();
            processMessageSubscriptionRecord.wrap(processMessageSubscription.getRecord());
            arrayList.add(processMessageSubscriptionRecord);
            return true;
        });
        Assertions.assertThat(arrayList).hasSize(processMessageSubscriptionRecordArr.length).containsExactlyInAnyOrder(processMessageSubscriptionRecordArr);
        pendingProcessMessageSubscriptionState.visitPending(TEST_SENT_TIME, processMessageSubscription2 -> {
            return ((Boolean) org.junit.jupiter.api.Assertions.fail("Found unexpected subscription " + processMessageSubscription2)).booleanValue();
        });
    }

    private ProcessMessageSubscription lookupMigratedProcessMessageSubscription(LegacyProcessMessageSubscription legacyProcessMessageSubscription, MutableProcessMessageSubscriptionState mutableProcessMessageSubscriptionState) {
        return mutableProcessMessageSubscriptionState.getSubscription(legacyProcessMessageSubscription.getRecord().getElementInstanceKey(), legacyProcessMessageSubscription.getRecord().getMessageNameBuffer());
    }
}
