package cz.o2.proxima.storage.kafka;

import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.kafka.shaded.com.google.common.collect.Lists;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.TopicPartition;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.AttributeDescriptorBase;
import cz.o2.proxima.repository.Context;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.BulkLogObserver;
import cz.o2.proxima.storage.commitlog.CommitLogReader;
import cz.o2.proxima.storage.commitlog.RetryableBulkObserver;
import cz.o2.proxima.storage.kafka.LocalKafkaCommitLogDescriptor;
import cz.o2.proxima.storage.kafka.partitioner.FirstPartitionPartitioner;
import cz.o2.proxima.view.PartitionedLogObserver;
import cz.o2.proxima.view.PartitionedView;
import cz.seznam.euphoria.executor.local.LocalExecutor;
import cz.seznam.euphoria.shadow.com.google.common.collect.Iterators;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest.class */
public class LocalKafkaCommitLogDescriptorTest implements Serializable {
    /** Test repository built from an empty configuration; {@code transient}, so Java serialization skips it. */
    final transient Repository repo = Repository.Builder.ofTest(ConfigFactory.empty()).build();

    /**
     * Attribute {@code attr} of entity {@code entity} with the {@code bytes:///} scheme.
     * {@link URI#create(String)} is used because a field initializer must not throw the
     * checked {@code URISyntaxException} that {@code new URI(String)} declares.
     */
    final AttributeDescriptorBase<?> attr = AttributeDescriptor.newBuilder(this.repo).setEntity("entity").setName("attr").setSchemeURI(URI.create("bytes:///")).build();

    /** Entity {@code entity} holding the single attribute {@link #attr}. */
    final EntityDescriptor entity = EntityDescriptor.newBuilder().setName("entity").addAttribute(this.attr).build();

    /** Storage URI handed to the descriptor; the tests assert the topic name {@code topic}. */
    final URI storageURI = URI.create("kafka-test://dummy/topic");

    /** Descriptor under test; re-created before every test in {@code setUp()}. */
    LocalKafkaCommitLogDescriptor kafka;

    /** Gives every test its own fresh {@link LocalKafkaCommitLogDescriptor} instance. */
    @Before
    public void setUp() {
        final LocalKafkaCommitLogDescriptor fresh = new LocalKafkaCommitLogDescriptor();
        kafka = fresh;
    }

    /**
     * Writes one element into a single-partition topic, waits for the write to be
     * confirmed, and then checks that a consumer created afterwards polls exactly
     * that record from partition 0 of topic {@code topic}.
     */
    @Test(timeout = 2000)
    public void testSinglePartitionWriteAndConsumeBySingleConsumerRunAfterWrite() throws InterruptedException {
        final LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.getAccessor(entity, storageURI, partitionsCfg(1));
        final LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        final CountDownLatch written = new CountDownLatch(1);
        final StreamElement element = StreamElement.update(
            entity, attr, UUID.randomUUID().toString(), "key", attr.getName(), System.currentTimeMillis(), emptyValue());
        writer.write(element, (success, error) -> {
            Assert.assertTrue(success);
            Assert.assertNull(error);
            written.countDown();
        });
        written.await();

        final ConsumerRecords<?, ?> polled = accessor.createConsumerFactory().create().poll(1000L);
        Assert.assertEquals(1L, polled.count());
        Assert.assertEquals(1L, polled.partitions().size());

        final TopicPartition onlyPartition = Iterators.getOnlyElement(polled.partitions().iterator());
        Assert.assertEquals(0L, onlyPartition.partition());
        Assert.assertEquals("topic", onlyPartition.topic());

        int seen = 0;
        for (ConsumerRecord<?, ?> rec : polled) {
            Assert.assertEquals("key#attr", rec.key());
            Assert.assertEquals("topic", rec.topic());
            Assert.assertArrayEquals(emptyValue(), (byte[]) rec.value());
            seen++;
        }
        Assert.assertEquals(1L, seen);
    }

