/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiService;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.cloud.pubsublite.internal.ApiExceptionMatcher;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.testing.RetryingConnectionHelpers;
import com.google.cloud.pubsublite.internal.wire.BatchPublisher;
import com.google.cloud.pubsublite.internal.wire.BatchPublisherFactory;
import com.google.cloud.pubsublite.internal.wire.PublisherImpl;
import com.google.cloud.pubsublite.internal.wire.StreamFactories;
import com.google.cloud.pubsublite.internal.wire.StreamFactory;
import com.google.cloud.pubsublite.proto.InitialPublishRequest;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.PublishRequest;
import com.google.protobuf.ByteString;
import cz.o2.proxima.internal.shaded.com.google.common.collect.ImmutableList;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Iterables;
import cz.o2.proxima.internal.shaded.com.google.common.truth.Truth;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.hamcrest.MockitoHamcrest;
import org.mockito.verification.VerificationMode;
import org.threeten.bp.Duration;

/**
 * Unit tests for {@link PublisherImpl}.
 *
 * <p>The publisher under test is wired to a mocked {@link BatchPublisherFactory} and a mocked
 * {@link AlarmFactory}. The stubs installed in {@link #setUp()} capture ("leak") the response
 * observer passed to the factory and the runnable passed to the alarm factory, so each test can
 * deliver offsets or errors and trigger a batch flush by hand.
 */
@RunWith(JUnit4.class)
public class PublisherImplTest {
    /** Initial request the publisher is expected to hand to the connection factory. */
    private static final PublishRequest INITIAL_PUBLISH_REQUEST =
        PublishRequest.newBuilder()
            .setInitialRequest(InitialPublishRequest.newBuilder().setPartition(7L).build())
            .build();

    /** Thresholds large enough that the tests never reach them; flushes are triggered manually. */
    private static final BatchingSettings BATCHING_SETTINGS_THAT_NEVER_FIRE =
        BatchingSettings.newBuilder()
            .setIsEnabled(true)
            .setDelayThreshold(Duration.ofDays(10L))
            .setRequestByteThreshold(1000000L)
            .setElementCountThreshold(1000000L)
            .build();

    /**
     * Upper bound, in milliseconds, for verifications that wait on the asynchronous reconnect.
     * The verification returns as soon as the expected interaction is observed, so this is only
     * paid in full when the test is about to fail.
     */
    private static final long ASYNC_VERIFY_TIMEOUT_MS = 5000L;

    /**
     * Payload size of the "large" message used by the retry tests. The rebatching test expects a
     * message of this size to travel in a batch of its own.
     * NOTE(review): presumably chosen relative to the publisher's per-batch byte limit - confirm
     * against the production constants.
     */
    private static final int LARGE_PAYLOAD_BYTES = 3669996;

    /** Number of {@code 'a'} characters in the "small" message used by the retry tests. */
    private static final int SMALL_PAYLOAD_LENGTH = 21;

    /**
     * Number of extra messages published by the rebatching test. That test expects all but the
     * last of them to be re-sent together with one preceding message, and the last one alone.
     */
    private static final int REMAINING_MESSAGE_COUNT = 1000;

    @Mock
    private StreamFactories.PublishStreamFactory unusedStreamFactory;
    @Mock
    private BatchPublisher mockBatchPublisher;
    @Mock
    private BatchPublisherFactory mockPublisherFactory;
    @Mock
    private AlarmFactory alarmFactory;

    private PublisherImpl publisher;
    /** Future obtained from {@code RetryingConnectionHelpers.whenFailed} for the publisher. */
    private Future<Void> errorOccurredFuture;
    /** Response observer captured from the first stubbed {@code New} call. */
    private ResponseObserver<Offset> leakedOffsetStream;
    /** Runnable captured from the alarm factory; running it is how tests trigger a batch send. */
    private Runnable leakedBatchAlarm;

    /** Creates the mocks, installs the capturing stubs and constructs the publisher under test. */
    @Before
    public void setUp() throws CheckedApiException {
        MockitoAnnotations.initMocks(this);
        Mockito.doAnswer(args -> {
            leakedOffsetStream = args.getArgument(1);
            return mockBatchPublisher;
        }).when(mockPublisherFactory)
            .New(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.eq(INITIAL_PUBLISH_REQUEST));
        Mockito.when(alarmFactory.newAlarm(ArgumentMatchers.any())).thenAnswer(args -> {
            leakedBatchAlarm = args.getArgument(0);
            return SettableApiFuture.create();
        });
        publisher = new PublisherImpl(
            unusedStreamFactory,
            mockPublisherFactory,
            alarmFactory,
            INITIAL_PUBLISH_REQUEST.getInitialRequest(),
            BATCHING_SETTINGS_THAT_NEVER_FIRE);
        errorOccurredFuture = RetryingConnectionHelpers.whenFailed(publisher);
    }

