package cz.o2.proxima.direct.storage;

import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.batch.BatchLogReader;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.commitlog.Offset;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.DataAccessor;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.storage.InMemStorage;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.AttributeFamilyDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.AccessType;
import cz.o2.proxima.storage.StorageType;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.time.WatermarkEstimator;
import cz.o2.proxima.util.Optionals;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:cz/o2/proxima/direct/storage/InMemStorageTest.class */
public class InMemStorageTest implements Serializable {
    // Repository built from the resolved "test-reference.conf" configuration.
    final Repository repo = Repository.of(ConfigFactory.load("test-reference.conf").resolve());
    // Direct operator obtained from the repository; its context is handed to every accessor in the tests.
    final DirectDataOperator direct = this.repo.getOrCreateOperator(DirectDataOperator.class, new Consumer[0]);
    // The "dummy" entity looked up in the repository.
    final EntityDescriptor entity = this.repo.getEntity("dummy");
    // The "data" attribute of the "dummy" entity; every element written by the tests uses it.
    final AttributeDescriptor<?> data = this.entity.getAttribute("data");

    /**
     * Observes all partitions of an in-memory commit log, writes a single element with key
     * {@code "key"} and waits until the observer has seen it on partition 0.
     */
    @Test(timeout = 10000)
    public void testObservePartitions() throws InterruptedException {
        InMemStorage storage = new InMemStorage();
        URI storageUri = URI.create("inmem:///inmemstoragetest");
        DataAccessor accessor = storage.createAccessor(this.direct, createFamilyDescriptor(storageUri));
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(this.direct.getContext())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        AttributeWriterBase writer = (AttributeWriterBase) accessor.getWriter(this.direct.getContext())
                .orElseThrow(() -> new IllegalStateException("Missing writer"));

        // The latch is installed by onRepartition, so it exists only after partitions were assigned.
        // NOTE(review): the await below presumes onRepartition has already run by then - confirm.
        final AtomicReference<CountDownLatch> delivered = new AtomicReference<>();
        reader.observePartitions(reader.getPartitions(), new LogObserver() {
            @Override
            public void onRepartition(LogObserver.OnRepartitionContext context) {
                Assert.assertEquals(1L, context.partitions().size());
                delivered.set(new CountDownLatch(1));
            }

            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext context) {
                Assert.assertEquals(0L, context.getPartition().getId());
                Assert.assertEquals("key", element.getKey());
                context.confirm();
                delivered.get().countDown();
                return false;
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        });

        StreamElement update = StreamElement.upsert(
                this.entity,
                this.data,
                UUID.randomUUID().toString(),
                "key",
                this.data.getName(),
                System.currentTimeMillis(),
                new byte[]{1, 2, 3});
        writer.online().write(update, (succ, exc) -> {
        });
        delivered.get().await();
    }

    /**
     * Writes the same element through two accessors whose storage URIs differ only in authority
     * ({@code inmem://test1} vs {@code inmem://test2}; both have an empty path), then reads the
     * commit log of the first accessor from {@link Position#OLDEST} and expects exactly one element.
     *
     * <p>The method name keeps its historical spelling so existing test reports and filters still match.
     */
    @Test(timeout = 10000)
    public void testObservePartionsWithSamePath() throws InterruptedException {
        InMemStorage storage = new InMemStorage();
        DataAccessor firstAccessor = storage.createAccessor(this.direct, createFamilyDescriptor(URI.create("inmem://test1")));
        DataAccessor secondAccessor = storage.createAccessor(this.direct, createFamilyDescriptor(URI.create("inmem://test2")));
        // This test needs the commit log reader, so the failure message must say so
        // (it previously claimed a missing *batch* log reader).
        CommitLogReader reader = (CommitLogReader) firstAccessor.getCommitLogReader(this.direct.getContext())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        AttributeWriterBase firstWriter = (AttributeWriterBase) firstAccessor.getWriter(this.direct.getContext())
                .orElseThrow(() -> new IllegalStateException("Missing writer"));
        final CountDownLatch completed = new CountDownLatch(1);

        StreamElement update = StreamElement.upsert(
                this.entity,
                this.data,
                UUID.randomUUID().toString(),
                "key",
                this.data.getName(),
                System.currentTimeMillis(),
                new byte[]{1, 2, 3});
        firstWriter.online().write(update, (succ, exc) -> {
        });
        AttributeWriterBase secondWriter = (AttributeWriterBase) secondAccessor.getWriter(this.direct.getContext())
                .orElseThrow(() -> new IllegalStateException("Missing writer2"));
        secondWriter.online().write(update, (succ, exc) -> {
        });

        final AtomicInteger observed = new AtomicInteger();
        reader.observePartitions(reader.getPartitions(), Position.OLDEST, true, new LogObserver() {
            @Override
            public void onCompleted() {
                completed.countDown();
            }

            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext context) {
                Assert.assertEquals("key", element.getKey());
                context.confirm();
                observed.incrementAndGet();
                return true;
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        });
        completed.await();
        // Only the element written through the first accessor may be visible to its reader.
        Assert.assertEquals(1L, observed.get());
    }

