package cz.o2.proxima.direct.commitlog;

import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.DirectAttributeFamilyDescriptor;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.Partition;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.Optionals;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

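/**
 * Tests of {@link CommitLogReader} observation: plain, failing, retried, bulk and sort-buffered
 * observers, plus a check of the {@link LogObserver} callback set.
 *
 * <p>Decompiled from cz/o2/proxima/direct/commitlog/CommitLogReaderTest.class.
 */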
public class CommitLogReaderTest {
    /** Repository built from the default configuration with {@code test-reference.conf} as fallback. */
    private final transient Repository repo =
            Repository.of(
                    ConfigFactory.load()
                            .withFallback(ConfigFactory.load("test-reference.conf"))
                            .resolve());

    /** Descriptor of the {@code event} entity looked up in {@link #repo}. */
    private final transient EntityDescriptor entity =
            (EntityDescriptor) Optionals.get(repo.findEntity("event"));

    /** Descriptor of the {@code data} attribute of {@link #entity}. */
    private final transient AttributeDescriptor<?> attr =
            (AttributeDescriptor) Optionals.get(entity.findAttribute("data"));

    /** Commit-log reader under test; assigned in {@code setUp()}. */
    private transient CommitLogReader reader;

    /** Writer used to feed elements to the reader; assigned in {@code setUp()}. */
    private transient AttributeWriterBase writer;

    /**
     * Resolves the direct view of the {@code event-storage-stream} attribute family and stores its
     * commit-log reader and writer for the tests.
     *
     * <p>The lookup is kept as a single typed {@link Optional} chain: going through a raw
     * {@code Optional} loses the element type, so the {@code resolveRequired} method reference no
     * longer type-checks.
     *
     * @throws IllegalStateException when the family, its commit-log reader or its writer is missing
     */
    @Before
    public void setUp() {
        final DirectDataOperator direct = repo.asDataOperator(DirectDataOperator.class);
        final DirectAttributeFamilyDescriptor family =
                repo.getAllFamilies()
                        .filter(af -> af.getName().equals("event-storage-stream"))
                        .findAny()
                        .map(direct::resolveRequired)
                        .orElseThrow(
                                () -> new IllegalStateException(
                                        "Attribute family event-storage-stream not found"));
        reader =
                family.getCommitLogReader()
                        .orElseThrow(
                                () -> new IllegalStateException(
                                        "Family event-storage-stream has no commit-log reader"));
        writer =
                family.getWriter()
                        .orElseThrow(
                                () -> new IllegalStateException(
                                        "Family event-storage-stream has no writer"));
    }

    /**
     * Registers a confirming observer, writes one element and expects exactly that one element to
     * have been received once the latch is released.
     */
    @Test(timeout = 10000)
    public void testObserveSimple() throws InterruptedException {
        final ArrayList<StreamElement> received = new ArrayList<>();
        final CountDownLatch latch = new CountDownLatch(1);
        reader.observe("test", new LogObserver() {
            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext context) {
                received.add(element);
                latch.countDown();
                context.confirm();
                return true;
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        });
        final StreamElement update =
                StreamElement.update(
                        entity,
                        attr,
                        UUID.randomUUID().toString(),
                        "key",
                        attr.getName(),
                        System.currentTimeMillis(),
                        new byte[] {1, 2});
        // The commit callback is intentionally a no-op; the latch below is the only synchronization.
        writer.online().write(update, (succ, exc) -> { });
        latch.await();
        Assert.assertEquals(1, received.size());
    }

    /**
     * Registers an observer whose {@code onNext} always throws and expects the failure to be handed
     * to {@code onError}, which records it and releases the latch.
     */
    @Test(timeout = 10000)
    public void testObserveWithError() throws InterruptedException {
        final AtomicReference<Throwable> caught = new AtomicReference<>();
        final CountDownLatch latch = new CountDownLatch(1);
        reader.observe("test", new LogObserver() {
            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext context) {
                throw new RuntimeException("fail");
            }

            @Override
            public boolean onError(Throwable error) {
                caught.set(error);
                latch.countDown();
                return false;
            }
        });
        final StreamElement update =
                StreamElement.update(
                        entity,
                        attr,
                        UUID.randomUUID().toString(),
                        "key",
                        attr.getName(),
                        System.currentTimeMillis(),
                        new byte[] {1, 2});
        writer.online().write(update, (succ, exc) -> { });
        latch.await();
        Assert.assertNotNull(caught.get());
    }

    /**
     * Runs an observer through {@link RetryableLogObserver} that fails on the first element it is
     * handed and accepts every later one; the test passes once one element has been accepted.
     *
     * <p>NOTE(review): with the first delivery now really failing, the test relies on the element
     * being delivered again after the retry - confirm this holds for the storage behind the reader.
     */
    @Test(timeout = 10000)
    public void testObserveWithRetry() throws InterruptedException {
        final ArrayList<StreamElement> received = new ArrayList<>();
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicInteger calls = new AtomicInteger();
        RetryableLogObserver.online(2, "test", reader, new LogObserver() {
            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext context) {
                // getAndIncrement() returns 0 on the first call only, so exactly the first delivery
                // fails. The former incrementAndGet() == 0 check could never be true (it returns
                // 1 on the first call), which left the retry path untested.
                if (calls.getAndIncrement() == 0) {
                    throw new RuntimeException("fail");
                }
                received.add(element);
                latch.countDown();
                return true;
            }

            @Override
            public boolean onError(Throwable error) {
                return false;
            }
        }).start();
        final StreamElement update =
                StreamElement.update(
                        entity,
                        attr,
                        UUID.randomUUID().toString(),
                        "key",
                        attr.getName(),
                        System.currentTimeMillis(),
                        new byte[] {1, 2});
        writer.online().write(update, (succ, exc) -> { });
        latch.await();
        Assert.assertEquals(1, received.size());
    }

    /**
     * Observes in bulk mode, writes two elements and expects both to arrive; the observer confirms
     * only when its buffer holds exactly two elements.
     */
    @Test(timeout = 10000)
    public void testBulkObserve() throws InterruptedException {
        final ArrayList<StreamElement> received = new ArrayList<>();
        final CountDownLatch latch = new CountDownLatch(2);
        reader.observeBulk("test", new LogObserver() {
            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext context) {
                received.add(element);
                latch.countDown();
                if (received.size() == 2) {
                    context.confirm();
                }
                return true;
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        });
        for (int written = 0; written != 2; written++) {
            final StreamElement update =
                    StreamElement.update(
                            entity,
                            attr,
                            UUID.randomUUID().toString(),
                            "key",
                            attr.getName(),
                            System.currentTimeMillis(),
                            new byte[] {1, 2});
            writer.online().write(update, (succ, exc) -> { });
        }
        latch.await();
        Assert.assertEquals(2, received.size());
    }

    /**
     * Writes 100 elements with descending timestamps through an observer wrapped by
     * {@code LogObservers.withSortBuffer} and expects them to be received in ascending timestamp
     * order.
     *
     * <p>The received list is typed as {@code ArrayList<StreamElement>}; with a raw list the stream
     * lambda below would see {@code Object} and {@code getStamp()} would not resolve.
     */
    @Test(timeout = 10000)
    public void testObserveOrdered() throws InterruptedException {
        final ArrayList<StreamElement> received = new ArrayList<>();
        final CountDownLatch latch = new CountDownLatch(100);
        reader.observe("test", LogObservers.withSortBuffer(new LogObserver() {
            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext context) {
                received.add(element);
                latch.countDown();
                context.confirm();
                return true;
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        }, Duration.ofMillis(500L)));
        final long now = System.currentTimeMillis();
        // Stamps run from now + 99 down to now, i.e. the exact reverse of the expected order.
        for (int i = 0; i < 100; i++) {
            writer.online().write(
                    StreamElement.update(
                            entity,
                            attr,
                            UUID.randomUUID().toString(),
                            "key",
                            attr.getName(),
                            (now + 99) - i,
                            new byte[] {1, 2}),
                    (succ, exc) -> { });
        }
        // NOTE(review): this trailing element is about 10 s older than the rest; the assertions
        // below hold only if it is not among the first 100 delivered - presumably the sort buffer
        // discards it as late; confirm.
        writer.online().write(
                StreamElement.update(
                        entity,
                        attr,
                        UUID.randomUUID().toString(),
                        "key",
                        attr.getName(),
                        (now + 99) - 10000,
                        new byte[] {1, 2}),
                (succ, exc) -> { });
        latch.await();
        Assert.assertEquals(100, received.size());
        // Offsets relative to `now` must be exactly 0, 1, ..., 99.
        Assert.assertEquals(
                LongStream.range(0L, 100L).boxed().collect(Collectors.toList()),
                received.stream().map(el -> el.getStamp() - now).collect(Collectors.toList()));
    }

    /**
     * Writes 100 elements with descending timestamps through an observer wrapped by
     * {@code LogObservers.withSortBufferWithinPartition} and expects them to be received in
     * ascending timestamp order.
     *
     * <p>The received list is typed as {@code ArrayList<StreamElement>}; with a raw list the stream
     * lambda below would see {@code Object} and {@code getStamp()} would not resolve.
     */
    @Test(timeout = 10000)
    public void testObserveOrderedPerPartition() throws InterruptedException {
        final ArrayList<StreamElement> received = new ArrayList<>();
        final CountDownLatch latch = new CountDownLatch(100);
        reader.observe("test", LogObservers.withSortBufferWithinPartition(new LogObserver() {
            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext context) {
                received.add(element);
                latch.countDown();
                context.confirm();
                return true;
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        }, Duration.ofMillis(500L)));
        final long now = System.currentTimeMillis();
        // Stamps run from now + 99 down to now, i.e. the exact reverse of the expected order.
        for (int i = 0; i < 100; i++) {
            writer.online().write(
                    StreamElement.update(
                            entity,
                            attr,
                            UUID.randomUUID().toString(),
                            "key",
                            attr.getName(),
                            (now + 99) - i,
                            new byte[] {1, 2}),
                    (succ, exc) -> { });
        }
        // NOTE(review): this trailing element is about 10 s older than the rest; the assertions
        // below hold only if it is not among the first 100 delivered - presumably the sort buffer
        // discards it as late; confirm.
        writer.online().write(
                StreamElement.update(
                        entity,
                        attr,
                        UUID.randomUUID().toString(),
                        "key",
                        attr.getName(),
                        (now + 99) - 10000,
                        new byte[] {1, 2}),
                (succ, exc) -> { });
        latch.await();
        Assert.assertEquals(100, received.size());
        // Offsets relative to `now` must be exactly 0, 1, ..., 99.
        Assert.assertEquals(
                LongStream.range(0L, 100L).boxed().collect(Collectors.toList()),
                received.stream().map(el -> el.getStamp() - now).collect(Collectors.toList()));
    }

    /**
     * Calls each {@link LogObserver} callback of an anonymous observer once, in a fixed order, and
     * checks after every call that the callback set its own bit in a shared bitmask.
     */
    @Test
    public void testOrderedObserverLifycycle() {
        final StreamElement element =
                StreamElement.update(
                        entity,
                        attr,
                        UUID.randomUUID().toString(),
                        "key",
                        attr.getName(),
                        0L,
                        new byte[] {1, 2});
        // One bit per callback: 1, 2, 4, 8, 16, 32.
        final AtomicInteger mask = new AtomicInteger();
        final LogObserver observer = new LogObserver() {
            @Override
            public void onCompleted() {
                mask.updateAndGet(bits -> bits | 1);
            }

            @Override
            public void onCancelled() {
                mask.updateAndGet(bits -> bits | 2);
            }

            @Override
            public boolean onError(Throwable error) {
                mask.updateAndGet(bits -> bits | 4);
                return true;
            }

            @Override
            public boolean onNext(StreamElement ingest, LogObserver.OnNextContext context) {
                mask.updateAndGet(bits -> bits | 8);
                return true;
            }

            @Override
            public void onRepartition(LogObserver.OnRepartitionContext context) {
                mask.updateAndGet(bits -> bits | 16);
            }

            @Override
            public void onIdle(LogObserver.OnIdleContext context) {
                mask.updateAndGet(bits -> bits | 32);
            }
        };

        Assert.assertEquals(0, mask.get());

        observer.onCompleted();
        Assert.assertEquals(1, mask.get());

        observer.onCancelled();
        Assert.assertEquals(3, mask.get());

        observer.onError((Throwable) null);
        Assert.assertEquals(7, mask.get());

        observer.onNext(element, asOnNextContext(0L));
        Assert.assertEquals(15, mask.get());

        observer.onRepartition(Collections::emptyList);
        Assert.assertEquals(31, mask.get());

        observer.onIdle(() -> 0L);
        Assert.assertEquals(63, mask.get());
    }

    /**
     * Creates a stub {@link LogObserver.OnNextContext} for direct callback invocation.
     *
     * @param j value the stub returns from {@code getWatermark()}
     * @return a context whose committer, partition and offset are all {@code null}
     */
    private LogObserver.OnNextContext asOnNextContext(final long j) {
        final LogObserver.OnNextContext stub = new LogObserver.OnNextContext() {
            public long getWatermark() {
                return j;
            }

            public LogObserver.OffsetCommitter committer() {
                return null;
            }

            public Partition getPartition() {
                return null;
            }

            public Offset getOffset() {
                return null;
            }
        };
        return stub;
    }

    // The decompiler-emitted `$deserializeLambda$(SerializedLambda)` method was removed from this
    // spot on purpose. It was not valid Java source (`boolean z = -1;`, a `switch` over a boolean,
    // a lambda returned as plain `Object`). It only mirrored the private synthetic hook that javac
    // generates for the serializable `() -> 0L` lambda in testOrderedObserverLifycycle, so a
    // hand-written copy can only clash with the generated one.
}
