package cz.o2.proxima.direct.pubsub;

import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.commitlog.Offset;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.Partition;
import cz.o2.proxima.direct.pubsub.PubSubReader;
import cz.o2.proxima.direct.time.UnboundedOutOfOrdernessWatermarkEstimator;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.pubsub.shaded.com.google.cloud.pubsub.v1.MessageReceiver;
import cz.o2.proxima.pubsub.shaded.com.google.cloud.pubsub.v1.Subscriber;
import cz.o2.proxima.pubsub.shaded.com.google.common.collect.Sets;
import cz.o2.proxima.pubsub.shaded.com.google.pubsub.v1.ProjectSubscriptionName;
import cz.o2.proxima.pubsub.shaded.com.google.pubsub.v1.PubsubMessage;
import cz.o2.proxima.pubsub.shaded.io.grpc.internal.GrpcUtil;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.AttributeDescriptorImpl;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.time.WatermarkEstimator;
import cz.o2.proxima.time.WatermarkEstimatorFactory;
import cz.o2.proxima.time.WatermarkIdlePolicyFactory;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:cz/o2/proxima/direct/pubsub/PubSubReaderTest.class */
public class PubSubReaderTest {
    /** Accessor handed to every reader under test; assigned in the constructor. */
    private final PubSubAccessor accessor;

    /**
     * Clock shared with {@link TestWatermarkEstimatorFactory}. It is reset to the wall-clock time
     * in {@link #setUp()} and advanced explicitly by some observers.
     */
    private static final AtomicLong timestampSupplier = new AtomicLong();

    /** Reader under test, recreated before each test method. */
    private TestPubSubReader reader;

    /** Repository built from the configuration found by {@code ConfigFactory.load()}. */
    private final Repository repo = Repository.of(ConfigFactory.load().resolve());

    /**
     * Direct operator configured to create its executors via {@link Executors#newCachedThreadPool()}.
     * The modifier is passed as a varargs argument; an explicit raw {@code Consumer[]} would type
     * the lambda parameter as {@code Object}.
     */
    private final DirectDataOperator direct =
            this.repo.asDataOperator(
                    DirectDataOperator.class,
                    op -> op.withExecutorFactory(Executors::newCachedThreadPool));

    /** Context of {@link #direct}; passed to the reader under test. */
    private final Context context = this.direct.getContext();

    private final PubSubStorage storage = new PubSubStorage();

    /** Regular attribute {@code attr} of entity {@code entity} with the {@code bytes:///} scheme. */
    private final AttributeDescriptorImpl<?> attr =
            AttributeDescriptor.newBuilder(this.repo)
                    .setEntity("entity")
                    .setName("attr")
                    .setSchemeUri(new URI("bytes:///"))
                    .build();

    /** Wildcard attribute {@code wildcard.*} of entity {@code entity} with the {@code bytes:///} scheme. */
    private final AttributeDescriptorImpl<?> wildcard =
            AttributeDescriptor.newBuilder(this.repo)
                    .setEntity("entity")
                    .setName("wildcard.*")
                    .setSchemeUri(new URI("bytes:///"))
                    .build();

    /** Entity descriptor holding both {@link #attr} and {@link #wildcard}. */
    private final EntityDescriptor entity =
            EntityDescriptor.newBuilder()
                    .setName("entity")
                    .addAttribute(this.attr)
                    .addAttribute(this.wildcard)
                    .build();

    /* loaded from: input_file:cz/o2/proxima/direct/pubsub/PubSubReaderTest$TestPubSubReader.class */
    public class TestPubSubReader extends PubSubReader {
        private final Context context;
        private final Set<Integer> acked;
        private final Set<Integer> nacked;
        private Supplier<PubsubMessage> supplier;

        public TestPubSubReader(Context context) {
            super(PubSubReaderTest.this.accessor, context);
            this.acked = new HashSet();
            this.nacked = new HashSet();
            this.context = context;
        }

        void setSupplier(Supplier<PubsubMessage> supplier) {
            this.supplier = supplier;
        }

        Subscriber newSubscriber(ProjectSubscriptionName projectSubscriptionName, MessageReceiver messageReceiver) {
            return MockSubscriber.create(projectSubscriptionName, messageReceiver, this.supplier, this.acked, this.nacked, this.context.getExecutorService());
        }

        public /* bridge */ /* synthetic */ boolean hasExternalizableOffsets() {
            return super.hasExternalizableOffsets();
        }

