package io.zeebe.broker.job.old;

import io.zeebe.broker.exporter.util.TestJarExporter;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessorTest;
import io.zeebe.broker.system.ConfigurationTest;
import io.zeebe.broker.test.EmbeddedBrokerRule;
import io.zeebe.protocol.clientapi.ControlMessageType;
import io.zeebe.protocol.clientapi.ErrorCode;
import io.zeebe.protocol.clientapi.SubscriptionType;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.test.broker.protocol.clientapi.ClientApiRule;
import io.zeebe.test.broker.protocol.clientapi.ControlMessageRequestBuilder;
import io.zeebe.test.broker.protocol.clientapi.ControlMessageResponse;
import io.zeebe.test.broker.protocol.clientapi.ErrorResponse;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandResponse;
import io.zeebe.test.broker.protocol.clientapi.SubscribedRecord;
import io.zeebe.test.broker.protocol.clientapi.TestPartitionClient;
import io.zeebe.test.util.TestUtil;
import io.zeebe.transport.SocketAddress;
import io.zeebe.util.StringUtil;
import java.io.IOException;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/zeebe/broker/job/old/JobSubscriptionTest.class */
public class JobSubscriptionTest {
    public EmbeddedBrokerRule brokerRule = new EmbeddedBrokerRule(new Consumer[0]);
    public ClientApiRule apiRule;

    @Rule
    public RuleChain ruleChain;
    private TestPartitionClient testClient;

    public JobSubscriptionTest() {
        EmbeddedBrokerRule embeddedBrokerRule = this.brokerRule;
        embeddedBrokerRule.getClass();
        this.apiRule = new ClientApiRule(embeddedBrokerRule::getClientAddress);
        this.ruleChain = RuleChain.outerRule(this.brokerRule).around(this.apiRule);
    }

    @Before
    public void setUp() {
        this.testClient = this.apiRule.partition();
    }

    @Test
    public void shouldAddJobSubscription() {
        ((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.ADD_JOB_SUBSCRIPTION).partitionId(this.apiRule.getDefaultPartitionId()).data().put("jobType", TypedStreamProcessorTest.STREAM_NAME).put("timeout", 10000L).put("worker", TestJarExporter.FOO).put("credits", 5).done()).send();
        ExecuteCommandResponse createJob = this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        SubscribedRecord receiveFirstJobEvent = this.testClient.receiveFirstJobEvent(JobIntent.ACTIVATED);
        Assertions.assertThat(receiveFirstJobEvent.key()).isEqualTo(createJob.key());
        Assertions.assertThat(receiveFirstJobEvent.position()).isGreaterThan(createJob.position());
        Assertions.assertThat(receiveFirstJobEvent.timestamp()).isGreaterThanOrEqualTo(createJob.timestamp());
        Assertions.assertThat(receiveFirstJobEvent.value()).containsEntry("type", TypedStreamProcessorTest.STREAM_NAME).containsEntry("retries", 3L).containsEntry("worker", TestJarExporter.FOO);
        Assertions.assertThat((List) this.testClient.receiveRecords().ofTypeJob().limit(4L).map(subscribedRecord -> {
            return subscribedRecord.intent();
        }).collect(Collectors.toList())).containsExactly(new Intent[]{JobIntent.CREATE, JobIntent.CREATED, JobIntent.ACTIVATE, JobIntent.ACTIVATED});
    }

