package io.zeebe.broker.job;

import io.zeebe.broker.logstreams.processor.TypedStreamProcessorTest;
import io.zeebe.broker.test.EmbeddedBrokerRule;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.test.broker.protocol.clientapi.ClientApiRule;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandRequestBuilder;
import io.zeebe.test.broker.protocol.clientapi.SubscribedRecord;
import io.zeebe.test.broker.protocol.clientapi.TestPartitionClient;
import io.zeebe.test.util.TestUtil;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/zeebe/broker/job/JobTimeOutTest.class */
public class JobTimeOutTest {
    public EmbeddedBrokerRule brokerRule = new EmbeddedBrokerRule(new Consumer[0]);
    public ClientApiRule apiRule;

    @Rule
    public RuleChain ruleChain;

    public JobTimeOutTest() {
        EmbeddedBrokerRule embeddedBrokerRule = this.brokerRule;
        embeddedBrokerRule.getClass();
        this.apiRule = new ClientApiRule(embeddedBrokerRule::getClientAddress);
        this.ruleChain = RuleChain.outerRule(this.brokerRule).around(this.apiRule);
    }

    @Test
    public void shouldNotTimeOutIfDeadlineNotExceeded() throws InterruptedException {
        this.brokerRule.getClock().pinCurrentTime();
        Duration ofSeconds = Duration.ofSeconds(60L);
        createJob("jobType");
        this.apiRule.openJobSubscription(this.apiRule.getDefaultPartitionId(), "jobType", ofSeconds.toMillis()).await();
        this.apiRule.subscribedEvents().findFirst().get();
        this.brokerRule.getClock().addTime(ofSeconds.minus(Duration.ofSeconds(1L)));
        assertNoMoreJobsReceived();
    }

    @Test
    public void shouldNotTimeOutIfJobCompleted() throws InterruptedException {
        this.brokerRule.getClock().pinCurrentTime();
        Duration ofSeconds = Duration.ofSeconds(60L);
        createJob("jobType");
        this.apiRule.openJobSubscription(this.apiRule.getDefaultPartitionId(), "jobType", ofSeconds.toMillis()).await();
        completeJob((SubscribedRecord) this.apiRule.subscribedEvents().findFirst().get());
        this.brokerRule.getClock().addTime(ofSeconds.plus(Duration.ofSeconds(1L)));
        assertNoMoreJobsReceived();
    }

    @Test
    public void shouldNotTimeOutIfJobFailed() {
        this.brokerRule.getClock().pinCurrentTime();
        Duration ofSeconds = Duration.ofSeconds(60L);
        createJob("jobType");
        this.apiRule.openJobSubscription(this.apiRule.getDefaultPartitionId(), "jobType", ofSeconds.toMillis()).await();
        SubscribedRecord subscribedRecord = (SubscribedRecord) this.apiRule.subscribedEvents().findFirst().get();
        HashMap hashMap = new HashMap(subscribedRecord.value());
        hashMap.put("retries", 0);
        failJob(subscribedRecord.key(), hashMap);
        this.brokerRule.getClock().addTime(ofSeconds.plus(Duration.ofSeconds(1L)));
        assertNoMoreJobsReceived();
    }

    @Test
    public void shouldTimeOutJob() {
        long createJob = createJob(TypedStreamProcessorTest.STREAM_NAME);
        this.apiRule.openJobSubscription(this.apiRule.getDefaultPartitionId(), TypedStreamProcessorTest.STREAM_NAME, 1000L);
        TestUtil.waitUntil(() -> {
            return this.apiRule.numSubscribedEventsAvailable() == 1;
        });
        this.apiRule.moveMessageStreamToTail();
        TestUtil.doRepeatedly(() -> {
            this.brokerRule.getClock().addTime(JobStreamProcessor.TIME_OUT_POLLING_INTERVAL);
        }).until(r4 -> {
            return Boolean.valueOf(this.apiRule.numSubscribedEventsAvailable() == 1);
        });
        List list = (List) this.apiRule.partition().receiveRecords().ofTypeJob().limit(8L).collect(Collectors.toList());
        Assertions.assertThat(list).extracting(subscribedRecord -> {
            return Long.valueOf(subscribedRecord.key());
        }).contains(new Long[]{Long.valueOf(createJob)});
        Assertions.assertThat(list).extracting(subscribedRecord2 -> {
            return subscribedRecord2.intent();
        }).containsExactly(new Intent[]{JobIntent.CREATE, JobIntent.CREATED, JobIntent.ACTIVATE, JobIntent.ACTIVATED, JobIntent.TIME_OUT, JobIntent.TIMED_OUT, JobIntent.ACTIVATE, JobIntent.ACTIVATED});
    }

