package cz.o2.proxima.direct.batch;

import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.commitlog.CommitLogReaderTest;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.storage.ListBatchReader;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Streams;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.ThroughputLimiter;
import cz.o2.proxima.util.ExceptionUtils;
import cz.o2.proxima.util.Optionals;
import cz.o2.proxima.util.ReplicationRunner;
import cz.o2.proxima.util.SerializableLong;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:cz/o2/proxima/direct/batch/BatchLogReaderTest.class */
public class BatchLogReaderTest {
    private Repository repo;
    private DirectDataOperator direct;
    private EntityDescriptor entity;
    private AttributeDescriptor<byte[]> attr;
    private long now;

    @Before
    public void setUp() {
        this.repo = Repository.ofTest(ConfigFactory.load("test-reference.conf").resolve(), new Repository.Validate[0]);
        this.direct = this.repo.getOrCreateOperator(DirectDataOperator.class, new Consumer[0]);
        ReplicationRunner.runAttributeReplicas(this.direct);
        this.entity = this.repo.getEntity("gateway");
        this.attr = this.entity.getAttribute("armed");
        this.now = System.currentTimeMillis();
    }

    @Test
    public void testSimpleObserve() throws InterruptedException {
        write("gw", new byte[]{1});
        BatchLogReader batchReader = getBatchReader();
        final SynchronousQueue synchronousQueue = new SynchronousQueue();
        batchReader.observe(batchReader.getPartitions(), Collections.singletonList(this.attr), new BatchLogObserver() { // from class: cz.o2.proxima.direct.batch.BatchLogReaderTest.1
            public boolean onNext(StreamElement streamElement) {
                BlockingQueue blockingQueue = synchronousQueue;
                ExceptionUtils.unchecked(() -> {
                    blockingQueue.put(streamElement);
                });
                return false;
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case 84535819:
                        if (implMethodName.equals("lambda$onNext$756adf97$1")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/util/ExceptionUtils$ThrowingRunnable") && serializedLambda.getFunctionalInterfaceMethodName().equals("run") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/batch/BatchLogReaderTest$1") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/BlockingQueue;Lcz/o2/proxima/storage/StreamElement;)V")) {
                            BlockingQueue blockingQueue = (BlockingQueue) serializedLambda.getCapturedArg(0);
                            StreamElement streamElement = (StreamElement) serializedLambda.getCapturedArg(1);
                            return () -> {
                                blockingQueue.put(streamElement);
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        });
        Assert.assertEquals("gw", ((StreamElement) synchronousQueue.take()).getKey());
    }

    @After
    public void tearDown() {
        this.repo.drop();
    }

    @Test(timeout = 5000)
    public void testObserveWithContext() throws InterruptedException {
        write("gw", new byte[]{1});
        BatchLogReader batchReader = getBatchReader();
        final SynchronousQueue synchronousQueue = new SynchronousQueue();
        batchReader.observe(batchReader.getPartitions(), Collections.singletonList(this.attr), new BatchLogObserver() { // from class: cz.o2.proxima.direct.batch.BatchLogReaderTest.2
            public boolean onNext(StreamElement streamElement, BatchLogObserver.OnNextContext onNextContext) {
                Assert.assertEquals(0L, onNextContext.getPartition().getId());
                Assert.assertEquals(Long.MIN_VALUE, onNextContext.getWatermark());
                BlockingQueue blockingQueue = synchronousQueue;
                ExceptionUtils.unchecked(() -> {
                    blockingQueue.put(streamElement);
                });
                return false;
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case 1831967197:
                        if (implMethodName.equals("lambda$onNext$1af35883$1")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/util/ExceptionUtils$ThrowingRunnable") && serializedLambda.getFunctionalInterfaceMethodName().equals("run") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/batch/BatchLogReaderTest$2") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/BlockingQueue;Lcz/o2/proxima/storage/StreamElement;)V")) {
                            BlockingQueue blockingQueue = (BlockingQueue) serializedLambda.getCapturedArg(0);
                            StreamElement streamElement = (StreamElement) serializedLambda.getCapturedArg(1);
                            return () -> {
                                blockingQueue.put(streamElement);
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        });
        Assert.assertEquals("gw", ((StreamElement) synchronousQueue.take()).getKey());
    }

    @Test(timeout = 5000)
    public void testObserveWithoutThroughputLimit() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            write("gw" + i, new byte[]{(byte) i});
        }
        BatchLogReader batchReader = getBatchReader();
        final SynchronousQueue synchronousQueue = new SynchronousQueue();
        Assert.assertTrue(batchReader.observe(batchReader.getPartitions(), Collections.singletonList(this.attr), new BatchLogObserver() { // from class: cz.o2.proxima.direct.batch.BatchLogReaderTest.3
            public boolean onNext(StreamElement streamElement) {
                BlockingQueue blockingQueue = synchronousQueue;
                ExceptionUtils.unchecked(() -> {
                    blockingQueue.put(streamElement);
                });
                return true;
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case 84535819:
                        if (implMethodName.equals("lambda$onNext$756adf97$1")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/util/ExceptionUtils$ThrowingRunnable") && serializedLambda.getFunctionalInterfaceMethodName().equals("run") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/batch/BatchLogReaderTest$3") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/BlockingQueue;Lcz/o2/proxima/storage/StreamElement;)V")) {
                            BlockingQueue blockingQueue = (BlockingQueue) serializedLambda.getCapturedArg(0);
                            StreamElement streamElement = (StreamElement) serializedLambda.getCapturedArg(1);
                            return () -> {
                                blockingQueue.put(streamElement);
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        }).isReadyForProcessing());
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < 100; i2++) {
            StreamElement streamElement = (StreamElement) synchronousQueue.take();
            Assert.assertNotNull(streamElement.getValue());
            Assert.assertEquals(1L, streamElement.getValue().length);
            arrayList.add(Integer.valueOf(streamElement.getValue()[0]));
        }
        Assert.assertEquals(IntStream.range(0, 100).boxed().collect(Collectors.toList()), arrayList.stream().sorted().collect(Collectors.toList()));
        Assert.assertTrue(System.currentTimeMillis() - this.now < 1000);
    }