    @Test
    public void shouldContainSourceRecordPosition() {
        ((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.ADD_JOB_SUBSCRIPTION).partitionId(this.apiRule.getDefaultPartitionId()).data().put("jobType", TypedStreamProcessorTest.STREAM_NAME).put("timeout", 10000L).put("worker", TestJarExporter.FOO).put("credits", 5).done()).send();
        ExecuteCommandResponse createJob = this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        SubscribedRecord subscribedRecord = (SubscribedRecord) this.apiRule.subscribedEvents().findAny().get();
        SubscribedRecord receiveFirstJobCommand = this.testClient.receiveFirstJobCommand(JobIntent.CREATE);
        SubscribedRecord receiveFirstJobEvent = this.testClient.receiveFirstJobEvent(JobIntent.CREATED);
        SubscribedRecord receiveFirstJobCommand2 = this.testClient.receiveFirstJobCommand(JobIntent.ACTIVATE);
        SubscribedRecord receiveFirstJobEvent2 = this.testClient.receiveFirstJobEvent(JobIntent.ACTIVATED);
        Assertions.assertThat(createJob.sourceRecordPosition()).isEqualTo(receiveFirstJobCommand.position());
        Assertions.assertThat(receiveFirstJobEvent.sourceRecordPosition()).isEqualTo(receiveFirstJobCommand.position());
        Assertions.assertThat(receiveFirstJobCommand2.sourceRecordPosition()).isEqualTo(receiveFirstJobEvent.position());
        Assertions.assertThat(subscribedRecord.sourceRecordPosition()).isEqualTo(receiveFirstJobCommand2.position());
        Assertions.assertThat(receiveFirstJobEvent2.sourceRecordPosition()).isEqualTo(receiveFirstJobCommand2.position());
    }

    @Test
    public void shouldRemoveJobSubscription() {
        long longValue = ((Long) this.apiRule.openJobSubscription(TypedStreamProcessorTest.STREAM_NAME).await().getData().get("subscriberKey")).longValue();
        Assertions.assertThat(((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.REMOVE_JOB_SUBSCRIPTION).partitionId(this.apiRule.getDefaultPartitionId()).data().put("subscriberKey", Long.valueOf(longValue)).done()).send().await().getData()).containsEntry("subscriberKey", Long.valueOf(longValue));
    }

    @Test
    public void shouldNoLongerActivateJobsAfterRemoval() throws InterruptedException {
        this.apiRule.closeJobSubscription(((Long) this.apiRule.openJobSubscription(TypedStreamProcessorTest.STREAM_NAME).await().getData().get("subscriberKey")).longValue()).await();
        this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        this.apiRule.openTopicSubscription(ConfigurationTest.BROKER_BASE, 0L).await();
        Thread.sleep(500L);
        List list = (List) this.apiRule.subscribedEvents().limit(this.apiRule.numSubscribedEventsAvailable()).collect(Collectors.toList());
        Assertions.assertThat(list).hasSize(2);
        Assertions.assertThat(list).allMatch(subscribedRecord -> {
            return subscribedRecord.subscriptionType() == SubscriptionType.TOPIC_SUBSCRIPTION;
        });
        Assertions.assertThat(list).extracting(subscribedRecord2 -> {
            return subscribedRecord2.intent();
        }).containsExactly(new Intent[]{JobIntent.CREATE, JobIntent.CREATED});
    }

    @Test
    public void shouldRejectSubscriptionWithZeroCredits() {
        ErrorResponse awaitError = ((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.ADD_JOB_SUBSCRIPTION).partitionId(this.apiRule.getDefaultPartitionId()).data().put("jobType", TypedStreamProcessorTest.STREAM_NAME).put("timeout", 10000L).put("worker", TestJarExporter.FOO).put("credits", 0).done()).send().awaitError();
        Assertions.assertThat(awaitError).isNotNull();
        Assertions.assertThat(awaitError.getErrorCode()).isEqualTo(ErrorCode.REQUEST_PROCESSING_FAILURE);
        Assertions.assertThat(awaitError.getErrorData()).isEqualTo("Cannot add job subscription. subscription credits must be greater than 0");
    }

    @Test
    public void shouldRejectSubscriptionWithNegativeCredits() {
        ErrorResponse awaitError = ((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.ADD_JOB_SUBSCRIPTION).partitionId(this.apiRule.getDefaultPartitionId()).data().put("jobType", TypedStreamProcessorTest.STREAM_NAME).put("timeout", 10000L).put("worker", TestJarExporter.FOO).put("credits", -1).done()).send().awaitError();
        Assertions.assertThat(awaitError).isNotNull();
        Assertions.assertThat(awaitError.getErrorCode()).isEqualTo(ErrorCode.REQUEST_PROCESSING_FAILURE);
        Assertions.assertThat(awaitError.getErrorData()).isEqualTo("Cannot add job subscription. subscription credits must be greater than 0");
    }

