package cz.o2.proxima.direct.storage;

import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:cz/o2/proxima/direct/storage/ListBatchReaderTest.class */
public class ListBatchReaderTest {
    private final long now = System.currentTimeMillis();
    private final Repository repo = Repository.ofTest(ConfigFactory.load("test-reference.conf").resolve(), new Repository.Validate[0]);
    private final Context context = this.repo.getOrCreateOperator(DirectDataOperator.class, new Consumer[0]).getContext();
    private final EntityDescriptor gateway = this.repo.getEntity("gateway");
    private final AttributeDescriptor<byte[]> status = this.gateway.getAttribute("status");
    private final AttributeDescriptor<?> armed = this.gateway.getAttribute("armed");

    @Test(timeout = 10000)
    public void testReadingFromSinglePartition() throws InterruptedException {
        ListBatchReader of = ListBatchReader.of(this.context, getData(100));
        Assert.assertEquals(1L, of.getPartitions().size());
        ArrayList arrayList = new ArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Objects.requireNonNull(countDownLatch);
        of.observe(of.getPartitions(), Collections.singletonList(this.status), toList(arrayList, countDownLatch::countDown));
        countDownLatch.await();
        Assert.assertEquals(100, arrayList.stream().map((v0) -> {
            return v0.getKey();
        }).distinct().count());
        arrayList.clear();
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        Objects.requireNonNull(countDownLatch2);
        of.observe(of.getPartitions(), Collections.singletonList(this.armed), toList(arrayList, countDownLatch2::countDown));
        countDownLatch2.await();
        Assert.assertTrue(arrayList.isEmpty());
    }

    @Test(timeout = 10000)
    public void testReadingFromTwoPartitions() throws InterruptedException {
        ListBatchReader ofPartitioned = ListBatchReader.ofPartitioned(this.context, (List<StreamElement>[]) new List[]{getData(100), getData(2 * 100)});
        Assert.assertEquals(2L, ofPartitioned.getPartitions().size());
        ArrayList arrayList = new ArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Objects.requireNonNull(countDownLatch);
        ofPartitioned.observe(ofPartitioned.getPartitions().subList(0, 1), Collections.singletonList(this.status), toList(arrayList, countDownLatch::countDown));
        countDownLatch.await();
        Assert.assertEquals(100, arrayList.stream().map((v0) -> {
            return v0.getKey();
        }).distinct().count());
        arrayList.clear();
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        Objects.requireNonNull(countDownLatch2);
        ofPartitioned.observe(ofPartitioned.getPartitions().subList(1, 2), Collections.singletonList(this.status), toList(arrayList, countDownLatch2::countDown));
        countDownLatch2.await();
        Assert.assertEquals(2 * 100, arrayList.stream().map((v0) -> {
            return v0.getKey();
        }).distinct().count());
        arrayList.clear();
        CountDownLatch countDownLatch3 = new CountDownLatch(1);
        Objects.requireNonNull(countDownLatch3);
        ofPartitioned.observe(ofPartitioned.getPartitions(), Collections.singletonList(this.status), toList(arrayList, countDownLatch3::countDown));
        countDownLatch3.await();
        Assert.assertEquals(3 * 100, arrayList.stream().map((v0) -> {
            return v0.getUuid();
        }).distinct().count());
    }

    private BatchLogObserver toList(final List<StreamElement> list, final Runnable runnable) {
        return new BatchLogObserver() { // from class: cz.o2.proxima.direct.storage.ListBatchReaderTest.1
            public boolean onNext(StreamElement streamElement, BatchLogObserver.OnNextContext onNextContext) {
                Assert.assertNotNull(onNextContext);
                list.add(streamElement);
                return true;
            }

            public void onCompleted() {
                runnable.run();
            }
        };
    }

    private List<StreamElement> getData(int i) {
        return (List) IntStream.range(0, i).mapToObj(i2 -> {
            return StreamElement.upsert(this.gateway, this.status, UUID.randomUUID().toString(), "key" + i2, this.status.getName(), this.now, new byte[0]);
        }).collect(Collectors.toList());
    }
}