    @Test
    public void shouldSetCorrectSourcePositionAfterJobTimeOut() {
        createJob(TypedStreamProcessorTest.STREAM_NAME);
        this.apiRule.openJobSubscription(this.apiRule.getDefaultPartitionId(), TypedStreamProcessorTest.STREAM_NAME, 1000L);
        TestUtil.waitUntil(() -> {
            return this.apiRule.numSubscribedEventsAvailable() == 1;
        });
        this.apiRule.moveMessageStreamToTail();
        TestUtil.doRepeatedly(() -> {
            this.brokerRule.getClock().addTime(JobStreamProcessor.TIME_OUT_POLLING_INTERVAL);
        }).until(r4 -> {
            return Boolean.valueOf(this.apiRule.numSubscribedEventsAvailable() == 1);
        });
        SubscribedRecord subscribedRecord = (SubscribedRecord) this.apiRule.subscribedEvents().findAny().get();
        TestPartitionClient partition = this.apiRule.partition();
        SubscribedRecord subscribedRecord2 = (SubscribedRecord) partition.receiveRecords().ofTypeJob().withIntent(JobIntent.ACTIVATE).findFirst().get();
        Assertions.assertThat(subscribedRecord.sourceRecordPosition()).isNotEqualTo(subscribedRecord2.position());
        Assertions.assertThat(subscribedRecord.sourceRecordPosition()).isEqualTo(((SubscribedRecord) partition.receiveRecords().ofTypeJob().withIntent(JobIntent.ACTIVATE).skipUntil(subscribedRecord3 -> {
            return subscribedRecord3.position() > subscribedRecord2.position();
        }).findFirst().get()).position());
    }

    @Test
    public void shouldExpireMultipleActivatedJobsAtOnce() {
        long createJob = createJob(TypedStreamProcessorTest.STREAM_NAME);
        long createJob2 = createJob(TypedStreamProcessorTest.STREAM_NAME);
        this.apiRule.openJobSubscription(this.apiRule.getDefaultPartitionId(), TypedStreamProcessorTest.STREAM_NAME, 1000L);
        TestUtil.waitUntil(() -> {
            return this.apiRule.numSubscribedEventsAvailable() == 2;
        });
        this.apiRule.moveMessageStreamToTail();
        TestUtil.doRepeatedly(() -> {
            this.brokerRule.getClock().addTime(JobStreamProcessor.TIME_OUT_POLLING_INTERVAL);
        }).until(r4 -> {
            return Boolean.valueOf(this.apiRule.numSubscribedEventsAvailable() == 2);
        });
        List list = (List) this.apiRule.partition().receiveRecords().ofTypeJob().limit(16L).collect(Collectors.toList());
        Assertions.assertThat(list).filteredOn(subscribedRecord -> {
            return subscribedRecord.intent() == JobIntent.ACTIVATED;
        }).hasSize(4).extracting(subscribedRecord2 -> {
            return Long.valueOf(subscribedRecord2.key());
        }).containsExactlyInAnyOrder(new Long[]{Long.valueOf(createJob), Long.valueOf(createJob2), Long.valueOf(createJob), Long.valueOf(createJob2)});
        Assertions.assertThat(list).filteredOn(subscribedRecord3 -> {
            return subscribedRecord3.intent() == JobIntent.TIMED_OUT;
        }).extracting(subscribedRecord4 -> {
            return Long.valueOf(subscribedRecord4.key());
        }).containsExactlyInAnyOrder(new Long[]{Long.valueOf(createJob), Long.valueOf(createJob2)});
    }

    private long createJob(String str) {
        return ((ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest().type(ValueType.JOB, JobIntent.CREATE).command().put("type", str).put("retries", 3).done()).sendAndAwait().key();
    }

    private void completeJob(SubscribedRecord subscribedRecord) {
        ((ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest().type(ValueType.JOB, JobIntent.COMPLETE).key(subscribedRecord.key()).command().putAll(subscribedRecord.value()).done()).sendAndAwait();
    }

    private void failJob(long j, Map<String, Object> map) {
        ((ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest().type(ValueType.JOB, JobIntent.FAIL).key(j).command().putAll(map).done()).sendAndAwait();
    }

    private void assertNoMoreJobsReceived() {
        try {
            Thread.sleep(1000L);
            Assertions.assertThat(this.apiRule.numSubscribedEventsAvailable()).isEqualTo(0);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