    @Test(timeout = 5000)
    public void testObserveWithThroughputLimit() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            write("gw" + i, new byte[]{(byte) i});
        }
        BatchLogReader withLimitedThroughput = BatchLogReaders.withLimitedThroughput(getBatchReader(), CommitLogReaderTest.withNumRecordsPerSec(100 / 2));
        final SynchronousQueue synchronousQueue = new SynchronousQueue();
        withLimitedThroughput.observe(withLimitedThroughput.getPartitions(), Collections.singletonList(this.attr), new BatchLogObserver() { // from class: cz.o2.proxima.direct.batch.BatchLogReaderTest.4
            public boolean onNext(StreamElement streamElement) {
                BlockingQueue blockingQueue = synchronousQueue;
                ExceptionUtils.unchecked(() -> {
                    blockingQueue.put(streamElement);
                });
                return true;
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case 84535819:
                        if (implMethodName.equals("lambda$onNext$756adf97$1")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/util/ExceptionUtils$ThrowingRunnable") && serializedLambda.getFunctionalInterfaceMethodName().equals("run") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/batch/BatchLogReaderTest$4") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/BlockingQueue;Lcz/o2/proxima/storage/StreamElement;)V")) {
                            BlockingQueue blockingQueue = (BlockingQueue) serializedLambda.getCapturedArg(0);
                            StreamElement streamElement = (StreamElement) serializedLambda.getCapturedArg(1);
                            return () -> {
                                blockingQueue.put(streamElement);
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        });
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < 100; i2++) {
            StreamElement streamElement = (StreamElement) synchronousQueue.take();
            Assert.assertNotNull(streamElement.getValue());
            Assert.assertEquals(1L, streamElement.getValue().length);
            arrayList.add(Integer.valueOf(streamElement.getValue()[0]));
        }
        Assert.assertEquals(IntStream.range(0, 100).boxed().collect(Collectors.toList()), arrayList.stream().sorted().collect(Collectors.toList()));
        Assert.assertTrue(System.currentTimeMillis() - this.now > 1000);
    }

    @Test(timeout = 5000)
    public void testObserveWithWatermarkLimit() {
        for (int i = 0; i < 100; i++) {
            write("gw" + i, new byte[]{(byte) i});
        }
        SerializableLong serializableLong = new SerializableLong(Long.MIN_VALUE);
        BatchLogReader withLimitedThroughput = BatchLogReaders.withLimitedThroughput(getBatchReader(), getWatermarkLimiter(serializableLong));
        ObserveHandle observe = withLimitedThroughput.observe(withLimitedThroughput.getPartitions(), Collections.singletonList(this.attr), getDummyObserver());
        try {
            Assert.assertFalse(observe.isReadyForProcessing());
            serializableLong.set(Long.MAX_VALUE);
            Assert.assertTrue(observe.isReadyForProcessing());
            if (observe != null) {
                observe.close();
            }
        } catch (Throwable th) {
            if (observe != null) {
                try {
                    observe.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeout = 5000)
    public void testObserveWithWatermarkLimitDisabled() {
        for (int i = 0; i < 100; i++) {
            write("gw" + i, new byte[]{(byte) i});
        }
        BatchLogReader withLimitedThroughput = BatchLogReaders.withLimitedThroughput(getBatchReader(), getWatermarkLimiter(new SerializableLong(Long.MIN_VALUE)));
        ObserveHandle observe = withLimitedThroughput.observe(withLimitedThroughput.getPartitions(), Collections.singletonList(this.attr), getDummyObserver());
        try {
            observe.disableRateLimiting();
            Assert.assertTrue(observe.isReadyForProcessing());
            if (observe != null) {
                observe.close();
            }
        } catch (Throwable th) {
            if (observe != null) {
                try {
                    observe.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static ThroughputLimiter getWatermarkLimiter(final SerializableLong serializableLong) {
        return new ThroughputLimiter() { // from class: cz.o2.proxima.direct.batch.BatchLogReaderTest.5
            public Duration getPauseTime(ThroughputLimiter.Context context) {
                return context.getMinWatermark() < serializableLong.get() ? Duration.ZERO : Duration.ofSeconds(1L);
            }

            public void close() {
            }
        };
    }

    private static BatchLogObserver getDummyObserver() {
        return new BatchLogObserver() { // from class: cz.o2.proxima.direct.batch.BatchLogReaderTest.6
            public boolean onNext(StreamElement streamElement) {
                return true;
            }
        };
    }

    @Test
    public void testObserveReadOffset() throws InterruptedException {
        ListBatchReader ofPartitioned = ListBatchReader.ofPartitioned(this.direct.getContext(), (List<List<StreamElement>>) Arrays.asList(newPartition("first_", 10), newPartition("second_", 20), newPartition("third_", 30)));
        final ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        ofPartitioned.observe(Arrays.asList(Partition.of(0), Partition.of(1), Partition.of(2)), Collections.singletonList(this.attr), new BatchLogObserver() { // from class: cz.o2.proxima.direct.batch.BatchLogReaderTest.7
            public boolean onNext(StreamElement streamElement, BatchLogObserver.OnNextContext onNextContext) {
                concurrentHashMap.merge(onNextContext.getPartition(), onNextContext.getOffset(), (offset, offset2) -> {
                    Assert.assertTrue(offset.getElementIndex() < offset2.getElementIndex());
                    return offset2;
                });
                return true;
            }

            public void onCompleted() {
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
        Assert.assertEquals(Offset.of(Partition.of(0), 9L, true), concurrentHashMap.get(Partition.of(0)));
        Assert.assertEquals(Offset.of(Partition.of(1), 19L, true), concurrentHashMap.get(Partition.of(1)));
        Assert.assertEquals(Offset.of(Partition.of(2), 29L, true), concurrentHashMap.get(Partition.of(2)));
    }

    @Test
    public void testObserveOffsets() throws InterruptedException {
        List<StreamElement> newPartition = newPartition("first_", 100);
        List<StreamElement> newPartition2 = newPartition("second_", 80);
        List<StreamElement> newPartition3 = newPartition("third_", 60);
        ListBatchReader ofPartitioned = ListBatchReader.ofPartitioned(this.direct.getContext(), (List<List<StreamElement>>) Arrays.asList(newPartition, newPartition2, newPartition3));
        final LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        ofPartitioned.observeOffsets(Arrays.asList(Offset.of(Partition.of(0), 50L, false), Offset.of(Partition.of(2), 40L, false)), Collections.singletonList(this.attr), new BatchLogObserver() { // from class: cz.o2.proxima.direct.batch.BatchLogReaderTest.8
            public boolean onNext(StreamElement streamElement) {
                Assert.assertTrue(linkedBlockingQueue.add(streamElement.getKey()));
                return true;
            }

            public void onCompleted() {
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
        Assert.assertEquals((Set) Streams.concat(new Stream[]{newPartition.subList(50, newPartition.size()).stream(), newPartition3.subList(40, newPartition3.size()).stream()}).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toSet()), new HashSet(linkedBlockingQueue));
    }

    private BatchLogReader getBatchReader() {
        return (BatchLogReader) Optionals.get(this.direct.getBatchLogReader(new AttributeDescriptor[]{this.attr}));
    }

    private List<StreamElement> newPartition(String str, int i) {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(newData(str + i2, new byte[]{(byte) i2}));
        }
        return Collections.unmodifiableList(arrayList);
    }

    private StreamElement newData(String str, byte[] bArr) {
        return StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), str, this.attr.getName(), this.now, bArr);
    }

    private void write(String str, byte[] bArr) {
        ((OnlineAttributeWriter) Optionals.get(this.direct.getWriter(this.attr))).write(newData(str, bArr), CommitCallback.noop());
    }
}