    /**
     * Writes two elements ({@code key1}, {@code key2}) into a two-partition topic, waits
     * for both confirmations, and then checks that a consumer created afterwards polls
     * both records, spread over partitions 0 and 1 in that iteration order.
     */
    @Test(timeout = 2000)
    public void testTwoPartitionsTwoWritesAndConsumeBySingleConsumerRunAfterWrite() throws InterruptedException {
        final LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.getAccessor(entity, storageURI, partitionsCfg(2));
        final LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        final CountDownLatch written = new CountDownLatch(2);
        writer.write(StreamElement.update(entity, attr, UUID.randomUUID().toString(), "key1", attr.getName(), System.currentTimeMillis(), emptyValue()), (success, error) -> {
            Assert.assertTrue(success);
            Assert.assertNull(error);
            written.countDown();
        });
        writer.write(StreamElement.update(entity, attr, UUID.randomUUID().toString(), "key2", attr.getName(), System.currentTimeMillis(), emptyValue()), (success, error) -> {
            Assert.assertTrue(success);
            Assert.assertNull(error);
            written.countDown();
        });
        written.await();

        final ConsumerRecords<?, ?> polled = accessor.createConsumerFactory().create().poll(1000L);
        Assert.assertEquals(2L, polled.count());
        Assert.assertEquals(2L, polled.partitions().size());

        // Keep the iterator in a named local: the second partition is read from the same
        // iterator as the first one (the previous code referenced an undefined variable here).
        final Iterator<TopicPartition> partitions = polled.partitions().iterator();
        final TopicPartition first = partitions.next();
        Assert.assertEquals(0L, first.partition());
        Assert.assertEquals(1L, partitions.next().partition());
        Assert.assertEquals("topic", first.topic());

        int seen = 0;
        for (ConsumerRecord<?, ?> rec : polled) {
            // Incremented first so the expected keys are key1, key2 in iteration order.
            seen++;
            Assert.assertEquals("key" + seen + "#attr", rec.key());
            Assert.assertEquals("topic", rec.topic());
            Assert.assertArrayEquals(emptyValue(), (byte[]) rec.value());
        }
        Assert.assertEquals(2L, seen);
    }

    /** Polling a two-partition topic that nothing was written to yields an empty result. */
    @Test
    public void testEmptyPoll() {
        final LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.getAccessor(entity, storageURI, partitionsCfg(2));
        final ConsumerRecords<?, ?> polled = accessor.createConsumerFactory().create().poll(100L);
        Assert.assertTrue(polled.isEmpty());
    }

    /**
     * Creates the consumer first, then writes two elements ({@code key1}, {@code key2})
     * into a two-partition topic and checks that a single poll returns both records,
     * spread over partitions 0 and 1 in that iteration order.
     */
    @Test(timeout = 2000)
    public void testTwoPartitionsTwoWritesAndConsumeBySingleConsumerRunBeforeWrite() throws InterruptedException {
        final LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.getAccessor(entity, storageURI, partitionsCfg(2));
        final LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        final KafkaConsumer<?, ?> consumer = accessor.createConsumerFactory().create();
        writer.write(StreamElement.update(entity, attr, UUID.randomUUID().toString(), "key1", attr.getName(), System.currentTimeMillis(), emptyValue()), (success, error) -> {
            Assert.assertTrue(success);
            Assert.assertNull(error);
        });
        writer.write(StreamElement.update(entity, attr, UUID.randomUUID().toString(), "key2", attr.getName(), System.currentTimeMillis(), emptyValue()), (success, error) -> {
            Assert.assertTrue(success);
            Assert.assertNull(error);
        });

        final ConsumerRecords<?, ?> polled = consumer.poll(1000L);
        Assert.assertEquals(2L, polled.count());
        Assert.assertEquals(2L, polled.partitions().size());

        // Keep the iterator in a named local: the second partition is read from the same
        // iterator as the first one (the previous code referenced an undefined variable here).
        final Iterator<TopicPartition> partitions = polled.partitions().iterator();
        final TopicPartition first = partitions.next();
        Assert.assertEquals(0L, first.partition());
        Assert.assertEquals(1L, partitions.next().partition());
        Assert.assertEquals("topic", first.topic());

        int seen = 0;
        for (ConsumerRecord<?, ?> rec : polled) {
            // Incremented first so the expected keys are key1, key2 in iteration order.
            seen++;
            Assert.assertEquals("key" + seen + "#attr", rec.key());
            Assert.assertEquals("topic", rec.topic());
            Assert.assertArrayEquals(emptyValue(), (byte[]) rec.value());
        }
        Assert.assertEquals(2L, seen);
    }