    /** Starts the publisher and checks that it opened exactly one connection via the factory. */
    private void startPublisher() {
        publisher.startAsync().awaitRunning();
        Truth.assertThat(leakedOffsetStream).isNotNull();
        Mockito.verify(mockPublisherFactory)
            .New(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.eq(INITIAL_PUBLISH_REQUEST));
    }

    /**
     * Registers a Mockito argument matcher accepting any batch that contains all of
     * {@code messages}. Must be called as an argument of a stubbed or verified call.
     */
    private static Collection<PubSubMessage> batchContaining(PubSubMessage... messages) {
        return (Collection<PubSubMessage>) MockitoHamcrest.argThat(CoreMatchers.hasItems(messages));
    }

    /**
     * Registers a Mockito argument matcher accepting only a batch whose elements are exactly
     * {@code expected}, in order. Must be called as an argument of a stubbed or verified call.
     */
    @SuppressWarnings("unchecked") // The Hamcrest matcher is typed on Object[]; elements are PubSubMessages.
    private static Collection<PubSubMessage> batchOfExactly(List<PubSubMessage> expected) {
        return (Collection<PubSubMessage>) MockitoHamcrest.argThat(Matchers.contains(expected.toArray()));
    }

    /** Builds a message carrying the given UTF-8 payload. */
    private static Message messageOf(String data) {
        return Message.builder().setData(ByteString.copyFromUtf8(data)).build();
    }

    /** Builds the {@link #LARGE_PAYLOAD_BYTES}-byte message used by the retry tests. */
    private static Message largeMessage() {
        return Message.builder().setData(ByteString.copyFrom(new byte[LARGE_PAYLOAD_BYTES])).build();
    }

    /** Builds the {@link #SMALL_PAYLOAD_LENGTH}-character message used by the retry tests. */
    private static Message smallMessage() {
        return messageOf(String.join("", Collections.nCopies(SMALL_PAYLOAD_LENGTH, "a")));
    }

    /** Asserts that none of the given futures has completed yet. */
    private static void assertPending(Future<?>... futures) {
        for (Future<?> future : futures) {
            Truth.assertThat(future.isDone()).isFalse();
        }
    }

    /** Asserts that {@code future} has completed successfully with the given offset value. */
    private static void assertCompletedWith(Future<Offset> future, long expectedOffset) throws Exception {
        Truth.assertThat(future.isDone()).isTrue();
        Truth.assertThat(future.get()).isEqualTo(Offset.of(expectedOffset));
    }

    /** Starting the publisher opens one connection and does not touch it. */
    @Test
    public void construct_CallsFactoryNew() {
        startPublisher();
        Mockito.verifyNoMoreInteractions(mockPublisherFactory);
        Mockito.verifyNoInteractions(mockBatchPublisher);
    }

    /** {@code flush()} sends the pending message, and the stubbed response completes its future. */
    @Test
    public void construct_FlushSendsBatched() throws Exception {
        startPublisher();
        Message message = Message.builder().build();
        ApiFuture<Offset> future = publisher.publish(message);
        // Respond with offset 10 as soon as the batch reaches the connection.
        Mockito.doAnswer(args -> {
            leakedOffsetStream.onResponse(Offset.of(10L));
            return null;
        }).when(mockBatchPublisher).publish(batchContaining(message.toProto()));

        publisher.flush();

        Mockito.verify(mockBatchPublisher).publish(batchContaining(message.toProto()));
        Truth.assertThat(future.get()).isEqualTo(Offset.of(10L));
        Mockito.verifyNoMoreInteractions(mockBatchPublisher);
    }

    /** Stopping the publisher sends the pending message and then closes the connection. */
    @Test
    public void construct_CloseSendsBatched() throws Exception {
        startPublisher();
        Message message = Message.builder().build();
        ApiFuture<Offset> future = publisher.publish(message);
        // Respond with offset 10 as soon as the batch reaches the connection.
        Mockito.doAnswer(args -> {
            leakedOffsetStream.onResponse(Offset.of(10L));
            return null;
        }).when(mockBatchPublisher).publish(batchContaining(message.toProto()));

        publisher.stopAsync().awaitTerminated();

        Mockito.verify(mockBatchPublisher).publish(batchContaining(message.toProto()));
        Truth.assertThat(future.get()).isEqualTo(Offset.of(10L));
        Mockito.verify(mockBatchPublisher).close();
        Mockito.verifyNoMoreInteractions(mockBatchPublisher);
    }

