package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.EngineConfiguration;
import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.engine.util.client.PublishMessageClient;
import io.camunda.zeebe.protocol.record.Assertions;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.MessageBatchIntent;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.value.MessageRecordValue;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/message/ExpireMessageTest.class */
public final class ExpireMessageTest {

    @ClassRule
    public static final EngineRule ENGINE_RULE = EngineRule.singlePartition();

    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();
    private PublishMessageClient messageClient;

    @Before
    public void init() {
        this.messageClient = ENGINE_RULE.message().withCorrelationKey("order-123").withName("order canceled").withTimeToLive(1000L);
    }

    @Test
    public void shouldExpireMessageAfterTTL() {
        List of = List.of(Long.valueOf(this.messageClient.withTimeToLive(100L).publish().getKey()), Long.valueOf(this.messageClient.withTimeToLive(100L).withName("order shipped").publish().getKey()));
        ENGINE_RULE.increaseTime(EngineConfiguration.DEFAULT_MESSAGES_TTL_CHECKER_INTERVAL);
        Assertions.assertThat(((Record) RecordingExporter.messageBatchRecords().withIntent(MessageBatchIntent.EXPIRE).getFirst()).getValue()).hasMessageKeys(of);
        org.assertj.core.api.Assertions.assertThat((List) RecordingExporter.messageRecords().withIntent(MessageIntent.EXPIRED).flatMapToLong(record -> {
            return LongStream.of(record.getKey());
        }).boxed().collect(Collectors.toList())).isEqualTo(of);
    }

    @Test
    public void shouldExpireMessageImmediatelyWithZeroTTL() {
        Assertions.assertThat(((Record) RecordingExporter.messageRecords().withIntent(MessageIntent.EXPIRED).withRecordKey(this.messageClient.withTimeToLive(0L).publish().getKey()).getFirst()).getValue()).hasName("order canceled").hasCorrelationKey("order-123").hasTimeToLive(0L).hasMessageId("");
    }

    @Test
    public void shouldHaveNoSourceRecordPositionOnExpire() {
        Record<MessageRecordValue> publish = this.messageClient.withTimeToLive(50L).publish();
        ENGINE_RULE.increaseTime(EngineConfiguration.DEFAULT_MESSAGES_TTL_CHECKER_INTERVAL);
        org.assertj.core.api.Assertions.assertThat(((Record) RecordingExporter.messageBatchRecords().withIntent(MessageBatchIntent.EXPIRE).hasMessageKey(publish.getKey()).getFirst()).getSourceRecordPosition()).isLessThan(0L);
    }
}