    @Test
    public void shouldRejectSubscriptionWithoutWorker() {
        ErrorResponse awaitError = ((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.ADD_JOB_SUBSCRIPTION).partitionId(this.apiRule.getDefaultPartitionId()).data().put("jobType", TypedStreamProcessorTest.STREAM_NAME).put("timeout", 10000L).put("credits", 5).done()).send().awaitError();
        Assertions.assertThat(awaitError).isNotNull();
        Assertions.assertThat(awaitError.getErrorCode()).isEqualTo(ErrorCode.REQUEST_PROCESSING_FAILURE);
        Assertions.assertThat(awaitError.getErrorData()).isEqualTo("Cannot add job subscription. worker must not be empty");
    }

    @Test
    public void shouldRejectSubscriptionWithExcessiveWorkerName() {
        ErrorResponse awaitError = ((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.ADD_JOB_SUBSCRIPTION).partitionId(this.apiRule.getDefaultPartitionId()).data().put("jobType", TypedStreamProcessorTest.STREAM_NAME).put("timeout", 10000L).put("worker", StringUtil.stringOfLength(65)).put("credits", 5).done()).send().awaitError();
        Assertions.assertThat(awaitError).isNotNull();
        Assertions.assertThat(awaitError.getErrorCode()).isEqualTo(ErrorCode.REQUEST_PROCESSING_FAILURE);
        Assertions.assertThat(awaitError.getErrorData()).isEqualTo("Cannot add job subscription. length of worker must be less than or equal to 64");
    }

    @Test
    public void shouldRejectSubscriptionWithZeroTimeout() {
        ErrorResponse awaitError = ((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.ADD_JOB_SUBSCRIPTION).partitionId(this.apiRule.getDefaultPartitionId()).data().put("jobType", TypedStreamProcessorTest.STREAM_NAME).put("timeout", 0).put("worker", TestJarExporter.FOO).put("credits", 5).done()).send().awaitError();
        Assertions.assertThat(awaitError).isNotNull();
        Assertions.assertThat(awaitError.getErrorCode()).isEqualTo(ErrorCode.REQUEST_PROCESSING_FAILURE);
        Assertions.assertThat(awaitError.getErrorData()).isEqualTo("Cannot add job subscription. timeout must be greater than 0");
    }

    @Test
    public void shouldRejectSubscriptionWithNegativeTimeout() {
        ErrorResponse awaitError = ((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.ADD_JOB_SUBSCRIPTION).partitionId(this.apiRule.getDefaultPartitionId()).data().put("jobType", TypedStreamProcessorTest.STREAM_NAME).put("timeout", -1).put("worker", TestJarExporter.FOO).put("credits", 5).done()).send().awaitError();
        Assertions.assertThat(awaitError).isNotNull();
        Assertions.assertThat(awaitError.getErrorCode()).isEqualTo(ErrorCode.REQUEST_PROCESSING_FAILURE);
        Assertions.assertThat(awaitError.getErrorData()).isEqualTo("Cannot add job subscription. timeout must be greater than 0");
    }

    @Test
    public void shouldDistributeJobsInRoundRobinFashion() {
        long longValue = ((Long) this.apiRule.openJobSubscription(TypedStreamProcessorTest.STREAM_NAME).await().getData().get("subscriberKey")).longValue();
        long longValue2 = ((Long) this.apiRule.openJobSubscription(TypedStreamProcessorTest.STREAM_NAME).await().getData().get("subscriberKey")).longValue();
        this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        TestUtil.waitUntil(() -> {
            return this.apiRule.numSubscribedEventsAvailable() == 4;
        });
        List list = (List) this.apiRule.subscribedEvents().limit(4L).collect(Collectors.toList());
        long subscriberKey = ((SubscribedRecord) list.get(0)).subscriberKey();
        long j = subscriberKey == longValue ? longValue2 : longValue;
        Assertions.assertThat(list).extracting(subscribedRecord -> {
            return Long.valueOf(subscribedRecord.subscriberKey());
        }).containsExactly(new Long[]{Long.valueOf(subscriberKey), Long.valueOf(j), Long.valueOf(subscriberKey), Long.valueOf(j)});
    }