    /**
     * Publishing before start throws {@link IllegalStateException}, as does the subsequent
     * attempt to start; no connection is ever created.
     */
    @Test
    public void publishBeforeStart_IsPermanentError() throws Exception {
        Message message = Message.builder().build();
        Assert.assertThrows(IllegalStateException.class, () -> publisher.publish(message));
        Assert.assertThrows(IllegalStateException.class, () -> publisher.startAsync().awaitRunning());
        Mockito.verifyNoInteractions(mockPublisherFactory);
        Mockito.verifyNoInteractions(mockBatchPublisher);
    }

    /** After a FAILED_PRECONDITION stream error, new publishes fail with that same code. */
    @Test
    public void publishAfterError_IsError() throws Exception {
        startPublisher();
        leakedOffsetStream.onError(new CheckedApiException(StatusCode.Code.FAILED_PRECONDITION).underlying);
        Assert.assertThrows(IllegalStateException.class, () -> publisher.awaitTerminated());
        errorOccurredFuture.get();
        ApiExceptionMatcher.assertThrowableMatches(publisher.failureCause(), StatusCode.Code.FAILED_PRECONDITION);

        Message message = Message.builder().build();
        ApiFuture<Offset> future = publisher.publish(message);
        ExecutionException e = Assert.assertThrows(ExecutionException.class, future::get);
        Optional<CheckedApiException> statusOr = ExtractStatus.extract(e.getCause());
        Truth.assertThat(statusOr.isPresent()).isTrue();
        Truth.assertThat(statusOr.get().code()).isEqualTo(StatusCode.Code.FAILED_PRECONDITION);
        Mockito.verifyNoMoreInteractions(mockBatchPublisher);
    }

    /**
     * Two batches are sent on two alarm firings. One response per batch completes that batch's
     * futures, with consecutive offsets assigned inside a batch starting at the response offset.
     */
    @Test
    public void multipleBatches_Ok() throws Exception {
        startPublisher();
        Message message1 = Message.builder().build();
        Message message2 = messageOf("data");
        Message message3 = messageOf("other_data");

        ApiFuture<Offset> future1 = publisher.publish(message1);
        ApiFuture<Offset> future2 = publisher.publish(message2);
        leakedBatchAlarm.run();
        Mockito.verify(mockBatchPublisher).publish(batchContaining(message1.toProto(), message2.toProto()));

        ApiFuture<Offset> future3 = publisher.publish(message3);
        leakedBatchAlarm.run();
        Mockito.verify(mockBatchPublisher).publish(batchContaining(message3.toProto()));

        assertPending(future1, future2, future3);

        leakedOffsetStream.onResponse(Offset.of(10L));
        assertCompletedWith(future1, 10L);
        assertCompletedWith(future2, 11L);
        assertPending(future3);

        leakedOffsetStream.onResponse(Offset.of(12L));
        assertCompletedWith(future3, 12L);

        Mockito.verifyNoMoreInteractions(mockBatchPublisher);
    }

