/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.cloudpubsub.internal;

import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.cloudpubsub.ReassignmentHandler;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.internal.AssigningSubscriber;
import com.google.cloud.pubsublite.cloudpubsub.internal.PartitionSubscriberFactory;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.cloud.pubsublite.internal.testing.RetryingConnectionHelpers;
import com.google.cloud.pubsublite.internal.wire.Assigner;
import com.google.cloud.pubsublite.internal.wire.AssignerFactory;
import com.google.cloud.pubsublite.internal.wire.PartitionAssignmentReceiver;
import cz.o2.proxima.beam.io.pubsub.io.grpc.Status;
import cz.o2.proxima.internal.shaded.com.google.common.collect.ImmutableSet;
import cz.o2.proxima.internal.shaded.com.google.common.truth.Truth;
import java.util.Set;
import java.util.concurrent.Future;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;

/**
 * Unit tests for {@link AssigningSubscriber}.
 *
 * <p>The tests stub the {@link AssignerFactory} so that the {@link PartitionAssignmentReceiver}
 * handed to it by the subscriber under test is captured ("leaked"). Each test then drives
 * partition assignments by calling that receiver directly and verifies the resulting
 * interactions with the mocked {@link PartitionSubscriberFactory}, the mocked
 * {@link ReassignmentHandler} and the spied per-partition subscribers.
 */
@RunWith(JUnit4.class)
public class AssigningSubscriberTest {
    @Mock
    PartitionSubscriberFactory subscriberFactory;
    @Mock
    ReassignmentHandler reassignmentHandler;
    @Mock
    AssignerFactory assignerFactory;
    private AssigningSubscriber assigningSubscriber;
    @Spy
    FakeAssigner assigner;
    /** Receiver captured from the stubbed {@link AssignerFactory#New} call in {@link #setUp()}. */
    private PartitionAssignmentReceiver leakedReceiver;