    @Test
    public void shouldCloseSubscriptionOnTransportChannelClose() throws InterruptedException {
        this.apiRule.openJobSubscription(TypedStreamProcessorTest.STREAM_NAME).await();
        this.apiRule.interruptAllChannels();
        Thread.sleep(1000L);
        ExecuteCommandResponse createJob = this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        long longValue = ((Long) this.apiRule.openJobSubscription(TypedStreamProcessorTest.STREAM_NAME).await().getData().get("subscriberKey")).longValue();
        Optional findFirst = this.apiRule.subscribedEvents().filter(subscribedRecord -> {
            return subscribedRecord.subscriptionType() == SubscriptionType.JOB_SUBSCRIPTION && subscribedRecord.key() == createJob.key();
        }).findFirst();
        Assertions.assertThat(findFirst).isPresent();
        Assertions.assertThat(((SubscribedRecord) findFirst.get()).subscriberKey()).isEqualTo(longValue);
    }

    @Test
    public void shouldOpenSubscriptionConcurrentWithRemoval() {
        this.apiRule.closeJobSubscription(((Long) this.apiRule.openJobSubscription(TypedStreamProcessorTest.STREAM_NAME).await().getData().get("subscriberKey")).longValue());
        ControlMessageResponse await = this.apiRule.openJobSubscription(TypedStreamProcessorTest.STREAM_NAME).await();
        Assertions.assertThat(await.getData()).containsKey("subscriberKey");
        long longValue = ((Long) await.getData().get("subscriberKey")).longValue();
        ExecuteCommandResponse createJob = this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        Optional findFirst = this.apiRule.subscribedEvents().filter(subscribedRecord -> {
            return subscribedRecord.subscriptionType() == SubscriptionType.JOB_SUBSCRIPTION && subscribedRecord.key() == createJob.key();
        }).findFirst();
        Assertions.assertThat(findFirst).isPresent();
        Assertions.assertThat(((SubscribedRecord) findFirst.get()).subscriberKey()).isEqualTo(longValue);
    }

    @Test
    public void shouldContinueRoundRobinJobDistributionAfterClientChannelClose() {
        this.apiRule.openJobSubscription(TypedStreamProcessorTest.STREAM_NAME).await();
        this.apiRule.openJobSubscription(TypedStreamProcessorTest.STREAM_NAME).await();
        this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        TestUtil.waitUntil(() -> {
            return this.apiRule.numSubscribedEventsAvailable() == 1;
        });
        openAndCloseConnectionTo(this.apiRule.getBrokerAddress());
        this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        TestUtil.waitUntil(() -> {
            return this.apiRule.numSubscribedEventsAvailable() == 2;
        });
        List list = (List) this.apiRule.subscribedEvents().limit(2L).collect(Collectors.toList());
        Assertions.assertThat(((SubscribedRecord) list.get(0)).subscriberKey()).isNotEqualTo(((SubscribedRecord) list.get(1)).subscriberKey());
    }