        public /* bridge */ /* synthetic */ ObserveHandle observeBulkOffsets(Collection collection, LogObserver logObserver) {
            return super.observeBulkOffsets(collection, logObserver);
        }

        public /* bridge */ /* synthetic */ ObserveHandle observeBulkPartitions(@Nullable String str, Collection collection, Position position, boolean z, LogObserver logObserver) {
            return super.observeBulkPartitions(str, collection, position, z, logObserver);
        }

        public /* bridge */ /* synthetic */ ObserveHandle observeBulk(@Nullable String str, Position position, boolean z, LogObserver logObserver) {
            return super.observeBulk(str, position, z, logObserver);
        }

        public /* bridge */ /* synthetic */ ObserveHandle observePartitions(@Nullable String str, Collection collection, Position position, boolean z, LogObserver logObserver) {
            return super.observePartitions(str, collection, position, z, logObserver);
        }

        public /* bridge */ /* synthetic */ ObserveHandle observe(@Nullable String str, Position position, LogObserver logObserver) {
            return super.observe(str, position, logObserver);
        }

        public /* bridge */ /* synthetic */ List getPartitions() {
            return super.getPartitions();
        }
    }

    /* loaded from: input_file:cz/o2/proxima/direct/pubsub/PubSubReaderTest$TestWatermarkEstimatorFactory.class */
    public static class TestWatermarkEstimatorFactory implements WatermarkEstimatorFactory {
        public WatermarkEstimator create(Map<String, Object> map, WatermarkIdlePolicyFactory watermarkIdlePolicyFactory) {
            UnboundedOutOfOrdernessWatermarkEstimator.Builder withStepMs = UnboundedOutOfOrdernessWatermarkEstimator.newBuilder().withDurationMs(1L).withStepMs(1L);
            AtomicLong atomicLong = PubSubReaderTest.timestampSupplier;
            Objects.requireNonNull(atomicLong);
            return withStepMs.withTimestampSupplier(atomicLong::get).build();
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case 102230:
                    if (implMethodName.equals("get")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/direct/time/TimestampSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()J") && serializedLambda.getImplClass().equals("java/util/concurrent/atomic/AtomicLong") && serializedLambda.getImplMethodSignature().equals("()J")) {
                        AtomicLong atomicLong = (AtomicLong) serializedLambda.getCapturedArg(0);
                        return atomicLong::get;
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    public PubSubReaderTest() throws URISyntaxException {
        Assert.assertTrue(this.entity.findAttribute("attr").isPresent());
        this.accessor = new PubSubAccessor(this.storage, this.entity, new URI("gps://my-project/topic"), new HashMap<String, Object>() { // from class: cz.o2.proxima.direct.pubsub.PubSubReaderTest.1
            {
                put("watermark.estimator-factory", TestWatermarkEstimatorFactory.class.getName());
            }
        });
    }

    /** Creates a fresh reader and then resets the shared test clock to the current time. */
    @Before
    public void setUp() {
        reader = new TestPubSubReader(context);
        final long now = System.currentTimeMillis();
        timestampSupplier.set(now);
    }

    /**
     * Feeds an update, a delete and a wildcard delete through the mocked subscriber and checks that
     * {@code observe} delivers all three in that order with the expected key, attribute, flags and
     * stamp. After the handle is closed the test expects the observer to have been cancelled and
     * ids 0, 1 and 2 to be acked.
     *
     * @throws InterruptedException if interrupted while waiting for the elements
     */
    @Test(timeout = 10000)
    public void testObserve() throws InterruptedException {
        long now = System.currentTimeMillis();
        LinkedList<PubsubMessage> inputs = new LinkedList<>(Arrays.asList(
                Util.update("key1", "attr", new byte[]{1, 2}, now),
                Util.delete("key2", "attr", now + 1000),
                Util.deleteWildcard("key3", this.wildcard, now)));
        // Once the prepared messages are drained the supplying thread parks.
        this.reader.setSupplier(() -> {
            if (inputs.isEmpty()) {
                LockSupport.park();
            }
            return inputs.pop();
        });
        List<StreamElement> elems = new ArrayList<>();
        AtomicBoolean cancelled = new AtomicBoolean();
        CountDownLatch latch = new CountDownLatch(3);
        // try-with-resources closes the handle exactly once, whether or not await() succeeds.
        try (ObserveHandle handle = this.reader.observe("dummy", new LogObserver() {
            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext ctx) {
                elems.add(element);
                ctx.confirm();
                latch.countDown();
                return true;
            }

            @Override
            public void onCancelled() {
                cancelled.set(true);
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        })) {
            latch.await();
        }
        Assert.assertEquals(3, elems.size());

        StreamElement first = elems.get(0);
        Assert.assertEquals("key1", first.getKey());
        Assert.assertEquals("attr", first.getAttribute());
        Assert.assertFalse(first.isDelete());
        Assert.assertFalse(first.isDeleteWildcard());
        Assert.assertArrayEquals(new byte[]{1, 2}, first.getValue());
        Assert.assertEquals(now, first.getStamp());

        StreamElement second = elems.get(1);
        Assert.assertEquals("key2", second.getKey());
        Assert.assertEquals("attr", second.getAttribute());
        Assert.assertTrue(second.isDelete());
        Assert.assertFalse(second.isDeleteWildcard());
        Assert.assertEquals(now + 1000, second.getStamp());

        StreamElement third = elems.get(2);
        Assert.assertEquals("key3", third.getKey());
        Assert.assertEquals(this.wildcard.toAttributePrefix() + "*", third.getAttribute());
        Assert.assertTrue(third.isDelete());
        Assert.assertTrue(third.isDeleteWildcard());
        Assert.assertEquals(now, third.getStamp());

        Assert.assertTrue(cancelled.get());
        Assert.assertEquals(Sets.newHashSet(0, 1, 2), this.reader.acked);
    }

    /**
     * Observes three messages while advancing the shared test clock by one second per element and
     * checks that the last watermark read from the context is positive.
     *
     * <p>Runs with the same 10 s timeout as the other latch-based tests so a missing element fails
     * the test instead of hanging it, and closes the observe handle when done.
     *
     * @throws InterruptedException if interrupted while waiting for the elements
     */
    @Test(timeout = 10000)
    public void testObserveWatermark() throws InterruptedException {
        long now = System.currentTimeMillis();
        LinkedList<PubsubMessage> inputs = new LinkedList<>(Arrays.asList(
                Util.update("key1", "attr", new byte[]{1, 2}, now),
                Util.delete("key2", "attr", now + 1000),
                Util.deleteWildcard("key3", this.wildcard, now)));
        // Once the prepared messages are drained the supplying thread parks.
        this.reader.setSupplier(() -> {
            if (inputs.isEmpty()) {
                LockSupport.park();
            }
            return inputs.pop();
        });
        CountDownLatch latch = new CountDownLatch(3);
        AtomicLong watermark = new AtomicLong();
        try (ObserveHandle handle = this.reader.observe("dummy", new LogObserver() {
            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext ctx) {
                // Move the test clock forward before the watermark is sampled.
                PubSubReaderTest.timestampSupplier.addAndGet(1000L);
                ctx.confirm();
                watermark.set(ctx.getWatermark());
                latch.countDown();
                return false;
            }

            @Override
            public void onCancelled() {
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        })) {
            latch.await();
            Assert.assertTrue(watermark.get() > 0);
        }
    }

    /**
     * Observes until the first element has been confirmed and checks that the handle then reports
     * exactly one committed offset whose watermark is positive.
     *
     * <p>Runs with the same 10 s timeout as the other latch-based tests and closes the observe
     * handle after the offsets have been inspected.
     *
     * @throws InterruptedException if interrupted while waiting for the first element
     */
    @Test(timeout = 10000)
    public void testObserveCommittedOffset() throws InterruptedException {
        long now = System.currentTimeMillis();
        LinkedList<PubsubMessage> inputs = new LinkedList<>(Arrays.asList(
                Util.update("key1", "attr", new byte[]{1, 2}, now),
                Util.delete("key2", "attr", now + 1000),
                Util.deleteWildcard("key3", this.wildcard, now)));
        // Once the prepared messages are drained the supplying thread parks.
        this.reader.setSupplier(() -> {
            if (inputs.isEmpty()) {
                LockSupport.park();
            }
            return inputs.pop();
        });
        CountDownLatch latch = new CountDownLatch(1);
        try (ObserveHandle handle = this.reader.observe("dummy", new LogObserver() {
            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext ctx) {
                PubSubReaderTest.timestampSupplier.addAndGet(1000L);
                ctx.confirm();
                latch.countDown();
                return false;
            }

            @Override
            public void onCancelled() {
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        })) {
            latch.await();
            // Offsets are read while the handle is still open.
            Assert.assertEquals(1, handle.getCommittedOffsets().size());
            PubSubReader.PubSubOffset committed = (PubSubReader.PubSubOffset) handle.getCommittedOffsets().get(0);
            Assert.assertTrue(committed.getWatermark() > 0);
        }
    }

    /**
     * Starts a bulk observation from an explicit {@code PubSubOffset} and checks that, before any
     * message is delivered, the handle reports a single committed offset carrying the watermark the
     * observation was started with.
     *
     * @throws InterruptedException if interrupted while waiting for the observation to become ready
     */
    @Test
    public void testObserveBulkOffsetsWithWatermark() throws InterruptedException {
        long now = System.currentTimeMillis();
        Offset startOffset = new PubSubReader.PubSubOffset("dummy", now);
        // The supplier never produces a message; it parks the calling thread.
        this.reader.setSupplier(() -> {
            LockSupport.park();
            return null;
        });
        // try-with-resources closes the handle exactly once, even when an assertion fails.
        try (ObserveHandle handle = this.reader.observeBulkOffsets(Arrays.asList(startOffset), new LogObserver() {
            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext ctx) {
                ctx.confirm();
                return false;
            }

            @Override
            public void onCancelled() {
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        })) {
            handle.waitUntilReady();
            Assert.assertEquals(1, handle.getCommittedOffsets().size());
            PubSubReader.PubSubOffset committed = (PubSubReader.PubSubOffset) handle.getCommittedOffsets().get(0);
            Assert.assertEquals(now, committed.getWatermark());
        }
    }

    /**
     * Checks that the reader exposes a single partition, that splitting it by three yields three
     * partitions, and that a bulk observation over the split partitions becomes ready.
     *
     * @throws InterruptedException if interrupted while waiting for the observation to become ready
     */
    @Test
    public void testPartitionsSplit() throws InterruptedException {
        List<Partition> partitions = this.reader.getPartitions();
        Assert.assertEquals(1, partitions.size());
        List<Partition> split = partitions.get(0).split(3).stream().collect(Collectors.toList());
        Assert.assertEquals(3, split.size());
        // The supplier never produces a message; it parks the calling thread.
        this.reader.setSupplier(() -> {
            LockSupport.park();
            return null;
        });
        // try-with-resources closes the handle exactly once, even when waitUntilReady() fails.
        try (ObserveHandle handle = this.reader.observeBulkPartitions(split, Position.NEWEST, new LogObserver() {
            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext ctx) {
                ctx.confirm();
                return false;
            }

            @Override
            public void onCancelled() {
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        })) {
            handle.waitUntilReady();
        }
    }

    /**
     * Uses an observer whose {@code onNext} always throws and checks that {@code onError} is
     * invoked three times, that nothing was acked and that at least one message was nacked.
     *
     * @throws InterruptedException if interrupted while waiting for the errors
     */
    @Test(timeout = 10000)
    public void testObserveError() throws InterruptedException {
        long now = System.currentTimeMillis();
        LinkedList<PubsubMessage> inputs = new LinkedList<>(Arrays.asList(
                Util.update("key1", "attr", new byte[]{1, 2}, now),
                Util.delete("key2", "attr", now + 1000),
                Util.deleteWildcard("key3", this.wildcard, now)));
        // Once the prepared messages are drained the supplying thread parks.
        this.reader.setSupplier(() -> {
            if (inputs.isEmpty()) {
                LockSupport.park();
            }
            return inputs.pop();
        });
        AtomicBoolean cancelled = new AtomicBoolean();
        CountDownLatch latch = new CountDownLatch(3);
        // try-with-resources closes the handle exactly once, even when an assertion fails.
        try (ObserveHandle handle = this.reader.observe("dummy", new LogObserver() {
            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext ctx) {
                throw new RuntimeException("Fail");
            }

            @Override
            public void onCancelled() {
                cancelled.set(true);
            }

            @Override
            public boolean onError(Throwable error) {
                // The exception thrown by onNext is expected as the cause of the reported error.
                Assert.assertEquals("Fail", error.getCause().getMessage());
                latch.countDown();
                return true;
            }
        })) {
            latch.await();
            Assert.assertTrue(this.reader.acked.isEmpty());
            Assert.assertFalse(this.reader.nacked.isEmpty());
        }
    }

    /**
     * Bulk variant of the observe test. All three elements are collected without confirming, then
     * only the context of the last delivered element is confirmed. After the handle is closed the
     * test expects the observer to have been cancelled and ids 0, 1 and 2 to be acked.
     *
     * @throws InterruptedException if interrupted while waiting for the elements
     */
    @Test(timeout = 10000)
    public void testObserveBulk() throws InterruptedException {
        long now = System.currentTimeMillis();
        LinkedList<PubsubMessage> inputs = new LinkedList<>(Arrays.asList(
                Util.update("key1", "attr", new byte[]{1, 2}, now),
                Util.delete("key2", "attr", now + 1000),
                Util.deleteWildcard("key3", this.wildcard, now)));
        // Once the prepared messages are drained the supplying thread parks.
        this.reader.setSupplier(() -> {
            if (inputs.isEmpty()) {
                LockSupport.park();
            }
            return inputs.pop();
        });
        List<StreamElement> elems = new ArrayList<>();
        AtomicBoolean cancelled = new AtomicBoolean();
        CountDownLatch latch = new CountDownLatch(3);
        AtomicReference<LogObserver.OffsetCommitter> lastCommitter = new AtomicReference<>();
        // try-with-resources closes the handle exactly once, whether or not the body succeeds.
        try (ObserveHandle handle = this.reader.observeBulk("dummy", new LogObserver() {
            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext ctx) {
                elems.add(element);
                lastCommitter.set(ctx);
                latch.countDown();
                return true;
            }

            @Override
            public void onCancelled() {
                cancelled.set(true);
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        })) {
            latch.await();
            // A single confirm on the most recent context, issued before the handle is closed.
            lastCommitter.get().confirm();
        }
        Assert.assertEquals(3, elems.size());

        StreamElement first = elems.get(0);
        Assert.assertEquals("key1", first.getKey());
        Assert.assertEquals("attr", first.getAttribute());
        Assert.assertFalse(first.isDelete());
        Assert.assertFalse(first.isDeleteWildcard());
        Assert.assertArrayEquals(new byte[]{1, 2}, first.getValue());
        Assert.assertEquals(now, first.getStamp());

        StreamElement second = elems.get(1);
        Assert.assertEquals("key2", second.getKey());
        Assert.assertEquals("attr", second.getAttribute());
        Assert.assertTrue(second.isDelete());
        Assert.assertFalse(second.isDeleteWildcard());
        Assert.assertEquals(now + 1000, second.getStamp());

        StreamElement third = elems.get(2);
        Assert.assertEquals("key3", third.getKey());
        Assert.assertEquals(this.wildcard.toAttributePrefix() + "*", third.getAttribute());
        Assert.assertTrue(third.isDelete());
        Assert.assertTrue(third.isDeleteWildcard());
        Assert.assertEquals(now, third.getStamp());

        Assert.assertTrue(cancelled.get());
        Assert.assertEquals(Sets.newHashSet(0, 1, 2), this.reader.acked);
    }

    /**
     * Reads the shaded gRPC {@code Http2Error.NO_ERROR} constant and checks that it is non-null.
     * NOTE(review): presumably a guard that the shaded enum can be initialised - confirm intent.
     */
    @Test
    public void testInstantiationHttp2Error() {
        GrpcUtil.Http2Error noError = GrpcUtil.Http2Error.NO_ERROR;
        Assert.assertNotNull(noError);
    }

    // The decompiler-emitted copy of the synthetic $deserializeLambda$(SerializedLambda) method is
    // intentionally not declared in source. The compiler generates that method itself for classes
    // containing serializable lambdas, and the decompiled body was not valid Java: it assigned -1
    // to a boolean, switched on a boolean, and returned lambdas without a functional target type.

    /**
     * Closes {@code autoCloseable} following try-with-resources semantics.
     *
     * <p>With no primary failure ({@code th == null}) a failure of {@code close()} propagates to
     * the caller. Otherwise the close failure is attached to {@code th} as a suppressed exception
     * so the primary failure is not masked.
     *
     * <p>{@link AutoCloseable#close()} declares {@code Exception}, which this method does not
     * declare. Unchecked failures are rethrown unchanged and a checked failure is wrapped in an
     * {@link IllegalStateException}, so callers keep compiling without a {@code throws} clause.
     *
     * @param th primary failure already in flight, or {@code null} if there is none
     * @param autoCloseable resource to close; must not be {@code null}
     */
    private static void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        try {
            autoCloseable.close();
        } catch (Throwable closeFailure) {
            if (th != null) {
                // addSuppressed rejects self-suppression, so skip it when close() rethrew th itself.
                if (closeFailure != th) {
                    th.addSuppressed(closeFailure);
                }
                return;
            }
            if (closeFailure instanceof RuntimeException) {
                throw (RuntimeException) closeFailure;
            }
            if (closeFailure instanceof Error) {
                throw (Error) closeFailure;
            }
            throw new IllegalStateException("Failed to close resource", closeFailure);
        }
    }
}
