package cz.o2.proxima.direct.storage;

import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.direct.batch.BatchLogObservable;
import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.commitlog.Offset;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.DataAccessor;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.Partition;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.ConfigRepository;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:cz/o2/proxima/direct/storage/InMemStorageTest.class */
public class InMemStorageTest implements Serializable {
    final Repository repo = ConfigRepository.of(ConfigFactory.load("test-reference.conf").resolve());
    final DirectDataOperator direct = this.repo.asDataOperator(DirectDataOperator.class, new Consumer[0]);
    final EntityDescriptor entity = (EntityDescriptor) this.repo.findEntity("dummy").orElseThrow(() -> {
        return new IllegalStateException("Missing entity dummy");
    });
    final AttributeDescriptor<?> data = (AttributeDescriptor) this.entity.findAttribute("data").orElseThrow(() -> {
        return new IllegalStateException("Missing attribute data");
    });

    @Test(timeout = 10000)
    public void testObservePartitions() throws URISyntaxException, InterruptedException {
        DataAccessor createAccessor = new InMemStorage().createAccessor(this.direct, this.entity, new URI("inmem:///inmemstoragetest"), Collections.emptyMap());
        CommitLogReader commitLogReader = (CommitLogReader) createAccessor.getCommitLogReader(this.direct.getContext()).orElseThrow(() -> {
            return new IllegalStateException("Missing commit log reader");
        });
        AttributeWriterBase attributeWriterBase = (AttributeWriterBase) createAccessor.getWriter(this.direct.getContext()).orElseThrow(() -> {
            return new IllegalStateException("Missing writer");
        });
        final AtomicReference atomicReference = new AtomicReference();
        commitLogReader.observePartitions(commitLogReader.getPartitions(), new LogObserver() { // from class: cz.o2.proxima.direct.storage.InMemStorageTest.1
            public void onRepartition(LogObserver.OnRepartitionContext onRepartitionContext) {
                Assert.assertEquals(1L, onRepartitionContext.partitions().size());
                atomicReference.set(new CountDownLatch(1));
            }

            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                Assert.assertEquals(0L, onNextContext.getPartition().getId());
                Assert.assertEquals("key", streamElement.getKey());
                onNextContext.confirm();
                ((CountDownLatch) atomicReference.get()).countDown();
                return false;
            }

            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        });
        attributeWriterBase.online().write(StreamElement.update(this.entity, this.data, UUID.randomUUID().toString(), "key", this.data.getName(), System.currentTimeMillis(), new byte[]{1, 2, 3}), (z, th) -> {
        });
        ((CountDownLatch) atomicReference.get()).await();
    }

    @Test(timeout = 10000)
    public void testObserveBatch() throws URISyntaxException, InterruptedException {
        DataAccessor createAccessor = new InMemStorage().createAccessor(this.direct, this.entity, new URI("inmem:///inmemstoragetest"), Collections.emptyMap());
        BatchLogObservable batchLogObservable = (BatchLogObservable) createAccessor.getBatchLogObservable(this.direct.getContext()).orElseThrow(() -> {
            return new IllegalStateException("Missing batch log observable");
        });
        AttributeWriterBase attributeWriterBase = (AttributeWriterBase) createAccessor.getWriter(this.direct.getContext()).orElseThrow(() -> {
            return new IllegalStateException("Missing writer");
        });
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        attributeWriterBase.online().write(StreamElement.update(this.entity, this.data, UUID.randomUUID().toString(), "key", this.data.getName(), System.currentTimeMillis(), new byte[]{1, 2, 3}), (z, th) -> {
        });
        batchLogObservable.observe(batchLogObservable.getPartitions(), Arrays.asList(this.data), new BatchLogObserver() { // from class: cz.o2.proxima.direct.storage.InMemStorageTest.2
            public boolean onNext(StreamElement streamElement, Partition partition) {
                Assert.assertEquals(0L, partition.getId());
                Assert.assertEquals("key", streamElement.getKey());
                countDownLatch.countDown();
                return false;
            }

            public boolean onError(Throwable th2) {
                throw new RuntimeException(th2);
            }
        });
        countDownLatch.await();
    }

    @Test(timeout = 10000)
    public void testObserveCancel() throws URISyntaxException, InterruptedException {
        DataAccessor createAccessor = new InMemStorage().createAccessor(this.direct, this.entity, new URI("inmem:///inmemstoragetest"), Collections.emptyMap());
        CommitLogReader commitLogReader = (CommitLogReader) createAccessor.getCommitLogReader(this.direct.getContext()).orElseThrow(() -> {
            return new IllegalStateException("Missing commit log reader");
        });
        AttributeWriterBase attributeWriterBase = (AttributeWriterBase) createAccessor.getWriter(this.direct.getContext()).orElseThrow(() -> {
            return new IllegalStateException("Missing writer");
        });
        final ArrayList arrayList = new ArrayList();
        ObserveHandle observePartitions = commitLogReader.observePartitions(commitLogReader.getPartitions(), new LogObserver() { // from class: cz.o2.proxima.direct.storage.InMemStorageTest.3
            public void onRepartition(LogObserver.OnRepartitionContext onRepartitionContext) {
                Assert.assertEquals(1L, onRepartitionContext.partitions().size());
            }

            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                Assert.assertEquals(0L, onNextContext.getPartition().getId());
                Assert.assertEquals("key", streamElement.getKey());
                onNextContext.confirm();
                arrayList.add(Byte.valueOf(streamElement.getValue()[0]));
                return false;
            }

            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        });
        attributeWriterBase.online().write(StreamElement.update(this.entity, this.data, UUID.randomUUID().toString(), "key", this.data.getName(), System.currentTimeMillis(), new byte[]{1}), (z, th) -> {
        });
        Assert.assertEquals(1L, observePartitions.getCurrentOffsets().size());
        Assert.assertEquals(Arrays.asList((byte) 1), arrayList);
        observePartitions.close();
        attributeWriterBase.online().write(StreamElement.update(this.entity, this.data, UUID.randomUUID().toString(), "key", this.data.getName(), System.currentTimeMillis(), new byte[]{2}), (z2, th2) -> {
        });
        Assert.assertEquals(Arrays.asList((byte) 1), arrayList);
    }

    @Test(timeout = 10000)
    public void testObserveOffsets() throws URISyntaxException, InterruptedException {
        DataAccessor createAccessor = new InMemStorage().createAccessor(this.direct, this.entity, new URI("inmem:///inmemstoragetest"), Collections.emptyMap());
        CommitLogReader commitLogReader = (CommitLogReader) createAccessor.getCommitLogReader(this.direct.getContext()).orElseThrow(() -> {
            return new IllegalStateException("Missing commit log reader");
        });
        AttributeWriterBase attributeWriterBase = (AttributeWriterBase) createAccessor.getWriter(this.direct.getContext()).orElseThrow(() -> {
            return new IllegalStateException("Missing writer");
        });
        final ArrayList arrayList = new ArrayList();
        LogObserver logObserver = new LogObserver() { // from class: cz.o2.proxima.direct.storage.InMemStorageTest.4
            public void onRepartition(LogObserver.OnRepartitionContext onRepartitionContext) {
                Assert.assertEquals(1L, onRepartitionContext.partitions().size());
            }

            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                Assert.assertEquals(0L, onNextContext.getPartition().getId());
                Assert.assertEquals("key", streamElement.getKey());
                onNextContext.confirm();
                arrayList.add(Byte.valueOf(streamElement.getValue()[0]));
                return false;
            }

            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        };
        ObserveHandle observePartitions = commitLogReader.observePartitions(commitLogReader.getPartitions(), logObserver);
        attributeWriterBase.online().write(StreamElement.update(this.entity, this.data, UUID.randomUUID().toString(), "key", this.data.getName(), System.currentTimeMillis(), new byte[]{1}), (z, th) -> {
        });
        List currentOffsets = observePartitions.getCurrentOffsets();
        Assert.assertEquals(1L, currentOffsets.size());
        Assert.assertTrue(((Offset) currentOffsets.get(0)).getWatermark() > 0);
        Assert.assertEquals(Arrays.asList((byte) 1), arrayList);
        observePartitions.close();
        List currentOffsets2 = commitLogReader.observeBulkOffsets(currentOffsets, logObserver).getCurrentOffsets();
        Assert.assertEquals(1L, currentOffsets2.size());
        Assert.assertTrue(((Offset) currentOffsets2.get(0)).getWatermark() > 0);
        attributeWriterBase.online().write(StreamElement.update(this.entity, this.data, UUID.randomUUID().toString(), "key", this.data.getName(), System.currentTimeMillis(), new byte[]{2}), (z2, th2) -> {
        });
        Assert.assertEquals(Arrays.asList((byte) 1, (byte) 2), arrayList);
    }
}