    protected void openAndCloseConnectionTo(SocketAddress socketAddress) {
        try {
            SocketChannel open = SocketChannel.open();
            open.setOption((SocketOption<SocketOption>) StandardSocketOptions.TCP_NODELAY, (SocketOption) true);
            open.configureBlocking(true);
            open.connect(socketAddress.toInetSocketAddress());
            open.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Test
    public void shouldRejectCreditsEqualToZero() {
        ErrorResponse awaitError = ((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.INCREASE_JOB_SUBSCRIPTION_CREDITS).data().put("subscriberKey", 1).put("credits", 0).put("partitionId", Integer.valueOf(this.apiRule.getDefaultPartitionId())).done()).send().awaitError();
        Assertions.assertThat(awaitError.getErrorCode()).isEqualTo(ErrorCode.REQUEST_PROCESSING_FAILURE);
        Assertions.assertThat(awaitError.getErrorData()).isEqualTo("Cannot increase job subscription credits. Credits must be positive.");
    }

    @Test
    public void shouldRejectNegativeCredits() {
        ErrorResponse awaitError = ((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.INCREASE_JOB_SUBSCRIPTION_CREDITS).data().put("subscriberKey", 1).put("credits", -10).put("partitionId", Integer.valueOf(this.apiRule.getDefaultPartitionId())).done()).send().awaitError();
        Assertions.assertThat(awaitError.getErrorCode()).isEqualTo(ErrorCode.REQUEST_PROCESSING_FAILURE);
        Assertions.assertThat(awaitError.getErrorData()).isEqualTo("Cannot increase job subscription credits. Credits must be positive.");
    }

    @Test
    public void shouldAddJobSubscriptionsForDifferentTypes() {
        ((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.ADD_JOB_SUBSCRIPTION).partitionId(this.apiRule.getDefaultPartitionId()).data().put("jobType", TypedStreamProcessorTest.STREAM_NAME).put("timeout", 1000L).put("worker", "owner1").put("credits", 5).done()).send();
        ((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.ADD_JOB_SUBSCRIPTION).partitionId(this.apiRule.getDefaultPartitionId()).data().put("jobType", TestJarExporter.FOO).put("timeout", 1000L).put("worker", "owner2").put("credits", 5).done()).send();
        this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        this.testClient.createJob(TestJarExporter.FOO);
        List list = (List) this.testClient.receiveEvents().ofTypeJob().withIntent(JobIntent.ACTIVATED).limit(2L).collect(Collectors.toList());
        Assertions.assertThat(list).hasSize(2);
        Function function = str -> {
            return list.stream().filter(subscribedRecord -> {
                return str.equals(subscribedRecord.value().get("type"));
            }).findFirst();
        };
        Assertions.assertThat((Optional) function.apply(TypedStreamProcessorTest.STREAM_NAME)).hasValueSatisfying(subscribedRecord -> {
            Assertions.assertThat(subscribedRecord.value()).containsEntry("worker", "owner1");
        });
        Assertions.assertThat((Optional) function.apply(TestJarExporter.FOO)).hasValueSatisfying(subscribedRecord2 -> {
            Assertions.assertThat(subscribedRecord2.value()).containsEntry("worker", "owner2");
        });
    }

    @Test
    public void shouldActivateJobsUntilCreditsAreExhausted() {
        ((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.ADD_JOB_SUBSCRIPTION).partitionId(this.apiRule.getDefaultPartitionId()).data().put("jobType", TypedStreamProcessorTest.STREAM_NAME).put("timeout", 1000L).put("worker", TestJarExporter.FOO).put("credits", 2).done()).send();
        this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        TestUtil.waitUntil(() -> {
            return this.apiRule.numSubscribedEventsAvailable() == 2;
        });
        Assertions.assertThat((List) this.testClient.receiveEvents().ofTypeJob().withIntent(JobIntent.ACTIVATED).limit(2L).collect(Collectors.toList())).extracting(subscribedRecord -> {
            return subscribedRecord.value().get("worker");
        }).contains(new Object[]{TestJarExporter.FOO});
    }

    @Test
    public void shouldIncreaseSubscriptionCredits() {
        Assertions.assertThat(((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.ADD_JOB_SUBSCRIPTION).partitionId(this.apiRule.getDefaultPartitionId()).data().put("jobType", TypedStreamProcessorTest.STREAM_NAME).put("timeout", 1000L).put("worker", TestJarExporter.FOO).put("credits", 2).done()).sendAndAwait().getData().get("subscriberKey")).isEqualTo(0L);
        this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        TestUtil.waitUntil(() -> {
            return this.apiRule.numSubscribedEventsAvailable() == 2;
        });
        this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        ((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.INCREASE_JOB_SUBSCRIPTION_CREDITS).partitionId(this.apiRule.getDefaultPartitionId()).data().put("subscriberKey", 0).put("credits", 2).put("partitionId", Integer.valueOf(this.apiRule.getDefaultPartitionId())).done()).send();
        TestUtil.waitUntil(() -> {
            return this.apiRule.numSubscribedEventsAvailable() == 4;
        });
    }

    @Test
    public void shouldIgnoreCreditsRequestIfSubscriptionDoesNotExist() {
        Assertions.assertThat(((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.INCREASE_JOB_SUBSCRIPTION_CREDITS).partitionId(this.apiRule.getDefaultPartitionId()).data().put("subscriberKey", 444L).put("credits", 2).put("partitionId", Integer.valueOf(this.apiRule.getDefaultPartitionId())).done()).sendAndAwait().getData()).containsEntry("subscriberKey", 444L);
    }

    @Test
    public void shouldNotPublishJobWithoutRetries() throws InterruptedException {
        this.apiRule.openJobSubscription(TypedStreamProcessorTest.STREAM_NAME).await();
        this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        TestUtil.waitUntil(() -> {
            return this.apiRule.numSubscribedEventsAvailable() == 1;
        });
        this.testClient.failJob(((SubscribedRecord) this.apiRule.subscribedEvents().findFirst().get()).key(), 0);
        Thread.sleep(500L);
        Assertions.assertThat(this.apiRule.numSubscribedEventsAvailable()).isEqualTo(0);
    }

    @Test
    public void shouldNotPublishJobOfDifferentType() throws InterruptedException {
        this.apiRule.openJobSubscription(TypedStreamProcessorTest.STREAM_NAME).await();
        this.testClient.createJob(TestJarExporter.FOO);
        Thread.sleep(500L);
        Assertions.assertThat(this.apiRule.numSubscribedEventsAvailable()).isEqualTo(0);
    }

    @Test
    public void shouldPublishJobsToSecondSubscription() {
        this.apiRule.openJobSubscription(this.apiRule.getDefaultPartitionId(), TypedStreamProcessorTest.STREAM_NAME, 10000L, 2).await();
        for (int i = 0; i < 2; i++) {
            this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        }
        TestUtil.waitUntil(() -> {
            return this.apiRule.numSubscribedEventsAvailable() == 2;
        });
        this.apiRule.moveMessageStreamToTail();
        long longValue = ((Long) this.apiRule.openJobSubscription(this.apiRule.getDefaultPartitionId(), TypedStreamProcessorTest.STREAM_NAME, 10000L, 2).await().getData().get("subscriberKey")).longValue();
        this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        TestUtil.waitUntil(() -> {
            return this.apiRule.numSubscribedEventsAvailable() == 1;
        });
        Assertions.assertThat(((SubscribedRecord) this.apiRule.subscribedEvents().findFirst().get()).subscriberKey()).isEqualTo(longValue);
    }

    @Test
    public void shouldActivateJobsOfDifferentTypeLocatedInFrontOfAlreadyActivatedJob() {
        this.testClient.createJob(TypedStreamProcessorTest.STREAM_NAME);
        this.testClient.createJob(TestJarExporter.FOO);
        this.apiRule.openJobSubscription(this.apiRule.getDefaultPartitionId(), TestJarExporter.FOO, 10000L, 32).await();
        TestUtil.waitUntil(() -> {
            return this.apiRule.numSubscribedEventsAvailable() == 1;
        });
        this.apiRule.openJobSubscription(this.apiRule.getDefaultPartitionId(), TypedStreamProcessorTest.STREAM_NAME, 10000L, 32).await();
        TestUtil.waitUntil(() -> {
            return this.apiRule.numSubscribedEventsAvailable() == 2;
        });
        Assertions.assertThat(((SubscribedRecord) this.apiRule.subscribedEvents().skip(1L).findFirst().get()).value().get("type")).isEqualTo(TypedStreamProcessorTest.STREAM_NAME);
    }

    @Test
    public void shouldRejectRemovingNonExistingJobSubscription() {
        ErrorResponse awaitError = ((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.REMOVE_JOB_SUBSCRIPTION).partitionId(this.apiRule.getDefaultPartitionId()).data().put("subscriberKey", 123).done()).send().awaitError();
        Assertions.assertThat(awaitError).isNotNull();
        Assertions.assertThat(awaitError.getErrorCode()).isEqualTo(ErrorCode.REQUEST_PROCESSING_FAILURE);
        Assertions.assertThat(awaitError.getErrorData()).isEqualTo("Cannot remove job subscription. Subscription does not exist");
    }
}