    /**
     * Alternates writes and polls on a two-partition topic: each poll that follows a
     * single write must return exactly one record from exactly one partition.
     */
    @Test(timeout = 4000)
    public void testTwoPartitionsTwoWritesAndTwoReads() throws InterruptedException {
        final LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.getAccessor(entity, storageURI, partitionsCfg(2));
        final LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        final KafkaConsumer<?, ?> consumer = accessor.createConsumerFactory().create();

        final StreamElement firstElement = StreamElement.update(
            entity, attr, UUID.randomUUID().toString(), "key1", attr.getName(), System.currentTimeMillis(), emptyValue());
        writer.write(firstElement, (success, error) -> {
            Assert.assertTrue(success);
            Assert.assertNull(error);
        });
        final ConsumerRecords<?, ?> firstPoll = consumer.poll(1000L);
        Assert.assertEquals(1L, firstPoll.count());
        Assert.assertEquals(1L, firstPoll.partitions().size());

        final StreamElement secondElement = StreamElement.update(
            entity, attr, UUID.randomUUID().toString(), "key2", attr.getName(), System.currentTimeMillis(), emptyValue());
        writer.write(secondElement, (success, error) -> {
            Assert.assertTrue(success);
            Assert.assertNull(error);
        });
        final ConsumerRecords<?, ?> secondPoll = consumer.poll(1000L);
        Assert.assertEquals(1L, secondPoll.count());
        Assert.assertEquals(1L, secondPoll.partitions().size());
    }

    /**
     * Writes one element and checks that two consumers created under different names
     * ({@code dumm1}, {@code dumm2}) each poll that same single record.
     */
    @Test(timeout = 4000)
    public void testTwoIdependentConsumers() throws InterruptedException {
        final LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.getAccessor(entity, storageURI, partitionsCfg(1));
        final LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        final CountDownLatch written = new CountDownLatch(1);
        final StreamElement element = StreamElement.update(
            entity, attr, UUID.randomUUID().toString(), "key", attr.getName(), System.currentTimeMillis(), emptyValue());
        writer.write(element, (success, error) -> {
            Assert.assertTrue(success);
            Assert.assertNull(error);
            written.countDown();
        });
        written.await();

        // Both consumers are created up front, before either of them polls.
        final KafkaConsumer<?, ?> firstConsumer = accessor.createConsumerFactory().create("dumm1");
        final KafkaConsumer<?, ?> secondConsumer = accessor.createConsumerFactory().create("dumm2");
        for (KafkaConsumer<?, ?> consumer : Arrays.asList(firstConsumer, secondConsumer)) {
            final ConsumerRecords<?, ?> polled = consumer.poll(1000L);
            Assert.assertEquals(1L, polled.count());
            Assert.assertEquals(1L, polled.partitions().size());

            final TopicPartition onlyPartition = Iterators.getOnlyElement(polled.partitions().iterator());
            Assert.assertEquals(0L, onlyPartition.partition());
            Assert.assertEquals("topic", onlyPartition.topic());

            int seen = 0;
            for (ConsumerRecord<?, ?> rec : polled) {
                Assert.assertEquals("key#attr", rec.key());
                Assert.assertEquals("topic", rec.topic());
                Assert.assertArrayEquals(emptyValue(), (byte[]) rec.value());
                seen++;
            }
            Assert.assertEquals(1L, seen);
        }
    }

    /**
     * Writes two elements into a two-partition topic and checks that a consumer
     * created for an explicit list holding only the partition with id 0 polls just the
     * one record stored there ({@code key1}).
     */
    @Test(timeout = 2000)
    public void testManualPartitionAssignment() throws InterruptedException {
        final LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.getAccessor(entity, storageURI, partitionsCfg(2));
        final LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        final CountDownLatch written = new CountDownLatch(2);
        writer.write(StreamElement.update(entity, attr, UUID.randomUUID().toString(), "key1", attr.getName(), System.currentTimeMillis(), emptyValue()), (success, error) -> {
            Assert.assertTrue(success);
            Assert.assertNull(error);
            written.countDown();
        });
        writer.write(StreamElement.update(entity, attr, UUID.randomUUID().toString(), "key2", attr.getName(), System.currentTimeMillis(), emptyValue()), (success, error) -> {
            Assert.assertTrue(success);
            Assert.assertNull(error);
            written.countDown();
        });
        written.await();

        // A Partition is given as a lambda returning its id.
        final List<Partition> assigned = Arrays.asList(() -> 0);
        final ConsumerRecords<?, ?> polled = accessor.createConsumerFactory().create(assigned).poll(1000L);
        Assert.assertEquals(1L, polled.count());
        Assert.assertEquals(1L, polled.partitions().size());

        final TopicPartition first = polled.partitions().iterator().next();
        Assert.assertEquals(0L, first.partition());
        Assert.assertEquals("topic", first.topic());

        int seen = 0;
        for (ConsumerRecord<?, ?> rec : polled) {
            seen++;
            Assert.assertEquals("key" + seen + "#attr", rec.key());
            Assert.assertEquals("topic", rec.topic());
            Assert.assertArrayEquals(emptyValue(), (byte[]) rec.value());
        }
        Assert.assertEquals(1L, seen);
    }