    /**
     * Writes one element and then reads it back through the batch log reader, expecting it on
     * partition 0 with key {@code "key"}.
     */
    @Test(timeout = 10000)
    public void testObserveBatch() throws InterruptedException {
        InMemStorage storage = new InMemStorage();
        URI storageUri = URI.create("inmem:///inmemstoragetest");
        DataAccessor accessor = storage.createAccessor(this.direct, createFamilyDescriptor(storageUri));
        BatchLogReader reader = (BatchLogReader) accessor.getBatchLogReader(this.direct.getContext())
                .orElseThrow(() -> new IllegalStateException("Missing batch log reader"));
        AttributeWriterBase writer = (AttributeWriterBase) accessor.getWriter(this.direct.getContext())
                .orElseThrow(() -> new IllegalStateException("Missing writer"));
        final CountDownLatch delivered = new CountDownLatch(1);

        // The element is written before the batch observation starts.
        StreamElement update = StreamElement.upsert(
                this.entity,
                this.data,
                UUID.randomUUID().toString(),
                "key",
                this.data.getName(),
                System.currentTimeMillis(),
                new byte[]{1, 2, 3});
        writer.online().write(update, (succ, exc) -> {
        });

        reader.observe(reader.getPartitions(), Collections.singletonList(this.data), new BatchLogObserver() {
            @Override
            public boolean onNext(StreamElement element, BatchLogObserver.OnNextContext context) {
                Assert.assertEquals(0L, context.getPartition().getId());
                Assert.assertEquals("key", element.getKey());
                delivered.countDown();
                return false;
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        });
        delivered.await();
    }

    /**
     * Batch-log counterpart of the "same path" scenario: one element is written through two
     * accessors ({@code inmem://test1} and {@code inmem://test2}) and the batch reader of the first
     * accessor must deliver exactly one element before completing.
     */
    @Test(timeout = 10000)
    public void testObserveBatchWithSamePath() throws InterruptedException {
        InMemStorage storage = new InMemStorage();
        DataAccessor firstAccessor = storage.createAccessor(this.direct, createFamilyDescriptor(URI.create("inmem://test1")));
        DataAccessor secondAccessor = storage.createAccessor(this.direct, createFamilyDescriptor(URI.create("inmem://test2")));
        BatchLogReader reader = (BatchLogReader) firstAccessor.getBatchLogReader(this.direct.getContext())
                .orElseThrow(() -> new IllegalStateException("Missing batch log reader"));
        AttributeWriterBase firstWriter = (AttributeWriterBase) firstAccessor.getWriter(this.direct.getContext())
                .orElseThrow(() -> new IllegalStateException("Missing writer"));
        final CountDownLatch completed = new CountDownLatch(1);

        StreamElement update = StreamElement.upsert(
                this.entity,
                this.data,
                UUID.randomUUID().toString(),
                "key",
                this.data.getName(),
                System.currentTimeMillis(),
                new byte[]{1, 2, 3});
        firstWriter.online().write(update, (succ, exc) -> {
        });
        AttributeWriterBase secondWriter = (AttributeWriterBase) secondAccessor.getWriter(this.direct.getContext())
                .orElseThrow(() -> new IllegalStateException("Missing writer2"));
        secondWriter.online().write(update, (succ, exc) -> {
        });

        final AtomicInteger observed = new AtomicInteger();
        reader.observe(reader.getPartitions(), Arrays.asList(this.data), new BatchLogObserver() {
            @Override
            public void onCompleted() {
                completed.countDown();
            }

            @Override
            public boolean onNext(StreamElement element, BatchLogObserver.OnNextContext context) {
                Assert.assertEquals(0L, context.getPartition().getId());
                Assert.assertEquals("key", element.getKey());
                observed.incrementAndGet();
                return true;
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        });
        completed.await();
        Assert.assertEquals(1L, observed.get());
    }

    /**
     * Verifies that an element written after the observe handle was closed is not delivered:
     * the first write (value 1) is recorded, the second write (value 2) is not.
     */
    @Test(timeout = 10000)
    public void testObserveCancel() {
        InMemStorage storage = new InMemStorage();
        URI storageUri = URI.create("inmem:///inmemstoragetest");
        DataAccessor accessor = storage.createAccessor(this.direct, createFamilyDescriptor(storageUri));
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(this.direct.getContext())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        AttributeWriterBase writer = (AttributeWriterBase) accessor.getWriter(this.direct.getContext())
                .orElseThrow(() -> new IllegalStateException("Missing writer"));

        // First payload byte of every element the observer receives.
        final List<Byte> received = new ArrayList<>();
        ObserveHandle handle = reader.observePartitions(reader.getPartitions(), new LogObserver() {
            @Override
            public void onRepartition(LogObserver.OnRepartitionContext context) {
                Assert.assertEquals(1L, context.partitions().size());
            }

            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext context) {
                Assert.assertEquals(0L, context.getPartition().getId());
                Assert.assertEquals("key", element.getKey());
                context.confirm();
                received.add(element.getValue()[0]);
                return false;
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        });

        StreamElement first = StreamElement.upsert(
                this.entity,
                this.data,
                UUID.randomUUID().toString(),
                "key",
                this.data.getName(),
                System.currentTimeMillis(),
                new byte[]{1});
        writer.online().write(first, (succ, exc) -> {
        });
        Assert.assertEquals(1L, handle.getCurrentOffsets().size());
        Assert.assertEquals(Collections.singletonList((byte) 1), received);

        handle.close();

        StreamElement second = StreamElement.upsert(
                this.entity,
                this.data,
                UUID.randomUUID().toString(),
                "key",
                this.data.getName(),
                System.currentTimeMillis(),
                new byte[]{2});
        writer.online().write(second, (succ, exc) -> {
        });
        // Nothing new may arrive once the handle is closed.
        Assert.assertEquals(Collections.singletonList((byte) 1), received);
    }

    /**
     * Observes partitions, captures the current offsets after one write, closes the handle and
     * resumes with {@code observeBulkOffsets} from the captured offsets. Checks that watermarks
     * are positive and which payload bytes the shared observer has recorded at each step.
     */
    @Test(timeout = 10000)
    public void testObserveOffsets() throws InterruptedException {
        InMemStorage storage = new InMemStorage();
        URI storageUri = URI.create("inmem:///inmemstoragetest");
        DataAccessor accessor = storage.createAccessor(this.direct, createFamilyDescriptor(storageUri));
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(this.direct.getContext())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        AttributeWriterBase writer = (AttributeWriterBase) accessor.getWriter(this.direct.getContext())
                .orElseThrow(() -> new IllegalStateException("Missing writer"));

        // First payload byte of every element seen; the same observer serves both observations.
        final List<Byte> received = new ArrayList<>();
        LogObserver observer = new LogObserver() {
            @Override
            public void onRepartition(LogObserver.OnRepartitionContext context) {
                Assert.assertEquals(1L, context.partitions().size());
            }

            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext context) {
                Assert.assertEquals(0L, context.getPartition().getId());
                received.add(element.getValue()[0]);
                return false;
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        };

        ObserveHandle firstHandle = reader.observePartitions(reader.getPartitions(), observer);
        StreamElement first = StreamElement.upsert(
                this.entity,
                this.data,
                UUID.randomUUID().toString(),
                "key",
                this.data.getName(),
                System.currentTimeMillis(),
                new byte[]{1});
        writer.online().write(first, (succ, exc) -> {
        });
        List<Offset> offsets = firstHandle.getCurrentOffsets();
        Assert.assertEquals(1L, offsets.size());
        Assert.assertTrue(offsets.get(0).getWatermark() > 0);
        Assert.assertEquals(Collections.singletonList((byte) 1), received);
        firstHandle.close();

        // Resume from the offsets captured above.
        ObserveHandle secondHandle = reader.observeBulkOffsets(offsets, observer);
        secondHandle.waitUntilReady();
        List<Offset> resumedOffsets = secondHandle.getCurrentOffsets();
        Assert.assertEquals(1L, resumedOffsets.size());
        Assert.assertTrue(
                "Expected positive watermark, got " + resumedOffsets.get(0).getWatermark(),
                resumedOffsets.get(0).getWatermark() > 0);

        StreamElement second = StreamElement.upsert(
                this.entity,
                this.data,
                UUID.randomUUID().toString(),
                "key",
                this.data.getName(),
                System.currentTimeMillis(),
                new byte[]{2});
        writer.online().write(second, (succ, exc) -> {
        });
        Assert.assertEquals(Arrays.asList((byte) 1, (byte) 1), received);
        InMemStorage.ConsumedOffset consumed = (InMemStorage.ConsumedOffset) secondHandle.getCurrentOffsets().get(0);
        Assert.assertEquals(0L, consumed.getConsumedKeyAttr().size());
        secondHandle.close();
    }

    /**
     * Installs a watermark estimator factory whose estimators always report
     * {@code Long.MAX_VALUE - InMemStorage.getBoundedOutOfOrderness()} and expects the observer's
     * {@code onCompleted} to fire within one second.
     */
    @Test
    public void testObserveWithEndOfTime() throws InterruptedException {
        URI storageUri = URI.create("inmem:///inmemstoragetest");
        InMemStorage storage = new InMemStorage();
        InMemStorage.setWatermarkEstimatorFactory(storageUri, (stamp, name, offset) -> new WatermarkEstimator() {
            {
                // the factory must always be handed an offset
                Preconditions.checkArgument(offset != null);
            }

            @Override
            public long getWatermark() {
                return Long.MAX_VALUE - InMemStorage.getBoundedOutOfOrderness();
            }

            @Override
            public void update(StreamElement element) {
            }

            @Override
            public void setMinWatermark(long minWatermark) {
            }
        });

        DataAccessor accessor = storage.createAccessor(this.direct, createFamilyDescriptor(storageUri));
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(this.direct.getContext())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));

        final CountDownLatch completed = new CountDownLatch(1);
        reader.observe("observer", new LogObserver() {
            @Override
            public void onCompleted() {
                completed.countDown();
            }

            @Override
            public boolean onError(Throwable error) {
                return false;
            }

            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext context) {
                return false;
            }
        });
        Assert.assertTrue(completed.await(1L, TimeUnit.SECONDS));
    }

    /**
     * Registers one observer that throws from {@code onNext} and one that succeeds, writes 100
     * elements, and checks that the failing observer was invoked exactly once while the
     * succeeding observer received all 100 elements.
     */
    @Test(timeout = 1000)
    public void testObserveError() throws InterruptedException {
        final int numElements = 100;
        InMemStorage storage = new InMemStorage();
        URI storageUri = URI.create("inmem:///inmemstoragetest");
        DataAccessor accessor = storage.createAccessor(this.direct, createFamilyDescriptor(storageUri));
        CommitLogReader reader = (CommitLogReader) Optionals.get(accessor.getCommitLogReader(this.direct.getContext()));
        AttributeWriterBase writer = (AttributeWriterBase) Optionals.get(accessor.getWriter(this.direct.getContext()));

        final CountDownLatch failureSeen = new CountDownLatch(1);
        final AtomicInteger failingCalls = new AtomicInteger(0);
        reader.observe("failing-observer", new LogObserver() {
            @Override
            public void onCompleted() {
                throw new UnsupportedOperationException("This should never happen.");
            }

            @Override
            public boolean onError(Throwable error) {
                failureSeen.countDown();
                return false;
            }

            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext context) {
                failingCalls.incrementAndGet();
                throw new RuntimeException("Test exception.");
            }
        });

        final CountDownLatch allDelivered = new CountDownLatch(numElements);
        reader.observe("success-observer", new LogObserver() {
            @Override
            public void onCompleted() {
                throw new UnsupportedOperationException("This should never happen.");
            }

            @Override
            public boolean onError(Throwable error) {
                return false;
            }

            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext context) {
                allDelivered.countDown();
                return true;
            }
        });

        int index = 0;
        while (index < numElements) {
            StreamElement update = StreamElement.upsert(
                    this.entity,
                    this.data,
                    UUID.randomUUID().toString(),
                    "key_" + index,
                    this.data.getName(),
                    System.currentTimeMillis(),
                    new byte[]{2});
            writer.online().write(update, (succ, exc) -> {
            });
            index++;
        }

        failureSeen.await();
        allDelivered.await();
        Assert.assertEquals(1L, failingCalls.get());
    }

    /**
     * Creates a three-partition in-memory family, writes 1000 elements with distinct keys and
     * checks that elements were observed on all three partitions and that the handle reports
     * three current offsets.
     *
     * <p>The wait for delivery is bounded: this test has no {@code @Test} timeout, so an unbounded
     * {@code await()} would hang the whole run if any element were never delivered.
     */
    @Test
    public void testObserveMultiplePartitions() throws InterruptedException {
        final int numElements = 1000;
        final int numPartitions = 3;
        DataAccessor accessor = new InMemStorage()
                .createAccessor(this.direct, createFamilyDescriptor(URI.create("inmem:///test"), numPartitions));
        CommitLogReader reader = (CommitLogReader) Optionals.get(accessor.getCommitLogReader(this.direct.getContext()));
        AttributeWriterBase writer = (AttributeWriterBase) Optionals.get(accessor.getWriter(this.direct.getContext()));

        // Number of elements observed per partition (keyed by the partition object).
        final ConcurrentHashMap<Object, Long> perPartition = new ConcurrentHashMap<>();
        final CountDownLatch remaining = new CountDownLatch(numElements);
        ObserveHandle handle = reader.observePartitions(reader.getPartitions(), new LogObserver() {
            @Override
            public void onRepartition(LogObserver.OnRepartitionContext context) {
                Assert.assertEquals(3L, context.partitions().size());
            }

            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext context) {
                perPartition.merge(context.getPartition(), 1L, Long::sum);
                context.confirm();
                remaining.countDown();
                // keep consuming until every written element has been counted
                return remaining.getCount() > 0;
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        });

        for (int i = 0; i < numElements; i++) {
            StreamElement update = StreamElement.upsert(
                    this.entity,
                    this.data,
                    UUID.randomUUID().toString(),
                    "key_" + i,
                    this.data.getName(),
                    System.currentTimeMillis(),
                    new byte[]{1, 2, 3});
            writer.online().write(update, CommitCallback.noop());
        }

        Assert.assertTrue(
                "Timed out waiting for elements, still missing " + remaining.getCount(),
                remaining.await(10L, TimeUnit.SECONDS));
        Assert.assertEquals(3L, perPartition.size());
        Assert.assertEquals(3L, handle.getCurrentOffsets().size());
    }

    /** Requesting a random access reader from a three-partition family must throw {@link UnsupportedOperationException}. */
    @Test(expected = UnsupportedOperationException.class)
    public void testRandomAccessReaderWithMultiplePartitions() {
        InMemStorage storage = new InMemStorage();
        AttributeFamilyDescriptor family = createFamilyDescriptor(URI.create("inmem:///test"), 3);
        DataAccessor accessor = storage.createAccessor(this.direct, family);
        accessor.getRandomAccessReader(this.direct.getContext());
    }

    /** Requesting a batch log reader from a three-partition family must throw {@link UnsupportedOperationException}. */
    @Test(expected = UnsupportedOperationException.class)
    public void testBatchLogReaderWithMultiplePartitions() {
        InMemStorage storage = new InMemStorage();
        AttributeFamilyDescriptor family = createFamilyDescriptor(URI.create("inmem:///test"), 3);
        DataAccessor accessor = storage.createAccessor(this.direct, family);
        accessor.getBatchLogReader(this.direct.getContext());
    }

    /** Requesting a cached view from a three-partition family must throw {@link UnsupportedOperationException}. */
    @Test(expected = UnsupportedOperationException.class)
    public void testCachedViewWithMultiplePartitions() {
        InMemStorage storage = new InMemStorage();
        AttributeFamilyDescriptor family = createFamilyDescriptor(URI.create("inmem:///test"), 3);
        DataAccessor accessor = storage.createAccessor(this.direct, family);
        accessor.getCachedView(this.direct.getContext());
    }

    /**
     * Builds a single-partition test family descriptor for the given storage URI.
     *
     * @param uri storage URI of the family
     * @return the descriptor, with an empty configuration map
     */
    private AttributeFamilyDescriptor createFamilyDescriptor(URI uri) {
        return createFamilyDescriptor(uri, 1);
    }

    /**
     * Builds a primary commit-log family descriptor named {@code "test"} on the test entity.
     *
     * @param uri storage URI of the family
     * @param i requested number of partitions; values greater than one are written to the
     *     {@code "num-partitions"} configuration key, otherwise the key is left unset
     * @return the descriptor
     */
    private AttributeFamilyDescriptor createFamilyDescriptor(URI uri, int i) {
        HashMap<String, Object> cfg = new HashMap<>();
        if (i > 1) {
            // honour the requested count (this used to be hard-coded to 3 regardless of the argument)
            cfg.put("num-partitions", i);
        }
        return AttributeFamilyDescriptor.newBuilder()
                .setName("test")
                .setEntity(this.entity)
                .setType(StorageType.PRIMARY)
                .setAccess(AccessType.from("commit-log"))
                .setStorageUri(uri)
                .setCfg(cfg)
                .build();
    }

    /**
     * Lambda deserialization hook (marked synthetic): maps a {@link SerializedLambda} back to the
     * lambda it was created from. The only lambda recognised here is the watermark estimator
     * factory from {@code testObserveWithEndOfTime}; anything else is rejected.
     *
     * <p>NOTE(review): this looks like decompiler output rather than hand-written code
     * ({@code boolean z = -1} and a {@code switch} over a boolean are not valid Java source) -
     * presumably javac regenerates this method for serializable lambdas, so confirm whether it
     * can simply be removed from the source.
     *
     * @param serializedLambda serialized form of the lambda to restore
     * @return the restored lambda
     * @throws IllegalArgumentException when the serialized form matches no known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // First stage: translate the implementation method name into a case selector.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 852733457:
                // hash codes can collide, so the name itself is compared as well
                if (implMethodName.equals("lambda$testObserveWithEndOfTime$b5af63a8$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: verify the full lambda signature before rebuilding it.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/direct/storage/InMemStorage$WatermarkEstimatorFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(JLjava/lang/String;Lcz/o2/proxima/direct/storage/InMemStorage$ConsumedOffset;)Lcz/o2/proxima/time/WatermarkEstimator;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/storage/InMemStorageTest") && serializedLambda.getImplMethodSignature().equals("(JLjava/lang/String;Lcz/o2/proxima/direct/storage/InMemStorage$ConsumedOffset;)Lcz/o2/proxima/time/WatermarkEstimator;")) {
                    // the captured argument is read but not used by the rebuilt lambda below
                    InMemStorageTest inMemStorageTest = (InMemStorageTest) serializedLambda.getCapturedArg(0);
                    // same body as the factory lambda in testObserveWithEndOfTime
                    return (j, str, consumedOffset) -> {
                        return new WatermarkEstimator() { // from class: cz.o2.proxima.direct.storage.InMemStorageTest.7
                            {
                                Preconditions.checkArgument(consumedOffset != null);
                            }

                            public long getWatermark() {
                                return Long.MAX_VALUE - InMemStorage.getBoundedOutOfOrderness();
                            }

                            public void update(StreamElement streamElement) {
                            }

                            public void setMinWatermark(long j) {
                            }
                        };
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