    /**
     * After a retryable (UNKNOWN) stream error the old connection is closed, a second one is
     * created, and both in-flight batches are re-sent on it before any future completes.
     */
    @Test
    public void retryableError_RecreatesAndRetriesAll() throws Exception {
        startPublisher();
        Message message1 = largeMessage();
        Message message2 = smallMessage();

        ApiFuture<Offset> future1 = publisher.publish(message1);
        leakedBatchAlarm.run();
        Mockito.verify(mockBatchPublisher).publish(batchContaining(message1.toProto()));
        // Firing the alarm with nothing pending must not produce another publish call; this is
        // enforced by the verifyNoMoreInteractions checks below.
        leakedBatchAlarm.run();
        ApiFuture<Offset> future2 = publisher.publish(message2);
        leakedBatchAlarm.run();
        Mockito.verify(mockBatchPublisher).publish(batchContaining(message2.toProto()));
        assertPending(future1, future2);

        BatchPublisher mockBatchPublisher2 = Mockito.mock(BatchPublisher.class);
        Mockito.doReturn(mockBatchPublisher2).when(mockPublisherFactory)
            .New(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.eq(INITIAL_PUBLISH_REQUEST));
        leakedOffsetStream.onError(new CheckedApiException(StatusCode.Code.UNKNOWN));

        // The reconnect happens asynchronously: wait for each expected interaction rather than
        // sleeping for a fixed interval.
        Mockito.verify(mockBatchPublisher, Mockito.timeout(ASYNC_VERIFY_TIMEOUT_MS)).close();
        Mockito.verifyNoMoreInteractions(mockBatchPublisher);
        Mockito.verify(mockPublisherFactory, Mockito.timeout(ASYNC_VERIFY_TIMEOUT_MS).times(2))
            .New(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.eq(INITIAL_PUBLISH_REQUEST));
        Mockito.verify(mockBatchPublisher2, Mockito.timeout(ASYNC_VERIFY_TIMEOUT_MS))
            .publish(batchContaining(message1.toProto()));
        Mockito.verify(mockBatchPublisher2, Mockito.timeout(ASYNC_VERIFY_TIMEOUT_MS))
            .publish(batchContaining(message2.toProto()));
        assertPending(future1, future2);

        leakedOffsetStream.onResponse(Offset.of(10L));
        assertCompletedWith(future1, 10L);
        assertPending(future2);

        leakedOffsetStream.onResponse(Offset.of(50L));
        assertCompletedWith(future2, 50L);

        Mockito.verifyNoMoreInteractions(mockBatchPublisher, mockBatchPublisher2);
    }

    /**
     * After a retryable (UNKNOWN) stream error, batches already sent are re-sent unchanged and in
     * order, while messages that had not been sent on their own are regrouped: the fourth message
     * together with all but the last "remaining" message, then the last one alone.
     */
    @Test
    public void retryableError_RebatchesProperly() throws Exception {
        startPublisher();
        Message message1 = messageOf("message1");
        Message message2 = messageOf("message2");
        Message message3 = largeMessage();
        Message message4 = smallMessage();
        List<Message> remaining = IntStream.range(0, REMAINING_MESSAGE_COUNT)
            .mapToObj(x -> messageOf("clone-" + x))
            .collect(Collectors.toList());

        ApiFuture<Offset> future1 = publisher.publish(message1);
        ApiFuture<Offset> future2 = publisher.publish(message2);
        leakedBatchAlarm.run();
        Mockito.verify(mockBatchPublisher).publish(batchContaining(message1.toProto(), message2.toProto()));
        leakedBatchAlarm.run();
        ApiFuture<Offset> future3 = publisher.publish(message3);
        leakedBatchAlarm.run();
        Mockito.verify(mockBatchPublisher).publish(batchContaining(message3.toProto()));
        ApiFuture<Offset> future4 = publisher.publish(message4);
        leakedBatchAlarm.run();
        Mockito.verify(mockBatchPublisher).publish(batchContaining(message4.toProto()));
        List<ApiFuture<Offset>> remainingFutures =
            remaining.stream().map(publisher::publish).collect(Collectors.toList());
        leakedBatchAlarm.run();

        assertPending(future1, future2, future3, future4);
        for (Future<Offset> future : remainingFutures) {
            assertPending(future);
        }

        BatchPublisher mockBatchPublisher2 = Mockito.mock(BatchPublisher.class);
        Mockito.doReturn(mockBatchPublisher2).when(mockPublisherFactory)
            .New(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.eq(INITIAL_PUBLISH_REQUEST));
        leakedOffsetStream.onError(new CheckedApiException(StatusCode.Code.UNKNOWN));

        // The reconnect happens asynchronously: wait for each expected interaction rather than
        // sleeping for a fixed interval.
        Mockito.verify(mockBatchPublisher, Mockito.timeout(ASYNC_VERIFY_TIMEOUT_MS)).close();
        Mockito.verify(mockPublisherFactory, Mockito.timeout(ASYNC_VERIFY_TIMEOUT_MS).times(2))
            .New(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.eq(INITIAL_PUBLISH_REQUEST));
        // Wait until the final re-sent batch has been observed, so the ordered checks below run
        // against the complete invocation history.
        Mockito.verify(mockBatchPublisher2, Mockito.timeout(ASYNC_VERIFY_TIMEOUT_MS))
            .publish(batchContaining(Iterables.getLast(remaining).toProto()));

        ImmutableList.Builder<PubSubMessage> expectedRebatch = ImmutableList.builder();
        expectedRebatch.add(message4.toProto());
        for (int i = 0; i < REMAINING_MESSAGE_COUNT - 1; i++) {
            expectedRebatch.add(remaining.get(i).toProto());
        }

        InOrder order = Mockito.inOrder(mockBatchPublisher2);
        order.verify(mockBatchPublisher2).publish(batchContaining(message1.toProto(), message2.toProto()));
        order.verify(mockBatchPublisher2).publish(batchContaining(message3.toProto()));
        order.verify(mockBatchPublisher2).publish(batchOfExactly(expectedRebatch.build()));
        order.verify(mockBatchPublisher2).publish(batchContaining(Iterables.getLast(remaining).toProto()));

        assertPending(future1, future2, future3, future4);
        for (Future<Offset> future : remainingFutures) {
            assertPending(future);
        }

        leakedOffsetStream.onResponse(Offset.of(10L));
        assertCompletedWith(future1, 10L);
        assertCompletedWith(future2, 11L);
        assertPending(future3);

        leakedOffsetStream.onResponse(Offset.of(50L));
        assertCompletedWith(future3, 50L);
        assertPending(future4);

        // One response covers the whole regrouped batch: message4 at 100, then 101, 102, ...
        leakedOffsetStream.onResponse(Offset.of(100L));
        assertCompletedWith(future4, 100L);
        for (int i = 0; i < REMAINING_MESSAGE_COUNT - 1; i++) {
            assertCompletedWith(remainingFutures.get(i), 101 + i);
        }

        Future<Offset> lastFuture = Iterables.getLast(remainingFutures);
        assertPending(lastFuture);
        leakedOffsetStream.onResponse(Offset.of(10000L));
        assertCompletedWith(lastFuture, 10000L);
    }