    /**
     * Checks partition reassignment on a two-partition topic when a second consumer
     * is created under the same name ({@code consumer}).
     *
     * <p>The first consumer initially holds both partitions and reads two records, then
     * commits offset 1 for each partition. Once the second consumer exists, each
     * consumer holds one partition; a further write is then seen only by the second.
     */
    @Test
    public void testTwoPartitionsTwoConsumersRebalance() {
        final LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.getAccessor(entity, storageURI, partitionsCfg(2));
        final LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        final KafkaConsumer<?, ?> first = accessor.createConsumerFactory().create("consumer");

        writer.write(StreamElement.update(entity, attr, UUID.randomUUID().toString(), "key1", attr.getName(), System.currentTimeMillis(), emptyValue()), (success, error) -> {
        });
        Assert.assertEquals(2L, first.assignment().size());
        Assert.assertEquals(1L, first.poll(1000L).count());

        writer.write(StreamElement.update(entity, attr, UUID.randomUUID().toString(), "key2", attr.getName(), System.currentTimeMillis(), emptyValue()), (success, error) -> {
        });
        Assert.assertEquals(1L, first.poll(1000L).count());

        // Plain map instead of double-brace initialization, which would create an
        // anonymous HashMap subclass holding a reference to this test instance.
        final Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
        committed.put(new TopicPartition("topic", 0), new OffsetAndMetadata(1L));
        committed.put(new TopicPartition("topic", 1), new OffsetAndMetadata(1L));
        first.commitSync(committed);

        final KafkaConsumer<?, ?> second = accessor.createConsumerFactory().create("consumer");
        Assert.assertEquals(1L, first.assignment().size());
        Assert.assertEquals(1L, second.assignment().size());

        writer.write(StreamElement.update(entity, attr, UUID.randomUUID().toString(), "key2", attr.getName(), System.currentTimeMillis(), emptyValue()), (success, error) -> {
        });
        Assert.assertTrue(first.poll(1000L).isEmpty());
        Assert.assertEquals(1L, second.poll(1000L).count());
    }

    /**
     * Checks partition reassignment on a single-partition topic when a second consumer
     * is created under the same name ({@code consumer}).
     *
     * <p>The first consumer initially holds the only partition and reads two records,
     * then commits offset 2. Once the second consumer exists, the first holds no
     * partition and the second holds one; a further write is seen only by the second.
     */
    @Test
    public void testSinglePartitionTwoConsumersRebalance() {
        final LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.getAccessor(entity, storageURI, partitionsCfg(1));
        final LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        final KafkaConsumer<?, ?> first = accessor.createConsumerFactory().create("consumer");

        writer.write(StreamElement.update(entity, attr, UUID.randomUUID().toString(), "key1", attr.getName(), System.currentTimeMillis(), emptyValue()), (success, error) -> {
        });
        Assert.assertEquals(1L, first.assignment().size());
        Assert.assertEquals(1L, first.poll(1000L).count());

        writer.write(StreamElement.update(entity, attr, UUID.randomUUID().toString(), "key2", attr.getName(), System.currentTimeMillis(), emptyValue()), (success, error) -> {
        });
        Assert.assertEquals(1L, first.poll(1000L).count());

        // Plain map instead of double-brace initialization, which would create an
        // anonymous HashMap subclass holding a reference to this test instance.
        final Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
        committed.put(new TopicPartition("topic", 0), new OffsetAndMetadata(2L));
        first.commitSync(committed);

        final KafkaConsumer<?, ?> second = accessor.createConsumerFactory().create("consumer");
        Assert.assertEquals(0L, first.assignment().size());
        Assert.assertEquals(1L, second.assignment().size());

        writer.write(StreamElement.update(entity, attr, UUID.randomUUID().toString(), "key2", attr.getName(), System.currentTimeMillis(), emptyValue()), (success, error) -> {
        });
        Assert.assertTrue(first.poll(1000L).isEmpty());
        Assert.assertEquals(1L, second.poll(1000L).count());
    }