    /**
     * Builds the subscriber under test, capturing the assignment receiver it passes to the
     * assigner factory, and starts it.
     */
    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        // Capture the receiver so tests can push assignments without a real assigner.
        Mockito.when(assignerFactory.New(ArgumentMatchers.any())).then(invocation -> {
            leakedReceiver = invocation.getArgument(0);
            return assigner;
        });
        assigningSubscriber =
            new AssigningSubscriber(subscriberFactory, reassignmentHandler, assignerFactory);
        Mockito.verify(assignerFactory).New(ArgumentMatchers.any());
        Truth.assertThat(leakedReceiver).isNotNull();
        assigningSubscriber.startAsync().awaitRunning();
    }

    /** Stopping the assigning subscriber stops the assigner it created. */
    @Test
    public void startStop() {
        assigningSubscriber.stopAsync().awaitTerminated();
        Mockito.verify(assigner).stopAsync();
    }

    /**
     * A factory that throws while creating a partition subscriber leaves the assigning
     * subscriber in a state where {@code awaitTerminated()} throws
     * {@link IllegalStateException}.
     */
    @Test
    public void failedCreate() throws CheckedApiException {
        Mockito.when(subscriberFactory.newSubscriber(Partition.of(1L)))
            .thenThrow(new RuntimeException("Arbitrary error."));
        leakedReceiver.handleAssignment(ImmutableSet.of(Partition.of(1L)));
        Mockito.verify(subscriberFactory).newSubscriber(Partition.of(1L));
        Assert.assertThrows(
            IllegalStateException.class, () -> assigningSubscriber.awaitTerminated());
    }

    /**
     * A reassignment handler that throws leaves the assigning subscriber in a state where
     * {@code awaitTerminated()} throws {@link IllegalStateException}.
     */
    @Test
    public void assignmentHandlerFailure() throws Exception {
        Subscriber sub1 = Mockito.spy(FakeSubscriber.class);
        Mockito.when(subscriberFactory.newSubscriber(Partition.of(1L))).thenReturn(sub1);
        Mockito.doThrow(new RuntimeException("Arbitrary error."))
            .when(reassignmentHandler)
            .handleReassignment(ImmutableSet.of(), ImmutableSet.of(Partition.of(1L)));
        leakedReceiver.handleAssignment(ImmutableSet.of(Partition.of(1L)));
        Assert.assertThrows(
            IllegalStateException.class, () -> assigningSubscriber.awaitTerminated());
    }

    /**
     * The reassignment handler may stop the assigning subscriber (and wait for termination)
     * from inside its own callback; the test completes only if that does not hang or throw.
     */
    @Test
    public void assignmentHandlerReentrantSafe() throws Exception {
        Subscriber sub1 = Mockito.spy(FakeSubscriber.class);
        Mockito.when(subscriberFactory.newSubscriber(Partition.of(1L))).thenReturn(sub1);
        Mockito.doAnswer(invocation -> {
            assigningSubscriber.stopAsync().awaitTerminated();
            return null;
        }).when(reassignmentHandler)
            .handleReassignment(ImmutableSet.of(), ImmutableSet.of(Partition.of(1L)));
        leakedReceiver.handleAssignment(ImmutableSet.of(Partition.of(1L)));
        assigningSubscriber.awaitTerminated();
    }

    /**
     * Growing the assignment from {1} to {1, 2} creates and starts a subscriber only for the
     * newly added partition and does not touch the already-running one.
     */
    @Test
    public void createSubscribers() throws CheckedApiException {
        Subscriber sub1 = Mockito.spy(FakeSubscriber.class);
        Mockito.when(subscriberFactory.newSubscriber(Partition.of(1L))).thenReturn(sub1);
        leakedReceiver.handleAssignment(ImmutableSet.of(Partition.of(1L)));
        // The subscriber is created before the reassignment handler is notified.
        InOrder order = Mockito.inOrder(reassignmentHandler, subscriberFactory);
        order.verify(subscriberFactory).newSubscriber(Partition.of(1L));
        order.verify(reassignmentHandler)
            .handleReassignment(ImmutableSet.of(), ImmutableSet.of(Partition.of(1L)));
        Mockito.verify(sub1).startAsync();
        // Forget the interactions so far so the final check only covers the second assignment.
        Mockito.reset(sub1);

        Subscriber sub2 = Mockito.spy(FakeSubscriber.class);
        Mockito.when(subscriberFactory.newSubscriber(Partition.of(2L))).thenReturn(sub2);
        Set<Partition> newAssignment = ImmutableSet.of(Partition.of(1L), Partition.of(2L));
        leakedReceiver.handleAssignment(newAssignment);
        Mockito.verify(subscriberFactory).newSubscriber(Partition.of(2L));
        Mockito.verify(reassignmentHandler)
            .handleReassignment(ImmutableSet.of(Partition.of(1L)), newAssignment);
        Mockito.verify(sub2).startAsync();
        Mockito.verifyNoMoreInteractions(sub1);
    }

    /**
     * Replacing the assignment {1} with {2} stops the subscriber for partition 1 (it is
     * already TERMINATED when the reassignment handler runs) and starts one for partition 2.
     */
    @Test
    public void createAndEvict() throws CheckedApiException {
        Subscriber sub1 = Mockito.spy(FakeSubscriber.class);
        Mockito.when(subscriberFactory.newSubscriber(Partition.of(1L))).thenReturn(sub1);
        leakedReceiver.handleAssignment(ImmutableSet.of(Partition.of(1L)));
        Mockito.verify(subscriberFactory).newSubscriber(Partition.of(1L));
        Mockito.verify(reassignmentHandler)
            .handleReassignment(ImmutableSet.of(), ImmutableSet.of(Partition.of(1L)));
        Mockito.verify(sub1).startAsync();
        ApiService.State sub1State = sub1.state();
        // Pass the boolean directly; the decompiled "(Boolean)(cond ? 1 : 0)" does not compile.
        Truth.assertThat(
                ApiService.State.STARTING.equals(sub1State)
                    || ApiService.State.RUNNING.equals(sub1State))
            .isTrue();
        Mockito.reset(sub1);

        Subscriber sub2 = Mockito.spy(FakeSubscriber.class);
        Mockito.when(subscriberFactory.newSubscriber(Partition.of(2L))).thenReturn(sub2);
        // When the handler is invoked for the {1} -> {2} change, sub1 must already be terminated.
        Mockito.doAnswer(invocation -> {
            Truth.assertThat(sub1.state()).isEqualTo(ApiService.State.TERMINATED);
            return null;
        }).when(reassignmentHandler)
            .handleReassignment(
                ImmutableSet.of(Partition.of(1L)), ImmutableSet.of(Partition.of(2L)));
        leakedReceiver.handleAssignment(ImmutableSet.of(Partition.of(2L)));
        Mockito.verify(subscriberFactory).newSubscriber(Partition.of(2L));
        Mockito.verify(reassignmentHandler)
            .handleReassignment(
                ImmutableSet.of(Partition.of(1L)), ImmutableSet.of(Partition.of(2L)));
        Mockito.verify(sub2).startAsync();
        Mockito.verify(sub1).stopAsync();
        Mockito.verify(sub1).awaitTerminated();
    }

    /**
     * Assigns partition 1, verifies its spied subscriber was created and started, then resets
     * the spy's recorded interactions.
     *
     * @return the spied subscriber for partition 1
     */
    private Subscriber initSub1() throws CheckedApiException {
        Subscriber sub1 = Mockito.spy(FakeSubscriber.class);
        Mockito.when(subscriberFactory.newSubscriber(Partition.of(1L))).thenReturn(sub1);
        leakedReceiver.handleAssignment(ImmutableSet.of(Partition.of(1L)));
        Mockito.verify(subscriberFactory).newSubscriber(Partition.of(1L));
        Mockito.verify(sub1).startAsync();
        Mockito.reset(sub1);
        return sub1;
    }

    /** Stopping the assigning subscriber stops, and waits on, the live partition subscriber. */
    @Test
    public void stopStopsSubs() throws CheckedApiException {
        Subscriber sub1 = initSub1();
        assigningSubscriber.stopAsync();
        Mockito.verify(sub1).stopAsync();
        Mockito.verify(sub1).awaitTerminated();
    }

    /** A failure of the assigner stops, and waits on, the live partition subscriber. */
    @Test
    public void assignerErrorFailsSubs() throws Exception {
        Subscriber sub1 = initSub1();
        Future<Void> terminated = RetryingConnectionHelpers.whenTerminated(sub1);
        assigner.fail(Status.INVALID_ARGUMENT.asException());
        // Block until sub1 has terminated before checking the interactions.
        terminated.get();
        Mockito.verify(sub1).stopAsync();
        Mockito.verify(sub1).awaitTerminated();
    }

    /** Abstract {@link Subscriber} on top of {@link FakeApiService}, instantiated via spy. */
    abstract static class FakeSubscriber
    extends FakeApiService
    implements Subscriber {
        FakeSubscriber() {
        }
    }

    /** Abstract {@link Assigner} on top of {@link FakeApiService}, instantiated via spy. */
    abstract static class FakeAssigner
    extends FakeApiService
    implements Assigner {
        FakeAssigner() {
        }
    }
}