    /**
     * A response whose offset (11) was already assigned by the previous batch (offsets 10 and 11)
     * fails the publisher with FAILED_PRECONDITION and fails the still-pending future.
     */
    @Test
    public void invalidOffsetSequence_SetsPermanentException() throws Exception {
        startPublisher();
        Message message1 = Message.builder().build();
        Message message2 = messageOf("data");
        Message message3 = messageOf("other_data");

        ApiFuture<Offset> future1 = publisher.publish(message1);
        ApiFuture<Offset> future2 = publisher.publish(message2);
        leakedBatchAlarm.run();
        Mockito.verify(mockBatchPublisher).publish(batchContaining(message1.toProto(), message2.toProto()));

        ApiFuture<Offset> future3 = publisher.publish(message3);
        leakedBatchAlarm.run();
        Mockito.verify(mockBatchPublisher).publish(batchContaining(message3.toProto()));

        assertPending(future1, future2, future3);

        leakedOffsetStream.onResponse(Offset.of(10L));
        assertCompletedWith(future1, 10L);
        assertCompletedWith(future2, 11L);
        assertPending(future3);

        leakedOffsetStream.onResponse(Offset.of(11L));
        Assert.assertThrows(IllegalStateException.class, () -> publisher.awaitTerminated());
        Truth.assertThat(future3.isDone()).isTrue();
        Assert.assertThrows(Exception.class, future3::get);
        errorOccurredFuture.get();
        ApiExceptionMatcher.assertThrowableMatches(publisher.failureCause(), StatusCode.Code.FAILED_PRECONDITION);
        Mockito.verifyNoMoreInteractions(mockBatchPublisher);
    }

    /**
     * {@code cancelOutstandingPublishes()} fails both an already-sent and a not-yet-sent publish
     * with status CANCELLED.
     */
    @Test
    public void cancelOutstandingPublishes_terminatesFutures() throws Exception {
        startPublisher();

        // Sent to the connection, but no response delivered.
        Message message1 = messageOf("data");
        ApiFuture<Offset> future1 = publisher.publish(message1);
        leakedBatchAlarm.run();
        Mockito.verify(mockBatchPublisher).publish(batchContaining(message1.toProto()));

        // Published but never flushed.
        Message message2 = messageOf("other_data");
        ApiFuture<Offset> future2 = publisher.publish(message2);

        assertPending(future1, future2);

        publisher.cancelOutstandingPublishes();

        Truth.assertThat(future1.isDone()).isTrue();
        ExecutionException e1 = Assert.assertThrows(ExecutionException.class, future1::get);
        Truth.assertThat(ExtractStatus.extract(e1.getCause()).get().code()).isEqualTo(StatusCode.Code.CANCELLED);

        Truth.assertThat(future2.isDone()).isTrue();
        ExecutionException e2 = Assert.assertThrows(ExecutionException.class, future2::get);
        Truth.assertThat(ExtractStatus.extract(e2.getCause()).get().code()).isEqualTo(StatusCode.Code.CANCELLED);
    }
}