    /**
     * Observes only the first of three partitions through the partitioned view (the
     * topic is configured with {@link FirstPartitionPartitioner}), writes one element
     * and checks that the observer receives it from partition 0 and that a single
     * partition was reported via {@code onRepartition}.
     */
    @Test(timeout = 2000)
    public void testPartitionedViewSinglePartition() throws InterruptedException {
        final LocalKafkaCommitLogDescriptor.Accessor accessor =
            kafka.getAccessor(entity, storageURI, partitionsCfg(3, FirstPartitionPartitioner.class));
        final LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        final PartitionedView view = accessor.getPartitionedView(context())
            .orElseThrow(() -> new IllegalStateException("Missing partitioned view"));

        final List<Partition> allPartitions = view.getPartitions();
        Assert.assertEquals(3L, allPartitions.size());
        final List<Partition> observedSubset = Lists.newArrayList(allPartitions.subList(0, 1));

        final List<Partition> repartitioned = new ArrayList<>();
        final SynchronousQueue<StreamElement> handoff = new SynchronousQueue<>();
        final PartitionedLogObserver<Void> observer = new PartitionedLogObserver<Void>() {
            @Override
            public void onRepartition(Collection<Partition> assigned) {
                repartitioned.addAll(assigned);
            }

            @Override
            public boolean onNext(StreamElement ingest, PartitionedLogObserver.ConfirmCallback confirm, Partition partition, PartitionedLogObserver.Consumer<Void> collector) {
                Assert.assertEquals(0L, partition.getId());
                confirm.confirm();
                try {
                    // Blocks until the test thread takes the element.
                    handoff.put(ingest);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                return false;
            }

            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable error) {
                throw new RuntimeException(error);
            }
        };
        new LocalExecutor().submit(view.observePartitions(observedSubset, observer).getFlow());

        final StreamElement update = StreamElement.update(
            entity, attr, UUID.randomUUID().toString(), "key", attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});
        final CountDownLatch written = new CountDownLatch(1);
        writer.write(update, (success, error) -> {
            Assert.assertTrue(success);
            written.countDown();
        });
        written.await();

