/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.EngineConfiguration;
import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.engine.util.client.PublishMessageClient;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.MessageBatchIntent;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.value.MessageBatchRecordValue;
import io.camunda.zeebe.protocol.record.value.MessageRecordValue;
import io.camunda.zeebe.test.util.record.MessageBatchRecordStream;
import io.camunda.zeebe.test.util.record.MessageRecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

/**
 * Engine tests covering the expiration of published messages.
 *
 * <p>Each test publishes messages through a {@link PublishMessageClient} that is pre-configured in
 * {@link #init()}, and then inspects the records captured by the {@link RecordingExporter}.
 */
public final class ExpireMessageTest {
    /** Correlation key shared by every message published in this class. */
    private static final String CORRELATION_KEY = "order-123";
    /** Default message name configured on the client before each test. */
    private static final String MESSAGE_NAME = "order canceled";

    @ClassRule
    public static final EngineRule ENGINE_RULE = EngineRule.singlePartition();
    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();
    private PublishMessageClient messageClient;

    /** Creates a fresh message client with the default name, correlation key and a 1s TTL. */
    @Before
    public void init() {
        this.messageClient = ENGINE_RULE.message()
            .withCorrelationKey(CORRELATION_KEY)
            .withName(MESSAGE_NAME)
            .withTimeToLive(1000L);
    }

    /**
     * Publishes two messages with a short TTL, advances the engine time by the TTL checker interval
     * and expects one EXPIRE batch command listing both message keys, followed by EXPIRED events
     * for exactly those keys, in publication order.
     */
    @Test
    public void shouldExpireMessageAfterTTL() {
        // given
        final long timeToLive = 100L;
        final Record<MessageRecordValue> publishedRecord =
            this.messageClient.withTimeToLive(timeToLive).publish();
        final Record<MessageRecordValue> secondPublishedRecord =
            this.messageClient.withTimeToLive(timeToLive).withName("order shipped").publish();
        final List<Long> publishedMessageKeys =
            List.of(publishedRecord.getKey(), secondPublishedRecord.getKey());

        // when
        ENGINE_RULE.increaseTime(EngineConfiguration.DEFAULT_MESSAGES_TTL_CHECKER_INTERVAL);

        // then
        final Record<MessageBatchRecordValue> expireBatchMessageCommand =
            RecordingExporter.messageBatchRecords()
                .withIntent(MessageBatchIntent.EXPIRE)
                .getFirst();
        io.camunda.zeebe.protocol.record.Assertions.assertThat(expireBatchMessageCommand.getValue())
            .hasMessageKeys(publishedMessageKeys);

        final List<Long> listOfExpiredMessageKeys =
            RecordingExporter.messageRecords()
                .withIntent(MessageIntent.EXPIRED)
                .flatMapToLong(expired -> LongStream.of(expired.getKey()))
                .boxed()
                .collect(Collectors.toList());
        Assertions.assertThat(listOfExpiredMessageKeys).isEqualTo(publishedMessageKeys);
    }

    /**
     * Publishes a message with a TTL of zero and expects an EXPIRED event for its key without
     * advancing the engine time; the event must carry the published message's properties.
     */
    @Test
    public void shouldExpireMessageImmediatelyWithZeroTTL() {
        // given
        final long timeToLive = 0L;

        // when
        final Record<MessageRecordValue> publishedRecord =
            this.messageClient.withTimeToLive(timeToLive).publish();

        // then
        final Record<MessageRecordValue> deletedEvent =
            RecordingExporter.messageRecords()
                .withIntent(MessageIntent.EXPIRED)
                .withRecordKey(publishedRecord.getKey())
                .getFirst();
        io.camunda.zeebe.protocol.record.Assertions.assertThat(deletedEvent.getValue())
            .hasName(MESSAGE_NAME)
            .hasCorrelationKey(CORRELATION_KEY)
            .hasTimeToLive(timeToLive)
            .hasMessageId("");
    }

    /**
     * Expects the EXPIRE batch command that contains the published message key to report a
     * negative source record position.
     */
    @Test
    public void shouldHaveNoSourceRecordPositionOnExpire() {
        // given
        final long timeToLive = 50L;
        final Record<MessageRecordValue> publishedRecord =
            this.messageClient.withTimeToLive(timeToLive).publish();

        // when
        ENGINE_RULE.increaseTime(EngineConfiguration.DEFAULT_MESSAGES_TTL_CHECKER_INTERVAL);

        // then
        final Record<MessageBatchRecordValue> deleteCommand =
            RecordingExporter.messageBatchRecords()
                .withIntent(MessageBatchIntent.EXPIRE)
                .hasMessageKey(publishedRecord.getKey())
                .getFirst();
        Assertions.assertThat(deleteCommand.getSourceRecordPosition()).isLessThan(0L);
    }
}