        final StreamElement received = handoff.take();
        Assert.assertEquals(update.getKey(), received.getKey());
        Assert.assertEquals(update.getAttribute(), received.getAttribute());
        Assert.assertEquals(update.getAttributeDescriptor(), received.getAttributeDescriptor());
        Assert.assertEquals(update.getEntityDescriptor(), received.getEntityDescriptor());
        Assert.assertArrayEquals(update.getValue(), received.getValue());
        Assert.assertEquals(1L, repartitioned.size());
    }

    /**
     * Observes a three-partition topic through the partitioned view under the name
     * {@code test}, writes one element and checks that the observer receives it and
     * that three partitions were reported via {@code onRepartition}.
     */
    @Test(timeout = 2000)
    public void testPartitionedView() throws InterruptedException {
        final LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.getAccessor(entity, storageURI, partitionsCfg(3));
        final LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        final PartitionedView view = accessor.getPartitionedView(context())
            .orElseThrow(() -> new IllegalStateException("Missing partitioned view"));

        final List<Partition> repartitioned = new ArrayList<>();
        final SynchronousQueue<StreamElement> handoff = new SynchronousQueue<>();
        final PartitionedLogObserver<Void> observer = new PartitionedLogObserver<Void>() {
            @Override
            public void onRepartition(Collection<Partition> assigned) {
                repartitioned.addAll(assigned);
            }

            @Override
            public boolean onNext(StreamElement ingest, PartitionedLogObserver.ConfirmCallback confirm, Partition partition, PartitionedLogObserver.Consumer<Void> collector) {
                confirm.confirm();
                try {
                    // Blocks until the test thread takes the element.
                    handoff.put(ingest);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                return false;
            }

            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable error) {
                throw new RuntimeException(error);
            }
        };
        new LocalExecutor().submit(view.observe("test", observer).getFlow());

        final StreamElement update = StreamElement.update(
            entity, attr, UUID.randomUUID().toString(), "key", attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});
        final CountDownLatch written = new CountDownLatch(1);
        writer.write(update, (success, error) -> {
            Assert.assertTrue(success);
            written.countDown();
        });
        written.await();

        final StreamElement received = handoff.take();
        Assert.assertEquals(update.getKey(), received.getKey());
        Assert.assertEquals(update.getAttribute(), received.getAttribute());
        Assert.assertEquals(update.getAttributeDescriptor(), received.getAttributeDescriptor());
        Assert.assertEquals(update.getEntityDescriptor(), received.getEntityDescriptor());
        Assert.assertArrayEquals(update.getValue(), received.getValue());
        Assert.assertEquals(3L, repartitioned.size());
    }

    /**
     * Registers a bulk observer whose {@code onNext} always throws and checks that the
     * failure is delivered to {@code onError} with the original message and that
     * {@code onNext} was invoked exactly once.
     */
    @Test(timeout = 2000)
    public void testBulkObserveWithException() throws InterruptedException {
        final LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.getAccessor(entity, storageURI, partitionsCfg(3));
        final LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        final CommitLogReader reader = accessor.getCommitLogReader(context())
            .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));

        final AtomicInteger onNextCalls = new AtomicInteger();
        final AtomicReference<Throwable> caught = new AtomicReference<>();
        // One count for the confirmed write, one for the observer's onError.
        final CountDownLatch done = new CountDownLatch(2);
        final StreamElement update = StreamElement.update(
            entity, attr, UUID.randomUUID().toString(), "key", attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});

        reader.observeBulk("test", CommitLogReader.Position.NEWEST, new BulkLogObserver() {
            @Override
            public boolean onNext(StreamElement ingest, BulkLogObserver.BulkCommitter committer) {
                onNextCalls.incrementAndGet();
                throw new RuntimeException("FAIL!");
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable error) {
                // Publish the error BEFORE releasing the latch; otherwise the test thread
                // may wake up and read a still-null reference.
                caught.set(error);
                done.countDown();
                throw new RuntimeException(error);
            }

            @Override
            public void close() throws Exception {
            }
        });

        writer.write(update, (success, error) -> {
            Assert.assertTrue(success);
            done.countDown();
        });
        done.await();

        Assert.assertEquals("FAIL!", caught.get().getMessage());
        Assert.assertEquals(1L, onNextCalls.get());
    }

    /**
     * Starts a {@link RetryableBulkObserver} constructed with {@code 3} whose
     * {@code onNextInternal} always throws, keeps writing the same element every
     * 100 ms from a background task, and checks that by the time {@code failure()}
     * is signalled {@code onNextInternal} was invoked three times.
     */
    @Test(timeout = 2000)
    public void testBulkObserveWithExceptionAndRetry() throws InterruptedException {
        final LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.getAccessor(entity, storageURI, partitionsCfg(3));
        final LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        final CommitLogReader reader = accessor.getCommitLogReader(context())
            .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));

        final CountDownLatch failed = new CountDownLatch(1);
        final AtomicInteger attempts = new AtomicInteger();
        final StreamElement update = StreamElement.update(
            entity, attr, UUID.randomUUID().toString(), "key", attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});

        new RetryableBulkObserver(3, "test", reader) {
            @Override
            protected void failure() {
                failed.countDown();
            }

            @Override
            protected boolean onNextInternal(StreamElement ingest, BulkLogObserver.BulkCommitter committer) {
                attempts.incrementAndGet();
                throw new RuntimeException("FAIL!");
            }
        }.start();

        // Keep a handle on the pool so the endless feeder task can be stopped; without
        // this the non-daemon worker thread would keep writing after the test ends.
        final ExecutorService feeder = Executors.newCachedThreadPool();
        try {
            feeder.execute(() -> {
                while (true) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(100L);
                    } catch (InterruptedException ex) {
                        // shutdownNow() interrupts the sleep; restore the flag and stop feeding.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    writer.write(update, (success, error) -> Assert.assertTrue(success));
                }
            });
            failed.await();
            Assert.assertEquals(3L, attempts.get());
        } finally {
            feeder.shutdownNow();
        }
    }

    /**
     * Registers a bulk observer that commits every element, writes one element and
     * checks that {@code onNext} was invoked once with the written value and that
     * {@code onError} never stored an error.
     */
    @Test(timeout = 2000)
    public void testBulkObserveSuccess() throws InterruptedException {
        final LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.getAccessor(entity, storageURI, partitionsCfg(3));
        final LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        final CommitLogReader reader = accessor.getCommitLogReader(context())
            .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));

        final AtomicInteger onNextCalls = new AtomicInteger();
        final AtomicReference<Throwable> caught = new AtomicReference<>();
        final AtomicReference<StreamElement> received = new AtomicReference<>();
        // One count for the confirmed write, one for the observed element.
        final CountDownLatch done = new CountDownLatch(2);
        final StreamElement update = StreamElement.update(
            entity, attr, UUID.randomUUID().toString(), "key", attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});

        reader.observeBulk("test", CommitLogReader.Position.NEWEST, new BulkLogObserver() {
            @Override
            public boolean onNext(StreamElement ingest, BulkLogObserver.BulkCommitter committer) {
                onNextCalls.incrementAndGet();
                received.set(ingest);
                committer.commit();
                done.countDown();
                return true;
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable error) {
                caught.set(error);
                throw new RuntimeException(error);
            }

            @Override
            public void close() throws Exception {
            }
        });

        writer.write(update, (success, error) -> {
            Assert.assertTrue(success);
            done.countDown();
        });
        done.await();

        Assert.assertNull(caught.get());
        Assert.assertEquals(1L, onNextCalls.get());
        Assert.assertArrayEquals(update.getValue(), received.get().getValue());
    }

    /**
     * Builds a descriptor configuration with the given partition count and no
     * explicit partitioner.
     *
     * @param i number of partitions
     * @return configuration map produced by the two-argument overload
     */
    private static Map<String, Object> partitionsCfg(int i) {
        final Map<String, Object> cfg = partitionsCfg(i, null);
        return cfg;
    }

    /**
     * Builds a descriptor configuration map.
     *
     * @param i number of partitions, stored as a string under
     *     {@link LocalKafkaCommitLogDescriptor#CFG_NUM_PARTITIONS}
     * @param cls partitioner class whose name is stored under {@code "partitioner"};
     *     the entry is omitted when {@code null}
     * @return a new mutable map holding the entries above
     */
    private static Map<String, Object> partitionsCfg(int i, @Nullable Class<? extends Partitioner> cls) {
        final Map<String, Object> cfg = new HashMap<>();
        cfg.put(LocalKafkaCommitLogDescriptor.CFG_NUM_PARTITIONS, String.valueOf(i));
        if (cls == null) {
            return cfg;
        }
        cfg.put("partitioner", cls.getName());
        return cfg;
    }

    /**
     * Returns the payload used by tests that do not care about the value.
     *
     * @return a newly allocated zero-length byte array
     */
    private static byte[] emptyValue() {
        final byte[] nothing = new byte[0];
        return nothing;
    }

    /**
     * Creates the {@link Context} handed to readers and views in these tests.
     *
     * @return an anonymous {@code Context} subclass constructed with a factory that
     *     returns a new cached thread pool on each call
     */
    private static Context context() {
        return new Context(() -> Executors.newCachedThreadPool()) {
        };
    }

    /*
     * Lambda deserialization hook: maps a SerializedLambda back to one of the two
     * lambdas of this class, selected by implementation method name, and rejects
     * anything else with IllegalArgumentException.
     *
     * NOTE(review): this looks like decompiler output of the method javac generates
     * for serializable lambdas rather than hand-written code. As written it is not
     * valid Java source (`boolean z = -1`, `switch` on a boolean, lambdas returned as
     * Object without a target type). Presumably it should simply be removed so the
     * compiler can regenerate it -- confirm before relying on it.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no name matches.
        boolean z = -1;
        // First stage: dispatch on the name's hash code, then confirm with equals().
        switch (implMethodName.hashCode()) {
            case -931363980:
                if (implMethodName.equals("lambda$context$4e08ebfe$1")) {
                    z = true;
                    break;
                }
                break;
            case 218366049:
                if (implMethodName.equals("lambda$testManualPartitionAssignment$89f1a89$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: verify the full serialized signature before recreating the lambda.
        switch (z) {
            case false:
                // The Partition lambda from testManualPartitionAssignment (getId returning 0).
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return () -> {
                        return 0;
                    };
                }
                break;
            case true:
                // The executor factory lambda from context().
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/seznam/euphoria/core/client/functional/VoidFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/concurrent/ExecutorService;")) {
                    return () -> {
                        return Executors.newCachedThreadPool();
                    };
                }
                break;
        }
        // Unknown method name or mismatching signature.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
