/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.kafka;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.commitlog.RetryableLogObserver;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.kafka.ElementSerializer;
import cz.o2.proxima.direct.kafka.KafkaStreamElement;
import cz.o2.proxima.direct.kafka.LocalKafkaCommitLogDescriptor;
import cz.o2.proxima.direct.kafka.PartitionWithTopic;
import cz.o2.proxima.direct.kafka.TopicOffset;
import cz.o2.proxima.direct.randomaccess.KeyValue;
import cz.o2.proxima.direct.view.CachedView;
import cz.o2.proxima.functional.BiConsumer;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.functional.Factory;
import cz.o2.proxima.functional.UnaryFunction;
import cz.o2.proxima.internal.shaded.com.google.common.base.MoreObjects;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Iterators;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Lists;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.TopicPartition;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.serialization.Serde;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.serialization.Serdes;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.AttributeDescriptorBase;
import cz.o2.proxima.repository.ConfigRepository;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.KeyPartitioner;
import cz.o2.proxima.storage.commitlog.Partitioner;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.time.WatermarkEstimator;
import cz.o2.proxima.time.WatermarkEstimatorFactory;
import cz.o2.proxima.time.WatermarkIdlePolicy;
import cz.o2.proxima.time.WatermarkIdlePolicyFactory;
import cz.o2.proxima.util.ExceptionUtils;
import cz.o2.proxima.util.Optionals;
import cz.o2.proxima.util.Pair;
import cz.o2.proxima.util.TestUtils;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalKafkaCommitLogDescriptorTest
implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(LocalKafkaCommitLogDescriptorTest.class);
    // Supplies cached thread pools whose threads print uncaught exceptions to stderr.
    private final transient Factory<ExecutorService> serviceFactory = (Factory & Serializable)() -> Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r);
        t.setUncaughtExceptionHandler((thr, exc) -> exc.printStackTrace(System.err));
        return t;
    });
    // Repository built from an empty configuration.
    private final transient Repository repo = ConfigRepository.Builder.ofTest((Config)ConfigFactory.empty()).build();
    // Direct operator configured to take its executors from serviceFactory.
    // NOTE(review): the raw Consumer cast is a decompiler artifact and leaves `op` untyped;
    // confirm against the generic signature of Repository#getOrCreateOperator.
    private final DirectDataOperator direct = (DirectDataOperator)this.repo.getOrCreateOperator(DirectDataOperator.class, new Consumer[]{(Consumer & Serializable)op -> op.withExecutorFactory(this.serviceFactory)});
    // Attributes of the test entity: a plain bytes attribute, a wildcard bytes attribute and a string attribute.
    private final AttributeDescriptorBase<byte[]> attr = AttributeDescriptor.newBuilder((Repository)this.repo).setEntity("entity").setName("attr").setSchemeUri(new URI("bytes:///")).build();
    private final AttributeDescriptorBase<byte[]> attrWildcard = AttributeDescriptor.newBuilder((Repository)this.repo).setEntity("entity").setName("wildcard.*").setSchemeUri(new URI("bytes:///")).build();
    private final AttributeDescriptorBase<String> strAttr = AttributeDescriptor.newBuilder((Repository)this.repo).setEntity("entity").setName("strAttr").setSchemeUri(new URI("string:///")).build();
    private final EntityDescriptor entity = EntityDescriptor.newBuilder().setName("entity").addAttribute(this.attr).addAttribute(this.attrWildcard).addAttribute(this.strAttr).build();
    // Initialized inline: field initializers run before the constructor body, so assigning the
    // topic in the constructor would leave it unset while storageUri below is being built.
    private final String topic = "topic";
    private final URI storageUri = new URI("kafka-test://dummy/" + this.topic);
    // Descriptor under test; re-created before every test in setUp().
    private LocalKafkaCommitLogDescriptor kafka;

    /**
     * Creates the test fixture. All state is set up by the field initializers above.
     *
     * @throws Exception declared because the field initializers construct {@link URI}
     *     instances, whose constructor throws a checked exception
     */
    public LocalKafkaCommitLogDescriptorTest() throws Exception {
    }

    /** Gives every test its own fresh {@link LocalKafkaCommitLogDescriptor}. */
    @Before
    public void setUp() {
        LocalKafkaCommitLogDescriptor fresh = new LocalKafkaCommitLogDescriptor();
        this.kafka = fresh;
    }

    /**
     * Writes one upsert into a single-partition topic and verifies that a consumer created
     * before the write polls exactly that record from partition 0 of topic "topic".
     */
    @Test(timeout=10000L)
    public void testSinglePartitionWriteAndConsumeBySingleConsumerRunAfterWrite() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily(this.entity, this.storageUri, partitionsCfg(1)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        KafkaConsumer consumer = accessor.createConsumerFactory().create();
        CountDownLatch written = new CountDownLatch(1);
        StreamElement element = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), emptyValue());
        writer.write(element, (succ, exc) -> {
            Assert.assertTrue(succ);
            Assert.assertNull(exc);
            written.countDown();
        });
        // Poll only after the write callback has fired.
        written.await();
        ConsumerRecords polled = consumer.poll(Duration.ofMillis(1000L));
        Assert.assertEquals(1L, polled.count());
        Assert.assertEquals(1L, polled.partitions().size());
        TopicPartition partition = (TopicPartition)Iterators.getOnlyElement(polled.partitions().iterator());
        Assert.assertEquals(0L, partition.partition());
        Assert.assertEquals("topic", partition.topic());
        int seen = 0;
        for (Object o : polled) {
            ConsumerRecord record = (ConsumerRecord)o;
            Assert.assertEquals("key#attr", record.key());
            Assert.assertEquals("topic", record.topic());
            Assert.assertArrayEquals(emptyValue(), (byte[])record.value());
            seen++;
        }
        Assert.assertEquals(1L, seen);
    }

    /**
     * Writes "key1" and "key2" into a two-partition topic, waits for both write callbacks and
     * verifies that a single consumer polls both records, one from each partition.
     */
    @Test(timeout=10000L)
    public void testTwoPartitionsTwoWritesAndConsumeBySingleConsumerRunAfterWrite() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily(this.entity, this.storageUri, partitionsCfg(2)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CountDownLatch written = new CountDownLatch(2);
        KafkaConsumer consumer = accessor.createConsumerFactory().create();
        for (String key : new String[]{"key1", "key2"}) {
            StreamElement element = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), key, this.attr.getName(), System.currentTimeMillis(), emptyValue());
            writer.write(element, (succ, exc) -> {
                Assert.assertTrue(succ);
                Assert.assertNull(exc);
                written.countDown();
            });
        }
        written.await();
        ConsumerRecords polled = consumer.poll(Duration.ofMillis(1000L));
        Assert.assertEquals(2L, polled.count());
        Assert.assertEquals(2L, polled.partitions().size());
        // Partitions are expected in the order 0, 1.
        Iterator partitions = polled.partitions().iterator();
        TopicPartition partitionA = (TopicPartition)partitions.next();
        Assert.assertEquals(0L, partitionA.partition());
        TopicPartition partitionB = (TopicPartition)partitions.next();
        Assert.assertEquals(1L, partitionB.partition());
        Assert.assertEquals("topic", partitionA.topic());
        // Records are expected in write order: key1, then key2.
        int seen = 0;
        for (Object o : polled) {
            ConsumerRecord record = (ConsumerRecord)o;
            seen++;
            Assert.assertEquals("key" + seen + "#attr", record.key());
            Assert.assertEquals("topic", record.topic());
            Assert.assertArrayEquals(emptyValue(), (byte[])record.value());
        }
        Assert.assertEquals(2L, seen);
    }

    /** Polling a two-partition topic that nothing was written to must return no records. */
    @Test
    public void testEmptyPoll() {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily(this.entity, this.storageUri, partitionsCfg(2)));
        KafkaConsumer consumer = accessor.createConsumerFactory().create();
        ConsumerRecords polled = consumer.poll(Duration.ofMillis(100L));
        Assert.assertTrue(polled.isEmpty());
    }

    /**
     * Writes an upsert followed by a delete of the same key and verifies that the consumer
     * sees two records: the upsert with a one-byte value and the delete with a null value.
     * The two records are told apart by their timestamps.
     */
    @Test
    public void testWriteNull() {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily(this.entity, this.storageUri, partitionsCfg(2)));
        AttributeWriterBase base = (AttributeWriterBase)Optionals.get((Optional)accessor.getWriter(this.context()));
        OnlineAttributeWriter writer = base.online();
        long upsertStamp = 1234567890000L;
        long deleteStamp = upsertStamp + 1000L;
        KafkaConsumer consumer = accessor.createConsumerFactory().create();
        StreamElement upsert = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), upsertStamp, new byte[]{1});
        writer.write(upsert, (succ, exc) -> {});
        StreamElement delete = StreamElement.delete(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), deleteStamp);
        writer.write(delete, (succ, exc) -> {});
        ConsumerRecords polled = consumer.poll(Duration.ofMillis(100L));
        Assert.assertEquals(2L, polled.count());
        int matched = 0;
        for (Object o : polled) {
            ConsumerRecord record = (ConsumerRecord)o;
            if (record.timestamp() == upsertStamp) {
                Assert.assertEquals(1L, ((byte[])record.value()).length);
                matched++;
            } else if (record.timestamp() == deleteStamp) {
                Assert.assertNull(record.value());
                matched++;
            }
        }
        Assert.assertEquals(2L, matched);
    }

    /**
     * Creates the consumer first, then writes "key1" and "key2" into a two-partition topic
     * without waiting on the write callbacks, and verifies that a single poll returns both
     * records, one from each partition.
     */
    @Test(timeout=10000L)
    public void testTwoPartitionsTwoWritesAndConsumeBySingleConsumerRunBeforeWrite() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily(this.entity, this.storageUri, partitionsCfg(2)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        KafkaConsumer consumer = accessor.createConsumerFactory().create();
        for (String key : new String[]{"key1", "key2"}) {
            StreamElement element = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), key, this.attr.getName(), System.currentTimeMillis(), emptyValue());
            writer.write(element, (succ, exc) -> {
                Assert.assertTrue(succ);
                Assert.assertNull(exc);
            });
        }
        ConsumerRecords polled = consumer.poll(Duration.ofMillis(1000L));
        Assert.assertEquals(2L, polled.count());
        Assert.assertEquals(2L, polled.partitions().size());
        // Partitions are expected in the order 0, 1.
        Iterator partitions = polled.partitions().iterator();
        TopicPartition partitionA = (TopicPartition)partitions.next();
        Assert.assertEquals(0L, partitionA.partition());
        TopicPartition partitionB = (TopicPartition)partitions.next();
        Assert.assertEquals(1L, partitionB.partition());
        Assert.assertEquals("topic", partitionA.topic());
        // Records are expected in write order: key1, then key2.
        int seen = 0;
        for (Object o : polled) {
            ConsumerRecord record = (ConsumerRecord)o;
            seen++;
            Assert.assertEquals("key" + seen + "#attr", record.key());
            Assert.assertEquals("topic", record.topic());
            Assert.assertArrayEquals(emptyValue(), (byte[])record.value());
        }
        Assert.assertEquals(2L, seen);
    }

    /**
     * Alternates write and poll twice on a two-partition topic and verifies that each poll
     * returns exactly the one record written just before it.
     */
    @Test(timeout=10000L)
    public void testTwoPartitionsTwoWritesAndTwoReads() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily(this.entity, this.storageUri, partitionsCfg(2)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        KafkaConsumer consumer = accessor.createConsumerFactory().create();

        StreamElement firstElement = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue());
        writer.write(firstElement, (succ, exc) -> {
            Assert.assertTrue(succ);
            Assert.assertNull(exc);
        });
        ConsumerRecords firstPoll = consumer.poll(Duration.ofMillis(1000L));
        Assert.assertEquals(1L, firstPoll.count());
        Assert.assertEquals(1L, firstPoll.partitions().size());

        StreamElement secondElement = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), emptyValue());
        writer.write(secondElement, (succ, exc) -> {
            Assert.assertTrue(succ);
            Assert.assertNull(exc);
        });
        ConsumerRecords secondPoll = consumer.poll(Duration.ofMillis(1000L));
        Assert.assertEquals(1L, secondPoll.count());
        Assert.assertEquals(1L, secondPoll.partitions().size());
    }

    /**
     * Creates two consumers under different names ("dummy1", "dummy2") and verifies that each
     * of them polls the single written record on its own.
     */
    @Test(timeout=10000L)
    public void testTwoIdependentConsumers() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily(this.entity, this.storageUri, partitionsCfg(1)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        KafkaConsumer consumerOne = accessor.createConsumerFactory().create("dummy1");
        KafkaConsumer consumerTwo = accessor.createConsumerFactory().create("dummy2");
        CountDownLatch written = new CountDownLatch(1);
        StreamElement element = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), emptyValue());
        writer.write(element, (succ, exc) -> {
            Assert.assertTrue(succ);
            Assert.assertNull(exc);
            written.countDown();
        });
        written.await();
        for (KafkaConsumer consumer : new KafkaConsumer[]{consumerOne, consumerTwo}) {
            ConsumerRecords polled = consumer.poll(Duration.ofMillis(1000L));
            Assert.assertEquals(1L, polled.count());
            Assert.assertEquals(1L, polled.partitions().size());
            TopicPartition partition = (TopicPartition)Iterators.getOnlyElement(polled.partitions().iterator());
            Assert.assertEquals(0L, partition.partition());
            Assert.assertEquals("topic", partition.topic());
            int seen = 0;
            for (Object o : polled) {
                ConsumerRecord record = (ConsumerRecord)o;
                Assert.assertEquals("key#attr", record.key());
                Assert.assertEquals("topic", record.topic());
                Assert.assertArrayEquals(emptyValue(), (byte[])record.value());
                seen++;
            }
            Assert.assertEquals(1L, seen);
        }
    }

    /**
     * Creates a consumer assigned only to partition 0 of a two-partition topic, writes "key1"
     * and "key2", and verifies that the consumer polls just the one record of partition 0.
     */
    @Test(timeout=10000L)
    public void testManualPartitionAssignment() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily(this.entity, this.storageUri, partitionsCfg(2)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        KafkaConsumer consumer = accessor.createConsumerFactory().create(Collections.singletonList(this.getPartition(0)));
        CountDownLatch written = new CountDownLatch(2);
        for (String key : new String[]{"key1", "key2"}) {
            StreamElement element = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), key, this.attr.getName(), System.currentTimeMillis(), emptyValue());
            writer.write(element, (succ, exc) -> {
                Assert.assertTrue(succ);
                Assert.assertNull(exc);
                written.countDown();
            });
        }
        written.await();
        ConsumerRecords polled = consumer.poll(Duration.ofMillis(1000L));
        Assert.assertEquals(1L, polled.count());
        Assert.assertEquals(1L, polled.partitions().size());
        Iterator partitions = polled.partitions().iterator();
        TopicPartition assigned = (TopicPartition)partitions.next();
        Assert.assertEquals(0L, assigned.partition());
        Assert.assertEquals("topic", assigned.topic());
        int seen = 0;
        for (Object o : polled) {
            ConsumerRecord record = (ConsumerRecord)o;
            seen++;
            Assert.assertEquals("key" + seen + "#attr", record.key());
            Assert.assertEquals("topic", record.topic());
            Assert.assertArrayEquals(emptyValue(), (byte[])record.value());
        }
        Assert.assertEquals(1L, seen);
    }

    /**
     * Writes two elements, waits for both callbacks, and only then creates a consumer on
     * partition 0; the poll is expected to return nothing.
     */
    @Test(timeout=10000L)
    public void testPollAfterWrite() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily(this.entity, this.storageUri, partitionsCfg(1)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CountDownLatch written = new CountDownLatch(2);
        // Both writes use the same key, "key1".
        for (int i = 0; i < 2; i++) {
            StreamElement element = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue());
            writer.write(element, (succ, exc) -> {
                Assert.assertTrue(succ);
                Assert.assertNull(exc);
                written.countDown();
            });
        }
        written.await();
        KafkaConsumer consumer = accessor.createConsumerFactory().create(Collections.singletonList(this.getPartition(0)));
        ConsumerRecords polled = consumer.poll(Duration.ofMillis(100L));
        Assert.assertTrue(polled.isEmpty());
    }

    /**
     * Writes two elements, then creates a consumer on partition 0, seeks it to offset 1 and
     * verifies that the poll returns exactly one record.
     */
    @Test(timeout=10000L)
    public void testPollWithSeek() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily(this.entity, this.storageUri, partitionsCfg(1)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CountDownLatch written = new CountDownLatch(2);
        // Both writes use the same key, "key1".
        for (int i = 0; i < 2; i++) {
            StreamElement element = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue());
            writer.write(element, (succ, exc) -> {
                Assert.assertTrue(succ);
                Assert.assertNull(exc);
                written.countDown();
            });
        }
        written.await();
        KafkaConsumer consumer = accessor.createConsumerFactory().create(Collections.singletonList(this.getPartition(0)));
        TopicPartition target = new TopicPartition("topic", 0);
        consumer.seek(target, 1L);
        ConsumerRecords polled = consumer.poll(Duration.ofMillis(100L));
        Assert.assertEquals(1L, polled.count());
    }

    /**
     * Verifies rebalancing of a two-partition topic between two consumers created under the
     * same name. The first consumer initially holds both partitions and reads two records one
     * by one, then commits offset 1 on both partitions. After a second consumer is created
     * under the same name and a third record is written, the second consumer polls that
     * record, the first one polls nothing, and each consumer is assigned one partition.
     */
    @Test
    public void testTwoPartitionsTwoConsumersRebalance() {
        // Both consumers are created under this one name.
        String name = "consumer";
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily((EntityDescriptor)this.entity, (URI)this.storageUri, LocalKafkaCommitLogDescriptorTest.partitionsCfg(2)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        KafkaConsumer c1 = accessor.createConsumerFactory().create(name);
        writer.write(StreamElement.upsert((EntityDescriptor)this.entity, this.attr, (String)UUID.randomUUID().toString(), (String)"key1", (String)this.attr.getName(), (long)System.currentTimeMillis(), (byte[])LocalKafkaCommitLogDescriptorTest.emptyValue()), (succ, exc) -> {});
        ConsumerRecords poll = c1.poll(Duration.ofMillis(1000L));
        Assert.assertEquals((long)2L, (long)c1.assignment().size());
        Assert.assertEquals((long)1L, (long)poll.count());
        writer.write(StreamElement.upsert((EntityDescriptor)this.entity, this.attr, (String)UUID.randomUUID().toString(), (String)"key2", (String)this.attr.getName(), (long)System.currentTimeMillis(), (byte[])LocalKafkaCommitLogDescriptorTest.emptyValue()), (succ, exc) -> {});
        poll = c1.poll(Duration.ofMillis(1000L));
        Assert.assertEquals((long)1L, (long)poll.count());
        // Plain map instead of double-brace initialization, which would create an anonymous
        // HashMap subclass holding a reference to this test instance.
        Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
        committed.put(new TopicPartition("topic", 0), new OffsetAndMetadata(1L));
        committed.put(new TopicPartition("topic", 1), new OffsetAndMetadata(1L));
        c1.commitSync((Map)committed);
        KafkaConsumer c2 = accessor.createConsumerFactory().create(name);
        writer.write(StreamElement.upsert((EntityDescriptor)this.entity, this.attr, (String)UUID.randomUUID().toString(), (String)"key2", (String)this.attr.getName(), (long)System.currentTimeMillis(), (byte[])LocalKafkaCommitLogDescriptorTest.emptyValue()), (succ, exc) -> {});
        poll = c2.poll(Duration.ofMillis(1000L));
        Assert.assertEquals((long)1L, (long)poll.count());
        poll = c1.poll(Duration.ofMillis(1000L));
        Assert.assertTrue((boolean)poll.isEmpty());
        Assert.assertEquals((long)1L, (long)c1.assignment().size());
        Assert.assertEquals((long)1L, (long)c2.assignment().size());
    }

    /**
     * Verifies rebalancing of a single-partition topic between two consumers created under
     * the same name. The first consumer reads two records and commits offset 2. After a
     * second consumer is created under the same name and a third record is written, the
     * second consumer polls that record, while the first one polls nothing and ends up with
     * no assigned partition.
     */
    @Test
    public void testSinglePartitionTwoConsumersRebalance() {
        // Both consumers are created under this one name.
        String name = "consumer";
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily((EntityDescriptor)this.entity, (URI)this.storageUri, LocalKafkaCommitLogDescriptorTest.partitionsCfg(1)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        KafkaConsumer c1 = accessor.createConsumerFactory().create(name);
        writer.write(StreamElement.upsert((EntityDescriptor)this.entity, this.attr, (String)UUID.randomUUID().toString(), (String)"key1", (String)this.attr.getName(), (long)System.currentTimeMillis(), (byte[])LocalKafkaCommitLogDescriptorTest.emptyValue()), (succ, exc) -> {});
        ConsumerRecords poll = c1.poll(Duration.ofMillis(1000L));
        Assert.assertEquals((long)1L, (long)c1.assignment().size());
        Assert.assertEquals((long)1L, (long)poll.count());
        writer.write(StreamElement.upsert((EntityDescriptor)this.entity, this.attr, (String)UUID.randomUUID().toString(), (String)"key2", (String)this.attr.getName(), (long)System.currentTimeMillis(), (byte[])LocalKafkaCommitLogDescriptorTest.emptyValue()), (succ, exc) -> {});
        poll = c1.poll(Duration.ofMillis(1000L));
        Assert.assertEquals((long)1L, (long)poll.count());
        // Plain map instead of double-brace initialization, which would create an anonymous
        // HashMap subclass holding a reference to this test instance.
        Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
        committed.put(new TopicPartition("topic", 0), new OffsetAndMetadata(2L));
        c1.commitSync((Map)committed);
        KafkaConsumer c2 = accessor.createConsumerFactory().create(name);
        writer.write(StreamElement.upsert((EntityDescriptor)this.entity, this.attr, (String)UUID.randomUUID().toString(), (String)"key2", (String)this.attr.getName(), (long)System.currentTimeMillis(), (byte[])LocalKafkaCommitLogDescriptorTest.emptyValue()), (succ, exc) -> {});
        poll = c2.poll(Duration.ofMillis(1000L));
        Assert.assertEquals((long)1L, (long)poll.count());
        poll = c1.poll(Duration.ofMillis(1000L));
        Assert.assertTrue((boolean)poll.isEmpty());
        Assert.assertEquals((long)0L, (long)c1.assignment().size());
        Assert.assertEquals((long)1L, (long)c2.assignment().size());
    }

    /**
     * Observes a three-partition commit log from {@code Position.NEWEST}, writes one element
     * and confirms it in the observer. Afterwards the handle must report three committed
     * offsets, each at most 1, summing to exactly 1.
     */
    @Test(timeout=10000L)
    public void testObserveSuccess() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily(this.entity, this.storageUri, partitionsCfg(3)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context()).orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        // One count for the observed element, one for the write callback.
        final CountDownLatch done = new CountDownLatch(2);
        StreamElement update = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});
        LogObserver observer = new LogObserver(){

            public boolean onNext(StreamElement ingest, LogObserver.OnNextContext context) {
                context.confirm();
                done.countDown();
                return true;
            }

            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        };
        ObserveHandle handle = reader.observe("test", Position.NEWEST, observer);
        writer.write(update, (succ, e) -> {
            Assert.assertTrue(succ);
            done.countDown();
        });
        done.await();
        Assert.assertEquals(3L, handle.getCommittedOffsets().size());
        long total = 0L;
        for (Object o : handle.getCommittedOffsets()) {
            TopicOffset offset = (TopicOffset)o;
            Assert.assertTrue(offset.getOffset() <= 1L);
            total += offset.getOffset();
        }
        Assert.assertEquals(1L, total);
    }

    @Test(timeout=10000L)
    public void testObserveMovesWatermark() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily((EntityDescriptor)this.entity, (URI)this.storageUri, LocalKafkaCommitLogDescriptorTest.partitionsCfg(3)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context()).orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        long now = System.currentTimeMillis();
        UnaryFunction & Serializable update = (UnaryFunction & Serializable)pos -> StreamElement.upsert((EntityDescriptor)this.entity, this.attr, (String)UUID.randomUUID().toString(), (String)("key" + pos), (String)this.attr.getName(), (long)(now + (long)pos.intValue()), (byte[])new byte[]{1, 2});
        final AtomicLong watermark = new AtomicLong();
        final CountDownLatch latch = new CountDownLatch(100);
        reader.observe("test", Position.NEWEST, new LogObserver(){

            public boolean onNext(StreamElement ingest, LogObserver.OnNextContext context) {
                watermark.set(context.getWatermark());
                latch.countDown();
                return true;
            }

            public void onCompleted() {
                Assert.fail((String)"This should not be called");
            }

            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        }).waitUntilReady();
        for (int i = 0; i < 100; ++i) {
            writer.write((StreamElement)update.apply((Object)i), (succ, e) -> {});
        }
        latch.await();
        Assert.assertTrue((watermark.get() > 0L ? 1 : 0) != 0);
        Assert.assertTrue((watermark.get() < now * 10L ? 1 : 0) != 0);
    }

    /**
     * Writes the same element twice with a two second pause in between and checks the watermark
     * seen by the observer afterwards.
     *
     * <p>The family is configured with {@code poll.allowed-empty-before-watermark-move} set to
     * {@code "1000"}. After both elements were delivered, the last watermark taken from the
     * {@code OnNextContext} has to be positive and lower than {@code now * 10}.
     *
     * @throws InterruptedException if interrupted while sleeping or waiting for the observer
     */
    @Test(timeout=10000L)
    public void testEmptyPollMovesWatermark() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
            this.direct,
            TestUtils.createTestFamily(
                (EntityDescriptor)this.entity,
                (URI)this.storageUri,
                LocalKafkaCommitLogDescriptorTest.and(
                    LocalKafkaCommitLogDescriptorTest.partitionsCfg(3),
                    LocalKafkaCommitLogDescriptorTest.cfg(Pair.of((Object)"poll.allowed-empty-before-watermark-move", (Object)"1000")))));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context())
            .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        long now = System.currentTimeMillis();
        StreamElement update = StreamElement.upsert(
            (EntityDescriptor)this.entity,
            this.attr,
            UUID.randomUUID().toString(),
            "key",
            this.attr.getName(),
            now + 2000L,
            new byte[]{1, 2});
        final AtomicLong lastWatermark = new AtomicLong();
        final CountDownLatch delivered = new CountDownLatch(2);
        reader.observe("test", Position.NEWEST, new LogObserver(){

            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext ctx) {
                lastWatermark.set(ctx.getWatermark());
                delivered.countDown();
                return true;
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        }).waitUntilReady();
        // first write, two seconds without any data, then the second write
        writer.write(update, (succ, e) -> {});
        TimeUnit.SECONDS.sleep(2L);
        writer.write(update, (succ, e) -> {});
        delivered.await();
        Assert.assertTrue(lastWatermark.get() > 0L);
        Assert.assertTrue(lastWatermark.get() < now * 10L);
    }

    /**
     * Observes a commit log into which nothing is written and checks the watermark reported
     * through {@code onIdle}.
     *
     * <p>The test waits for 30 {@code onIdle} invocations; the last watermark recorded there has
     * to be positive and lower than {@code now * 10}.
     *
     * @throws InterruptedException if interrupted while sleeping or waiting for the observer
     */
    @Test(timeout=10000L)
    public void testEmptyPollWithNoDataMovesWatermark() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
            this.direct,
            TestUtils.createTestFamily(
                (EntityDescriptor)this.entity,
                (URI)this.storageUri,
                LocalKafkaCommitLogDescriptorTest.and(
                    LocalKafkaCommitLogDescriptorTest.partitionsCfg(3),
                    LocalKafkaCommitLogDescriptorTest.cfg(Pair.of((Object)"poll.allowed-empty-before-watermark-move", (Object)"1000")))));
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context())
            .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        long now = System.currentTimeMillis();
        final AtomicLong idleWatermark = new AtomicLong();
        final CountDownLatch idleCalls = new CountDownLatch(30);
        reader.observe("test", Position.NEWEST, new LogObserver(){

            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext ctx) {
                return true;
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }

            @Override
            public void onIdle(LogObserver.OnIdleContext ctx) {
                // only idle callbacks carry a watermark here, no data is ever written
                idleWatermark.set(ctx.getWatermark());
                idleCalls.countDown();
            }
        }).waitUntilReady();
        TimeUnit.SECONDS.sleep(2L);
        idleCalls.await();
        Assert.assertTrue(idleWatermark.get() > 0L);
        Assert.assertTrue(idleWatermark.get() < now * 10L);
    }

    /**
     * Observes an empty commit log, sleeps for two seconds and then waits until {@code onIdle}
     * has been invoked 30 times.
     *
     * <p>The last watermark recorded in {@code onIdle} must be positive and lower than
     * {@code now * 10}.
     *
     * @throws InterruptedException if interrupted while sleeping or waiting for the observer
     */
    @Test(timeout=10000L)
    public void testSlowPollMovesWatermarkSlowly() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
            this.direct,
            TestUtils.createTestFamily(
                (EntityDescriptor)this.entity,
                (URI)this.storageUri,
                LocalKafkaCommitLogDescriptorTest.and(
                    LocalKafkaCommitLogDescriptorTest.partitionsCfg(3),
                    LocalKafkaCommitLogDescriptorTest.cfg(Pair.of((Object)"poll.allowed-empty-before-watermark-move", (Object)"1000")))));
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context())
            .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        long startStamp = System.currentTimeMillis();
        final AtomicLong reported = new AtomicLong();
        final CountDownLatch pending = new CountDownLatch(30);
        LogObserver idleObserver = new LogObserver(){

            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext ctx) {
                return true;
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }

            @Override
            public void onIdle(LogObserver.OnIdleContext ctx) {
                reported.set(ctx.getWatermark());
                pending.countDown();
            }
        };
        reader.observe("test", Position.NEWEST, idleObserver).waitUntilReady();
        // no data is written during these two seconds
        TimeUnit.SECONDS.sleep(2L);
        pending.await();
        Assert.assertTrue(reported.get() > 0L);
        Assert.assertTrue(reported.get() < startStamp * 10L);
    }

    /**
     * Runs the N-consumer watermark scenario with four observers on a three-partition family,
     * first without any data written and then again after a single element has been written.
     *
     * @throws InterruptedException if interrupted while waiting for the observers
     */
    @Test(timeout=100000L)
    public void testPollFromMoreConsumersThanPartitionsMovesWatermark() throws InterruptedException {
        final int observerCount = 4;
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
            this.direct,
            TestUtils.createTestFamily(
                (EntityDescriptor)this.entity,
                (URI)this.storageUri,
                LocalKafkaCommitLogDescriptorTest.and(
                    LocalKafkaCommitLogDescriptorTest.partitionsCfg(3),
                    LocalKafkaCommitLogDescriptorTest.cfg(Pair.of((Object)"poll.allowed-empty-before-watermark-move", (Object)"1000"), Pair.of((Object)"assignment-timeout-ms", (Object)"1")))));
        // before any write the watermark is not required to have moved
        this.testPollFromNConsumersMovesWatermark(accessor, observerCount, false);
        this.writeData(accessor);
        // with data present every observer has to report a positive watermark
        this.testPollFromNConsumersMovesWatermark(accessor, observerCount, true);
    }

    /**
     * Runs the N-consumer watermark scenario with forty observers on a three-partition family,
     * first without data and then after one element has been written, and finally clears the
     * accessor.
     *
     * @throws InterruptedException if interrupted while waiting for the observers
     */
    @Test(timeout=100000L)
    public void testPollFromManyMoreConsumersThanPartitionsMovesWatermark() throws InterruptedException {
        final int observerCount = 40;
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
            this.direct,
            TestUtils.createTestFamily(
                (EntityDescriptor)this.entity,
                (URI)this.storageUri,
                LocalKafkaCommitLogDescriptorTest.and(
                    LocalKafkaCommitLogDescriptorTest.partitionsCfg(3),
                    LocalKafkaCommitLogDescriptorTest.cfg(Pair.of((Object)"poll.allowed-empty-before-watermark-move", (Object)"1000"), Pair.of((Object)"assignment-timeout-ms", (Object)"1")))));
        // before any write the watermark is not required to have moved
        this.testPollFromNConsumersMovesWatermark(accessor, observerCount, false);
        this.writeData(accessor);
        // with data present every observer has to report a positive watermark
        this.testPollFromNConsumersMovesWatermark(accessor, observerCount, true);
        accessor.clear();
    }

    /**
     * Writes one upsert with the fixed stamp {@code 1500000000000L} and a single-byte value
     * through a new writer of the given accessor; the write callback is ignored.
     *
     * @param accessor accessor whose writer receives the element
     */
    void writeData(LocalKafkaCommitLogDescriptor.Accessor accessor) {
        final long stamp = 1500000000000L;
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter target = accessor.newWriter();
        StreamElement element = StreamElement.upsert(
            (EntityDescriptor)this.entity,
            this.attr,
            "key",
            UUID.randomUUID().toString(),
            this.attr.getName(),
            stamp,
            new byte[]{1});
        target.write(element, (succ, exc) -> {});
    }

    /**
     * Runs the N-consumer watermark scenario expecting every observer to report a moved
     * (positive) watermark.
     *
     * @param accessor accessor providing the commit log reader
     * @param numObservers number of observers to register
     * @throws InterruptedException if interrupted while waiting for the observers
     */
    void testPollFromNConsumersMovesWatermark(LocalKafkaCommitLogDescriptor.Accessor accessor, int numObservers) throws InterruptedException {
        final boolean expectMoved = true;
        this.testPollFromNConsumersMovesWatermark(accessor, numObservers, expectMoved);
    }

    /**
     * Runs the N-consumer watermark scenario without requiring the watermark to have moved.
     *
     * @param accessor accessor providing the commit log reader
     * @param numObservers number of observers to register
     * @throws InterruptedException if interrupted while waiting for the observers
     */
    void testPollFromNConsumersMovesWatermarkWithNoWrite(LocalKafkaCommitLogDescriptor.Accessor accessor, int numObservers) throws InterruptedException {
        final boolean expectMoved = false;
        this.testPollFromNConsumersMovesWatermark(accessor, numObservers, expectMoved);
    }

    /**
     * Registers {@code numObservers} observers under the name {@code "test-" + expectMoved} and
     * verifies the watermarks they report through {@code onIdle}.
     *
     * <p>Watermarks are only recorded once all observers have been registered. Each observer
     * counts down the latch once: immediately on its first recorded idle call when
     * {@code expectMoved} is false, otherwise as soon as its maximal watermark is positive.
     * The minimum over all observers must be positive (when {@code expectMoved}) and always
     * lower than {@code now * 10}.
     *
     * @param accessor accessor providing the commit log reader
     * @param numObservers number of observers to register
     * @param expectMoved whether every observer has to reach a positive watermark
     * @throws InterruptedException if interrupted while waiting for the observers
     */
    void testPollFromNConsumersMovesWatermark(LocalKafkaCommitLogDescriptor.Accessor accessor, final int numObservers, final boolean expectMoved) throws InterruptedException {
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context())
            .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        long now = System.currentTimeMillis();
        final CountDownLatch latch = new CountDownLatch(numObservers);
        // typed collections: no casts needed and the minimum below is a Long, not an Object
        final Set<LogObserver> movedConsumers = Collections.synchronizedSet(new HashSet<>());
        final ConcurrentHashMap<LogObserver, Long> observerWatermarks = new ConcurrentHashMap<>();
        final AtomicInteger readyObservers = new AtomicInteger();
        for (int i = 0; i < numObservers; ++i) {
            reader.observe("test-" + expectMoved, Position.OLDEST, new LogObserver(){

                @Override
                public boolean onNext(StreamElement ingest, LogObserver.OnNextContext context) {
                    log.debug("Received element {} on watermark {}", (Object)ingest, (Object)context.getWatermark());
                    return true;
                }

                @Override
                public boolean onError(Throwable error) {
                    throw new RuntimeException(error);
                }

                @Override
                public void onIdle(LogObserver.OnIdleContext context) {
                    if (readyObservers.get() != numObservers) {
                        // ignore idle calls until every observer has been registered
                        return;
                    }
                    // keep the maximal watermark this observer has reported so far
                    long maxSeen = observerWatermarks.merge(this, context.getWatermark(), Long::max);
                    if ((!expectMoved || maxSeen > 0L) && movedConsumers.add(this)) {
                        latch.countDown();
                    }
                }
            }).waitUntilReady();
            readyObservers.incrementAndGet();
        }
        Assert.assertTrue(latch.await(10L, TimeUnit.SECONDS));
        Assert.assertEquals((long)numObservers, (long)observerWatermarks.size());
        long watermark = observerWatermarks.values().stream().min(Long::compare).orElse(Long.MIN_VALUE);
        Assert.assertTrue(!expectMoved || watermark > 0L);
        Assert.assertTrue("Watermark should not be too far, got " + watermark + " calculated from " + observerWatermarks, watermark < now * 10L);
    }

    /**
     * Writes 100 elements, bulk-observes them from {@code Position.OLDEST} with the boolean
     * argument of {@code observeBulk} set to {@code true}, confirms only on the 100th element
     * and checks the committed offsets.
     *
     * <p>Both the offsets exposed by the handle and the offsets committed in the reader's
     * consumer have to sum up to 100.
     *
     * @throws InterruptedException if interrupted while waiting for completion
     */
    @Test(timeout=10000L)
    public void testObserveBulkCommitsCorrectly() throws InterruptedException {
        final int total = 100;
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
            this.direct,
            TestUtils.createTestFamily(
                (EntityDescriptor)this.entity,
                (URI)this.storageUri,
                LocalKafkaCommitLogDescriptorTest.cfg(Pair.of((Object)"assignment-timeout-ms", (Object)1L), Pair.of((Object)"local-kafka-num-partitions", (Object)3))));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context())
            .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        long now = System.currentTimeMillis();
        int written = 0;
        while (written < total) {
            StreamElement update = StreamElement.upsert(
                (EntityDescriptor)this.entity,
                this.attr,
                UUID.randomUUID().toString(),
                "key-" + written,
                this.attr.getName(),
                now + 2000L,
                new byte[]{1, 2});
            writer.write(update, (succ, e) -> {});
            ++written;
        }
        final CountDownLatch completed = new CountDownLatch(1);
        ObserveHandle handle = reader.observeBulk("test", Position.OLDEST, true, new LogObserver(){
            int processed = 0;

            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext ctx) {
                this.processed += 1;
                // a single confirmation, issued on the last element
                if (this.processed == 100) {
                    ctx.confirm();
                }
                return true;
            }

            @Override
            public void onCompleted() {
                completed.countDown();
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        });
        completed.await();
        long offsetSum = handle.getCommittedOffsets().stream().mapToLong(o -> ((TopicOffset)o).getOffset()).sum();
        Assert.assertEquals(100L, offsetSum);
        KafkaConsumer<Object, Object> consumer = ((LocalKafkaCommitLogDescriptor.LocalKafkaLogReader)reader).getConsumer();
        String topic = accessor.getTopic();
        Set<TopicPartition> committedPartitions = handle.getCommittedOffsets().stream()
            .map(o -> new TopicPartition(topic, o.getPartition().getId()))
            .collect(Collectors.toSet());
        long consumerSum = consumer.committed(committedPartitions).values().stream()
            .mapToLong(OffsetAndMetadata::offset)
            .sum();
        Assert.assertEquals(100L, consumerSum);
    }

    /**
     * Feeds five writes to a shared observer that confirms the first element, throws on the
     * third and holds back the contexts of the others until five elements have been consumed.
     *
     * <p>Once the latch has been counted down five times, the committed offsets recorded in the
     * accessor have to sum up to 3.
     *
     * @throws InterruptedException if interrupted while waiting for the observer
     */
    @Test(timeout=100000L)
    public void testOnlineObserveWithRebalanceResetsOffsetCommitter() throws InterruptedException {
        final int numWrites = 5;
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
            this.direct,
            TestUtils.createTestFamily(
                (EntityDescriptor)this.entity,
                (URI)this.storageUri,
                LocalKafkaCommitLogDescriptorTest.cfg(Pair.of((Object)"local-kafka-num-partitions", (Object)3), Pair.of((Object)"kafka.max.poll.records", (Object)1))));
        final CountDownLatch latch = new CountDownLatch(numWrites);
        final AtomicInteger consumed = new AtomicInteger();
        // typed list, so that the contexts can be confirmed through a method reference
        final List<LogObserver.OnNextContext> unconfirmed = Collections.synchronizedList(new ArrayList<>());
        LogObserver observer = new LogObserver(){

            @Override
            public boolean onNext(StreamElement ingest, LogObserver.OnNextContext context) {
                switch (consumed.getAndIncrement()) {
                    case 0: {
                        // the very first element is confirmed right away
                        context.confirm();
                        break;
                    }
                    case 2: {
                        // the third element fails before the latch is counted down
                        throw new RuntimeException("Failing first consumer!");
                    }
                    default: {
                        unconfirmed.add(context);
                    }
                }
                if (consumed.get() == numWrites) {
                    unconfirmed.forEach(LogObserver.OffsetCommitter::confirm);
                }
                latch.countDown();
                return true;
            }

            @Override
            public void onCompleted() {
            }

            @Override
            public boolean onError(Throwable error) {
                return true;
            }
        };
        this.testOnlineObserveWithRebalanceResetsOffsetCommitterWithObserver(observer, accessor, numWrites);
        latch.await();
        Assert.assertEquals("Invalid committed offests: " + accessor.committedOffsets, 3L, (long)accessor.committedOffsets.values().stream().mapToInt(e -> e.get()).sum());
    }

    /**
     * Registers the given observer twice under the name {@code "test"} at
     * {@code Position.NEWEST} and then writes the same element {@code numWrites} times,
     * asserting that every write succeeds.
     *
     * @param observer observer to register (twice)
     * @param accessor accessor providing both the writer and the commit log reader
     * @param numWrites number of writes to issue
     * @throws InterruptedException declared for callers; nothing in this body blocks
     */
    private void testOnlineObserveWithRebalanceResetsOffsetCommitterWithObserver(LogObserver observer, LocalKafkaCommitLogDescriptor.Accessor accessor, int numWrites) throws InterruptedException {
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context())
            .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        StreamElement update = StreamElement.upsert(
            (EntityDescriptor)this.entity,
            this.attr,
            UUID.randomUUID().toString(),
            "key",
            this.attr.getName(),
            System.currentTimeMillis(),
            new byte[]{1, 2});
        // Two registrations under the same name; the returned handles are not needed.
        // NOTE(review): presumably the second registration is what provokes the rebalance
        // named in the test - confirm against the reader implementation.
        reader.observe("test", Position.NEWEST, observer);
        reader.observe("test", Position.NEWEST, observer);
        for (int i = 0; i < numWrites; ++i) {
            writer.write(update, (succ, e) -> Assert.assertTrue((boolean)succ));
        }
    }

    /**
     * Observes with an observer whose {@code onNext} always throws and checks what is reported.
     *
     * <p>After one successful write the error passed to {@code onError} must carry the message
     * {@code "FAIL!"}, {@code onNext} must have been called exactly once, the handle must expose
     * three committed offsets and the only non-negative current offset must be {@code 0}.
     *
     * @throws InterruptedException if interrupted while waiting for the callbacks
     */
    @Test(timeout=10000L)
    public void testObserveWithException() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
            this.direct,
            TestUtils.createTestFamily(
                (EntityDescriptor)this.entity,
                (URI)this.storageUri,
                LocalKafkaCommitLogDescriptorTest.partitionsCfg(3)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context())
            .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final AtomicInteger invocations = new AtomicInteger();
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        // one count for the write callback, one for the error callback
        final CountDownLatch done = new CountDownLatch(2);
        StreamElement update = StreamElement.upsert(
            (EntityDescriptor)this.entity,
            this.attr,
            UUID.randomUUID().toString(),
            "key",
            this.attr.getName(),
            System.currentTimeMillis(),
            new byte[]{1, 2});
        ObserveHandle handle = reader.observe("test", Position.NEWEST, new LogObserver(){

            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext ctx) {
                invocations.incrementAndGet();
                throw new RuntimeException("FAIL!");
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable error) {
                failure.set(error);
                done.countDown();
                throw new RuntimeException(error);
            }
        });
        writer.write(update, (succ, e) -> {
            Assert.assertTrue((boolean)succ);
            done.countDown();
        });
        done.await();
        Assert.assertEquals("FAIL!", failure.get().getMessage());
        Assert.assertEquals(1L, (long)invocations.get());
        Assert.assertEquals(3L, (long)handle.getCommittedOffsets().size());
        List<Long> startedOffsets = handle.getCurrentOffsets().stream()
            .map(o -> ((TopicOffset)o).getOffset())
            .filter(o -> o >= 0L)
            .collect(Collectors.toList());
        Assert.assertEquals(Collections.singletonList(0L), startedOffsets);
    }

    /**
     * Bulk-observes with an observer whose {@code onNext} always throws and checks what is
     * reported.
     *
     * <p>After one successful write the error passed to {@code onError} must carry the message
     * {@code "FAIL!"}, {@code onNext} must have been called exactly once, the handle must expose
     * three committed offsets and the only non-negative current offset must be {@code 0}.
     *
     * @throws InterruptedException if interrupted while waiting for the callbacks
     */
    @Test(timeout=10000L)
    public void testBulkObserveWithException() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
            this.direct,
            TestUtils.createTestFamily(
                (EntityDescriptor)this.entity,
                (URI)this.storageUri,
                LocalKafkaCommitLogDescriptorTest.partitionsCfg(3)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context())
            .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final AtomicInteger invocations = new AtomicInteger();
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        // one count for the write callback, one for the error callback
        final CountDownLatch done = new CountDownLatch(2);
        StreamElement update = StreamElement.upsert(
            (EntityDescriptor)this.entity,
            this.attr,
            UUID.randomUUID().toString(),
            "key",
            this.attr.getName(),
            System.currentTimeMillis(),
            new byte[]{1, 2});
        LogObserver failingObserver = new LogObserver(){

            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext ctx) {
                invocations.incrementAndGet();
                throw new RuntimeException("FAIL!");
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable error) {
                failure.set(error);
                done.countDown();
                throw new RuntimeException(error);
            }
        };
        ObserveHandle handle = reader.observeBulk("test", Position.NEWEST, failingObserver);
        writer.write(update, (succ, e) -> {
            Assert.assertTrue((boolean)succ);
            done.countDown();
        });
        done.await();
        Assert.assertEquals("FAIL!", failure.get().getMessage());
        Assert.assertEquals(1L, (long)invocations.get());
        Assert.assertEquals(3L, (long)handle.getCommittedOffsets().size());
        List<Long> startedOffsets = handle.getCurrentOffsets().stream()
            .map(o -> ((TopicOffset)o).getOffset())
            .filter(o -> o >= 0L)
            .collect(Collectors.toList());
        Assert.assertEquals(Collections.singletonList(0L), startedOffsets);
    }

    /**
     * Wraps an always-failing observer into a {@code RetryableLogObserver} with three retries
     * and checks how often {@code onNext} is invoked before {@code onError} is reached.
     *
     * <p>A background task keeps writing the same element every 100 ms. After the wrapped
     * observer's {@code onError} has fired, {@code onNext} must have been called
     * {@code numRetries + 1} times. The background task is stopped when the test finishes.
     *
     * @throws InterruptedException if interrupted while waiting for the error callback
     */
    @Test(timeout=10000L)
    public void testBulkObserveWithExceptionAndRetry() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
            this.direct,
            TestUtils.createTestFamily(
                (EntityDescriptor)this.entity,
                (URI)this.storageUri,
                LocalKafkaCommitLogDescriptorTest.partitionsCfg(3)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context())
            .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final AtomicInteger restarts = new AtomicInteger();
        StreamElement update = StreamElement.upsert(
            (EntityDescriptor)this.entity,
            this.attr,
            UUID.randomUUID().toString(),
            "key",
            this.attr.getName(),
            System.currentTimeMillis(),
            new byte[]{1, 2});
        final int numRetries = 3;
        final CountDownLatch latch = new CountDownLatch(1);
        RetryableLogObserver observer = RetryableLogObserver.of("test", numRetries, new LogObserver(){

            @Override
            public boolean onError(Throwable error) {
                latch.countDown();
                return false;
            }

            @Override
            public boolean onNext(StreamElement ingest, LogObserver.OnNextContext confirm) {
                restarts.incrementAndGet();
                throw new RuntimeException("FAIL!");
            }
        });
        reader.observe("test", (LogObserver)observer);
        // Keep a reference to the pool so that the writing thread can be stopped afterwards.
        java.util.concurrent.ExecutorService executor = Executors.newCachedThreadPool();
        try {
            executor.execute(() -> {
                // loops until the sleep reports an interrupt (presumably what a true result of
                // ignoringInterrupted means - confirm against ExceptionUtils)
                while (!ExceptionUtils.ignoringInterrupted((ExceptionUtils.ThrowingRunnable & Serializable)() -> TimeUnit.MILLISECONDS.sleep(100L))) {
                    writer.write(update, (succ, e) -> Assert.assertTrue((boolean)succ));
                }
            });
            latch.await();
            Assert.assertEquals(numRetries + 1L, (long)restarts.get());
        } finally {
            // interrupts the writing task, which would otherwise outlive the test
            executor.shutdownNow();
        }
    }

    /**
     * Bulk-observes from {@code Position.NEWEST}, writes one element and confirms it.
     *
     * <p>Afterwards no error may have been recorded, {@code onRepartition} must have been called
     * at least once, the received value must equal the written one, and the handle must expose
     * three committed offsets whose offsets sum up to 1.
     *
     * @throws InterruptedException if interrupted while waiting for the callbacks
     */
    @Test(timeout=10000L)
    public void testBulkObserveSuccess() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
            this.direct,
            TestUtils.createTestFamily(
                (EntityDescriptor)this.entity,
                (URI)this.storageUri,
                LocalKafkaCommitLogDescriptorTest.partitionsCfg(3)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context())
            .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final AtomicInteger repartitions = new AtomicInteger();
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        final AtomicReference<StreamElement> received = new AtomicReference<>();
        // one count for the write callback, one for the delivered element
        final CountDownLatch done = new CountDownLatch(2);
        StreamElement update = StreamElement.upsert(
            (EntityDescriptor)this.entity,
            this.attr,
            UUID.randomUUID().toString(),
            "key",
            this.attr.getName(),
            System.currentTimeMillis(),
            new byte[]{1, 2});
        ObserveHandle handle = reader.observeBulk("test", Position.NEWEST, new LogObserver(){

            @Override
            public void onRepartition(LogObserver.OnRepartitionContext ctx) {
                repartitions.incrementAndGet();
            }

            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext ctx) {
                received.set(element);
                ctx.confirm();
                done.countDown();
                return true;
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable error) {
                failure.set(error);
                throw new RuntimeException(error);
            }
        });
        writer.write(update, (succ, e) -> {
            Assert.assertTrue((boolean)succ);
            done.countDown();
        });
        done.await();
        Assert.assertNull(failure.get());
        Assert.assertTrue(repartitions.get() > 0);
        Assert.assertArrayEquals(update.getValue(), received.get().getValue());
        Assert.assertEquals(3L, (long)handle.getCommittedOffsets().size());
        Assert.assertEquals(handle.getCommittedOffsets().toString(), 1L, handle.getCommittedOffsets().stream().mapToLong(o -> ((TopicOffset)o).getOffset()).sum());
    }

    /**
     * Writes 1000 elements and then bulk-observes all partitions from {@code Position.OLDEST}
     * with the boolean argument of {@code observeBulkPartitions} set to {@code true}.
     *
     * <p>Once {@code onCompleted} has been called, exactly 1000 elements must have been consumed.
     *
     * @throws InterruptedException if interrupted while waiting for completion
     */
    @Test(timeout=10000L)
    public void testBulkObservePartitionsFromOldestSuccess() throws InterruptedException {
        final int total = 1000;
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
            this.direct,
            TestUtils.createTestFamily(
                (EntityDescriptor)this.entity,
                (URI)this.storageUri,
                LocalKafkaCommitLogDescriptorTest.partitionsCfg(3)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context())
            .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final AtomicInteger seen = new AtomicInteger();
        StreamElement update = StreamElement.upsert(
            (EntityDescriptor)this.entity,
            this.attr,
            UUID.randomUUID().toString(),
            "key",
            this.attr.getName(),
            System.currentTimeMillis(),
            new byte[]{1, 2});
        int remaining = total;
        while (remaining-- > 0) {
            writer.write(update, (succ, e) -> Assert.assertTrue((boolean)succ));
        }
        final CountDownLatch completed = new CountDownLatch(1);
        LogObserver countingObserver = new LogObserver(){

            @Override
            public void onRepartition(LogObserver.OnRepartitionContext ctx) {
            }

            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext ctx) {
                seen.incrementAndGet();
                ctx.confirm();
                return true;
            }

            @Override
            public void onCompleted() {
                completed.countDown();
            }

            @Override
            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        };
        reader.observeBulkPartitions((Collection)reader.getPartitions(), Position.OLDEST, true, countingObserver);
        completed.await();
        Assert.assertEquals(1000L, (long)seen.get());
    }

    /**
     * Bulk-observes all partitions from {@code Position.NEWEST}, writes one element and
     * confirms it.
     *
     * <p>Afterwards no error may have been recorded, {@code onRepartition} must have been called
     * exactly once, the received value must equal the written one, and the handle must expose
     * three committed offsets whose offsets sum up to 1.
     *
     * @throws InterruptedException if interrupted while waiting for the callbacks
     */
    @Test(timeout=10000L)
    public void testBulkObservePartitionsSuccess() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
            this.direct,
            TestUtils.createTestFamily(
                (EntityDescriptor)this.entity,
                (URI)this.storageUri,
                LocalKafkaCommitLogDescriptorTest.partitionsCfg(3)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context())
            .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final AtomicInteger repartitions = new AtomicInteger();
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        final AtomicReference<StreamElement> received = new AtomicReference<>();
        // one count for the write callback, one for the delivered element
        final CountDownLatch done = new CountDownLatch(2);
        StreamElement update = StreamElement.upsert(
            (EntityDescriptor)this.entity,
            this.attr,
            UUID.randomUUID().toString(),
            "key",
            this.attr.getName(),
            System.currentTimeMillis(),
            new byte[]{1, 2});
        LogObserver confirmingObserver = new LogObserver(){

            @Override
            public void onRepartition(LogObserver.OnRepartitionContext ctx) {
                repartitions.incrementAndGet();
            }

            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext ctx) {
                received.set(element);
                ctx.confirm();
                done.countDown();
                return true;
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable error) {
                failure.set(error);
                throw new RuntimeException(error);
            }
        };
        ObserveHandle handle = reader.observeBulkPartitions((Collection)reader.getPartitions(), Position.NEWEST, confirmingObserver);
        writer.write(update, (succ, e) -> {
            Assert.assertTrue((boolean)succ);
            done.countDown();
        });
        done.await();
        Assert.assertNull(failure.get());
        Assert.assertEquals(1L, (long)repartitions.get());
        Assert.assertArrayEquals(update.getValue(), received.get().getValue());
        Assert.assertEquals(3L, (long)handle.getCommittedOffsets().size());
        Assert.assertEquals(1L, handle.getCommittedOffsets().stream().mapToLong(o -> ((TopicOffset)o).getOffset()).sum());
    }

    /**
     * Observes all partitions, consumes one written element, then resets every partition's
     * offset to {@code 0} and waits for one more {@code onNext} call.
     *
     * <p>After the second delivery the committed offsets exposed by the handle must sum up to 1.
     *
     * @throws InterruptedException if interrupted while waiting for the callbacks
     */
    @Test(timeout=10000L)
    public void testBulkObservePartitionsResetOffsetsSuccess() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
            this.direct,
            TestUtils.createTestFamily(
                (EntityDescriptor)this.entity,
                (URI)this.storageUri,
                LocalKafkaCommitLogDescriptorTest.partitionsCfg(3)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context())
            .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final AtomicInteger repartitions = new AtomicInteger();
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        final AtomicReference<StreamElement> received = new AtomicReference<>();
        // the latch is swapped between the two phases, hence the reference wrapper
        final AtomicReference<CountDownLatch> latch = new AtomicReference<CountDownLatch>(new CountDownLatch(2));
        StreamElement update = StreamElement.upsert(
            (EntityDescriptor)this.entity,
            this.attr,
            UUID.randomUUID().toString(),
            "key",
            this.attr.getName(),
            System.currentTimeMillis(),
            new byte[]{1, 2});
        LogObserver confirmingObserver = new LogObserver(){

            @Override
            public void onRepartition(LogObserver.OnRepartitionContext ctx) {
                repartitions.incrementAndGet();
            }

            @Override
            public boolean onNext(StreamElement element, LogObserver.OnNextContext ctx) {
                received.set(element);
                ctx.confirm();
                latch.get().countDown();
                return true;
            }

            @Override
            public boolean onError(Throwable error) {
                failure.set(error);
                throw new RuntimeException(error);
            }
        };
        ObserveHandle handle = reader.observePartitions((Collection)reader.getPartitions(), Position.NEWEST, confirmingObserver);
        handle.waitUntilReady();
        // phase one: write callback plus first delivery
        writer.write(update, (succ, e) -> {
            Assert.assertTrue((boolean)succ);
            latch.get().countDown();
        });
        latch.get().await();
        // phase two: a single delivery after the offsets were reset
        latch.set(new CountDownLatch(1));
        handle.resetOffsets(reader.getPartitions().stream().map(p -> (PartitionWithTopic)p).map(p -> new TopicOffset(new PartitionWithTopic(p.getTopic(), p.getId()), 0L, Long.MIN_VALUE)).collect(Collectors.toList()));
        latch.get().await();
        long committedSum = handle.getCommittedOffsets().stream().mapToLong(o -> ((TopicOffset)o).getOffset()).sum();
        Assert.assertEquals(1L, committedSum);
    }

    /**
     * Checks that validating a topic named {@code "non-existing-topic"} is rejected with an
     * {@code IllegalArgumentException} carrying the expected message.
     *
     * <p>The test fails with {@code "Should throw exception"} when no
     * {@code IllegalArgumentException} is raised; any other exception propagates unchanged.
     */
    @Test
    public void testObserveOnNonExistingTopic() {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
            this.direct,
            TestUtils.createTestFamily(
                (EntityDescriptor)this.entity,
                (URI)this.storageUri,
                LocalKafkaCommitLogDescriptorTest.partitionsCfg(3)));
        LocalKafkaCommitLogDescriptor.LocalKafkaLogReader reader = accessor.newReader(this.context());
        IllegalArgumentException thrown = null;
        try {
            Assert.assertNotNull(reader.getPartitions());
            reader.validateTopic(reader.getConsumer(), "non-existing-topic");
        }
        catch (IllegalArgumentException ex) {
            thrown = ex;
        }
        // A single failure point replaces the former trailing fail(), which could never be
        // reached: the try block always ended in a throw and the catch block returned.
        if (thrown == null) {
            Assert.fail("Should throw exception");
        }
        Assert.assertEquals("Received null or empty partitions for topic [non-existing-topic]. Please check that the topic exists and has at least one partition.", thrown.getMessage());
    }

    /**
     * Observes all partitions in bulk, captures the committed offsets of that observation and
     * then starts a second bulk observation from exactly those offsets.
     *
     * <p>The shared observer confirms each element and returns {@code false} from
     * {@code onNext} (presumably asking the reader to stop after one element; the assertions
     * below expect exactly one element per observation).
     *
     * @throws InterruptedException if the test thread is interrupted while awaiting a latch
     */
    @Test(timeout=10000L)
    public void testBulkObserveOffsets() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily((EntityDescriptor)this.entity, (URI)this.storageUri, LocalKafkaCommitLogDescriptorTest.partitionsCfg(3)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context()).orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final ArrayList input = new ArrayList();
        // Initial count of 3: two write callbacks plus one observed element.
        final AtomicReference<CountDownLatch> latch = new AtomicReference<CountDownLatch>(new CountDownLatch(3));
        StreamElement update = StreamElement.upsert((EntityDescriptor)this.entity, this.attr, (String)UUID.randomUUID().toString(), (String)"key", (String)this.attr.getName(), (long)System.currentTimeMillis(), (byte[])new byte[]{1, 2});
        HashMap currentOffsets = new HashMap();
        LogObserver observer = new LogObserver(){

            public boolean onNext(StreamElement ingest, LogObserver.OnNextContext context) {
                input.add((KafkaStreamElement)ingest);
                context.confirm();
                ((CountDownLatch)latch.get()).countDown();
                return false;
            }

            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        };
        try (ObserveHandle handle = reader.observeBulkPartitions((Collection)reader.getPartitions(), Position.NEWEST, observer);){
            for (int i = 0; i < 2; ++i) {
                writer.write(update, (succ, e) -> {
                    Assert.assertTrue((boolean)succ);
                    ((CountDownLatch)latch.get()).countDown();
                });
            }
            latch.get().await();
            // Re-arm the latch for the single element expected from the second observation.
            latch.set(new CountDownLatch(1));
            handle.getCommittedOffsets().forEach(o -> currentOffsets.put(o.getPartition().getId(), o));
        }
        Assert.assertEquals((long)3L, (long)currentOffsets.size());
        Assert.assertEquals((String)((Object)currentOffsets).toString(), (long)1L, (long)currentOffsets.values().stream().mapToLong(o -> ((TopicOffset)o).getOffset()).sum());
        // The second handle is now closed as well (it was previously leaked), mirroring
        // the try-with-resources handling of the first handle.
        try (ObserveHandle handle2 = reader.observeBulkOffsets((Collection)Lists.newArrayList(currentOffsets.values()), observer);){
            latch.get().await();
            Assert.assertEquals((long)2L, (long)input.size());
            Assert.assertEquals((long)0L, (long)((KafkaStreamElement)input.get(0)).getOffset());
            Assert.assertEquals((long)1L, (long)((KafkaStreamElement)input.get(1)).getOffset());
            Assert.assertEquals((long)2L, (long)handle2.getCommittedOffsets().stream().mapToLong(o -> ((TopicOffset)o).getOffset()).sum());
        }
    }

    /**
     * Variant of the bulk-offset test in which the observer never confirms elements: the
     * second observation is started from the first handle's current offsets and both observed
     * elements are expected to carry offset 0.
     *
     * @throws InterruptedException if the test thread is interrupted while awaiting a latch
     */
    @Test(timeout=10000L)
    public void testBulkObserveOffsets2() throws InterruptedException {
        List offsets;
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily((EntityDescriptor)this.entity, (URI)this.storageUri, LocalKafkaCommitLogDescriptorTest.partitionsCfg(3)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context()).orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final ArrayList input = new ArrayList();
        // Initial count of 3: two write callbacks plus one observed element.
        final AtomicReference<CountDownLatch> latch = new AtomicReference<CountDownLatch>(new CountDownLatch(3));
        StreamElement update = StreamElement.upsert((EntityDescriptor)this.entity, this.attr, (String)UUID.randomUUID().toString(), (String)"key", (String)this.attr.getName(), (long)System.currentTimeMillis(), (byte[])new byte[]{1, 2});
        LogObserver observer = new LogObserver(){

            public boolean onNext(StreamElement ingest, LogObserver.OnNextContext context) {
                // Deliberately no context.confirm() here.
                input.add((KafkaStreamElement)ingest);
                ((CountDownLatch)latch.get()).countDown();
                return false;
            }

            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        };
        try (ObserveHandle handle = reader.observeBulkPartitions((Collection)reader.getPartitions(), Position.NEWEST, observer);){
            for (int i = 0; i < 2; ++i) {
                writer.write(update, (succ, e) -> {
                    Assert.assertTrue((boolean)succ);
                    ((CountDownLatch)latch.get()).countDown();
                });
            }
            latch.get().await();
            // Re-arm the latch for the single element expected from the second observation.
            latch.set(new CountDownLatch(1));
            offsets = handle.getCurrentOffsets();
        }
        // The second handle is now closed as well (it was previously leaked), mirroring
        // the try-with-resources handling of the first handle.
        try (ObserveHandle handle2 = reader.observeBulkOffsets((Collection)Lists.newArrayList((Iterable)offsets), observer);){
            latch.get().await();
            Assert.assertEquals((long)2L, (long)input.size());
            Assert.assertEquals((long)0L, (long)((KafkaStreamElement)input.get(0)).getOffset());
            Assert.assertEquals((long)0L, (long)((KafkaStreamElement)input.get(1)).getOffset());
        }
    }

    /**
     * Writes ten elements, opens an observation from the oldest position with an observer that
     * returns {@code false} from both callbacks, closes the handle right after it becomes ready
     * and then checks the offsets the closed handle reports.
     *
     * @throws InterruptedException if interrupted while waiting for the writes or the handle
     */
    @Test(timeout=60000L)
    public void testCurrentOffsetsReflectSeek() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.createAccessor(
                direct,
                TestUtils.createTestFamily(
                        (EntityDescriptor) entity,
                        (URI) storageUri,
                        LocalKafkaCommitLogDescriptorTest.partitionsCfg(3)));
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(context())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();

        // One latch count per write callback.
        CountDownLatch written = new CountDownLatch(10);
        StreamElement element = StreamElement.upsert(
                (EntityDescriptor) entity,
                attr,
                UUID.randomUUID().toString(),
                "key",
                attr.getName(),
                System.currentTimeMillis(),
                new byte[]{1, 2});
        int remaining = 10;
        while (remaining > 0) {
            writer.write(element, (succ, exc) -> written.countDown());
            --remaining;
        }
        written.await();

        LogObserver rejecting = new LogObserver(){

            public boolean onError(Throwable error) {
                return false;
            }

            public boolean onNext(StreamElement ingest, LogObserver.OnNextContext context) {
                return false;
            }
        };
        ObserveHandle handle = reader.observe("name", Position.OLDEST, rejecting);
        handle.waitUntilReady();
        handle.close();

        // Offsets are queried on the already closed handle: one entry per partition.
        Assert.assertEquals(3L, (long) handle.getCurrentOffsets().size());
        long nonNegativeOffsetSum = handle.getCurrentOffsets().stream()
                .mapToLong(o -> ((TopicOffset) o).getOffset())
                .filter(o -> o >= 0L)
                .sum();
        Assert.assertEquals(0L, nonNegativeOffsetSum);
    }

    /**
     * Writes a value through the Kafka writer, assigns all three partitions to a cached view
     * and checks that the view returns the value; then writes a newer value for the same key
     * and checks that the view returns it after a one second pause.
     *
     * @throws InterruptedException if interrupted while awaiting a write or sleeping
     */
    @Test(timeout=10000L)
    public void testCachedView() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.createAccessor(
                direct,
                TestUtils.createTestFamily(
                        (EntityDescriptor) entity,
                        (URI) storageUri,
                        LocalKafkaCommitLogDescriptorTest.partitionsCfg(3)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CachedView view = (CachedView) accessor.getCachedView(context())
                .orElseThrow(() -> new IllegalStateException("Missing cached view"));
        AtomicReference<CountDownLatch> pending = new AtomicReference<>(new CountDownLatch(1));

        StreamElement first = StreamElement.upsert(
                (EntityDescriptor) entity,
                attr,
                UUID.randomUUID().toString(),
                "key",
                attr.getName(),
                System.currentTimeMillis(),
                new byte[]{1, 2});
        writer.write(first, (succ, exc) -> {
            Assert.assertTrue(succ);
            pending.get().countDown();
        });
        pending.get().await();
        // Fresh latch for the second write.
        pending.set(new CountDownLatch(1));

        List<Partition> allPartitions = IntStream.range(0, 3)
                .mapToObj(this::getPartition)
                .collect(Collectors.toList());
        view.assign(allPartitions);
        byte[] initial = (byte[]) ((KeyValue) view.get("key", attr).get()).getValue();
        Assert.assertArrayEquals(new byte[]{1, 2}, initial);

        StreamElement second = StreamElement.upsert(
                (EntityDescriptor) entity,
                attr,
                UUID.randomUUID().toString(),
                "key",
                attr.getName(),
                System.currentTimeMillis(),
                new byte[]{1, 2, 3});
        writer.write(second, (succ, exc) -> {
            Assert.assertTrue(succ);
            pending.get().countDown();
        });
        pending.get().await();
        // Pause before reading the view again, exactly as long as before.
        TimeUnit.SECONDS.sleep(1L);
        byte[] refreshed = (byte[]) ((KeyValue) view.get("key", attr).get()).getValue();
        Assert.assertArrayEquals(new byte[]{1, 2, 3}, refreshed);
    }

    /**
     * Creates the partition with the given id on this test's {@code topic} field.
     *
     * @param partition partition id
     * @return partition of {@code this.topic} with the given id
     */
    private Partition getPartition(int partition) {
        String ownTopic = this.topic;
        return getPartition(ownTopic, partition);
    }

    /**
     * Wraps a topic name and a partition id into a new {@link PartitionWithTopic}.
     *
     * @param topic topic name
     * @param partition partition id
     * @return newly created partition object
     */
    private Partition getPartition(String topic, int partition) {
        PartitionWithTopic created = new PartitionWithTopic(topic, partition);
        return created;
    }

    /**
     * Writes {@code key1} and {@code key2}, assigns only partition 1 to the cached view and
     * expects just {@code key1} to be visible; after another write and a re-assignment to
     * partitions 1 and 2 both keys are expected to be visible.
     *
     * @throws InterruptedException if interrupted while awaiting a write or sleeping
     */
    @Test(timeout=10000L)
    public void testCachedViewReload() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.createAccessor(
                direct,
                TestUtils.createTestFamily(
                        (EntityDescriptor) entity,
                        (URI) storageUri,
                        LocalKafkaCommitLogDescriptorTest.partitionsCfg(3, FirstBytePartitioner.class)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CachedView view = (CachedView) accessor.getCachedView(context())
                .orElseThrow(() -> new IllegalStateException("Missing cached view"));
        AtomicReference<CountDownLatch> pending = new AtomicReference<>(new CountDownLatch(2));

        StreamElement forKey1 = StreamElement.upsert(
                (EntityDescriptor) entity,
                attr,
                UUID.randomUUID().toString(),
                "key1",
                attr.getName(),
                System.currentTimeMillis(),
                new byte[]{1, 2});
        StreamElement forKey2 = StreamElement.upsert(
                (EntityDescriptor) entity,
                attr,
                UUID.randomUUID().toString(),
                "key2",
                attr.getName(),
                System.currentTimeMillis(),
                new byte[]{2, 3});
        for (StreamElement element : Arrays.asList(forKey1, forKey2)) {
            writer.write(element, (succ, exc) -> {
                Assert.assertTrue(succ);
                pending.get().countDown();
            });
        }
        pending.get().await();
        pending.set(new CountDownLatch(1));

        // First assignment covers partition 1 only.
        List<Partition> onlyPartitionOne = IntStream.range(1, 2)
                .mapToObj(this::getPartition)
                .collect(Collectors.toList());
        view.assign(onlyPartitionOne);
        Assert.assertFalse(view.get("key2", attr).isPresent());
        Assert.assertTrue(view.get("key1", attr).isPresent());

        StreamElement newerKey1 = StreamElement.upsert(
                (EntityDescriptor) entity,
                attr,
                UUID.randomUUID().toString(),
                "key1",
                attr.getName(),
                System.currentTimeMillis(),
                new byte[]{1, 2, 3});
        writer.write(newerKey1, (succ, exc) -> {
            Assert.assertTrue(succ);
            pending.get().countDown();
        });
        pending.get().await();
        TimeUnit.SECONDS.sleep(1L);

        // Second assignment covers partitions 1 and 2.
        List<Partition> partitionsOneAndTwo = IntStream.range(1, 3)
                .mapToObj(this::getPartition)
                .collect(Collectors.toList());
        view.assign(partitionsOneAndTwo);
        Assert.assertTrue(view.get("key2", attr).isPresent());
        Assert.assertTrue(view.get("key1", attr).isPresent());
    }

    /**
     * Writes two keys through the cached view itself and expects both to be readable from the
     * view right after the writes and again after all three partitions have been assigned.
     *
     * @throws InterruptedException if interrupted while awaiting the write callbacks
     */
    @Test(timeout=10000L)
    public void testCachedViewWrite() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.createAccessor(
                direct,
                TestUtils.createTestFamily(
                        (EntityDescriptor) entity,
                        (URI) storageUri,
                        LocalKafkaCommitLogDescriptorTest.partitionsCfg(3, FirstBytePartitioner.class)));
        CachedView view = (CachedView) accessor.getCachedView(context())
                .orElseThrow(() -> new IllegalStateException("Missing cached view"));

        StreamElement forKey1 = StreamElement.upsert(
                (EntityDescriptor) entity,
                attr,
                UUID.randomUUID().toString(),
                "key1",
                attr.getName(),
                System.currentTimeMillis(),
                new byte[]{1, 2});
        StreamElement forKey2 = StreamElement.upsert(
                (EntityDescriptor) entity,
                attr,
                UUID.randomUUID().toString(),
                "key2",
                attr.getName(),
                System.currentTimeMillis(),
                new byte[]{2, 3});
        List<StreamElement> writes = Arrays.asList(forKey1, forKey2);

        CountDownLatch committed = new CountDownLatch(2);
        for (StreamElement element : writes) {
            view.write(element, (succ, exc) -> {
                Assert.assertTrue("Exception: " + exc, succ);
                committed.countDown();
            });
        }
        committed.await();

        // Visible straight after the writes ...
        Assert.assertTrue(view.get("key2", attr).isPresent());
        Assert.assertTrue(view.get("key1", attr).isPresent());

        // ... and still visible once the view has been assigned every partition.
        List<Partition> allPartitions = IntStream.range(0, 3)
                .mapToObj(this::getPartition)
                .collect(Collectors.toList());
        view.assign(allPartitions);
        Assert.assertTrue(view.get("key2", attr).isPresent());
        Assert.assertTrue(view.get("key1", attr).isPresent());
    }

    /**
     * Writes an upsert for {@code key1} followed by a delete with a timestamp one second
     * later, and expects the key to be absent from the view both before and after all three
     * partitions are assigned.
     *
     * @throws InterruptedException if interrupted while awaiting the write callbacks
     */
    @Test(timeout=10000L)
    public void testCachedViewWriteAndDelete() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.createAccessor(
                direct,
                TestUtils.createTestFamily(
                        (EntityDescriptor) entity,
                        (URI) storageUri,
                        LocalKafkaCommitLogDescriptorTest.partitionsCfg(3, FirstBytePartitioner.class)));
        CachedView view = (CachedView) accessor.getCachedView(context())
                .orElseThrow(() -> new IllegalStateException("Missing cached view"));
        long now = System.currentTimeMillis();

        StreamElement upsert = StreamElement.upsert(
                (EntityDescriptor) entity,
                attr,
                UUID.randomUUID().toString(),
                "key1",
                attr.getName(),
                now - 1000L,
                new byte[]{1, 2});
        StreamElement delete = StreamElement.delete(
                (EntityDescriptor) entity,
                attr,
                UUID.randomUUID().toString(),
                "key1",
                attr.getName(),
                now);
        List<StreamElement> writes = Arrays.asList(upsert, delete);

        CountDownLatch committed = new CountDownLatch(2);
        for (StreamElement element : writes) {
            view.write(element, (succ, exc) -> {
                Assert.assertTrue("Exception: " + exc, succ);
                committed.countDown();
            });
        }
        committed.await();

        Assert.assertFalse(view.get("key1", attr).isPresent());
        List<Partition> allPartitions = IntStream.range(0, 3)
                .mapToObj(this::getPartition)
                .collect(Collectors.toList());
        view.assign(allPartitions);
        Assert.assertFalse(view.get("key1", attr).isPresent());
    }

    /**
     * Writes three wildcard upserts, a wildcard delete and one upsert stamped after the delete,
     * then checks at stamp {@code now + 500} that only {@code wildcard.1} (value {@code {2, 3}})
     * is visible, both before and after all three partitions are assigned.
     *
     * @throws InterruptedException if interrupted while awaiting the write callbacks
     */
    @Test(timeout=10000L)
    public void testCachedViewWriteAndDeleteWildcard() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.createAccessor(
                direct,
                TestUtils.createTestFamily(
                        (EntityDescriptor) entity,
                        (URI) storageUri,
                        LocalKafkaCommitLogDescriptorTest.partitionsCfg(3, FirstBytePartitioner.class)));
        CachedView view = (CachedView) accessor.getCachedView(context())
                .orElseThrow(() -> new IllegalStateException("Missing cached view"));
        long now = System.currentTimeMillis();
        CountDownLatch committed = new CountDownLatch(5);

        Stream.of(
                StreamElement.upsert(
                        (EntityDescriptor) entity, attrWildcard, UUID.randomUUID().toString(),
                        "key1", "wildcard.1", now - 1000L, new byte[]{1, 2}),
                StreamElement.upsert(
                        (EntityDescriptor) entity, attrWildcard, UUID.randomUUID().toString(),
                        "key1", "wildcard.2", now - 500L, new byte[]{1, 2}),
                StreamElement.deleteWildcard(
                        (EntityDescriptor) entity, attrWildcard, UUID.randomUUID().toString(),
                        "key1", now),
                StreamElement.upsert(
                        (EntityDescriptor) entity, attrWildcard, UUID.randomUUID().toString(),
                        "key1", "wildcard.1", now + 500L, new byte[]{2, 3}),
                StreamElement.upsert(
                        (EntityDescriptor) entity, attrWildcard, UUID.randomUUID().toString(),
                        "key1", "wildcard.3", now - 500L, new byte[]{3, 4}))
                .forEach(element -> view.write(element, (succ, exc) -> {
                    Assert.assertTrue("Exception: " + exc, succ);
                    committed.countDown();
                }));
        committed.await();

        // State of the view right after the writes.
        Assert.assertTrue(view.get("key1", "wildcard.1", attrWildcard, now + 500L).isPresent());
        Assert.assertFalse(view.get("key1", "wildcard.2", attrWildcard, now + 500L).isPresent());
        Assert.assertFalse(view.get("key1", "wildcard.3", attrWildcard, now + 500L).isPresent());
        byte[] beforeAssign = (byte[]) ((KeyValue) view.get("key1", "wildcard.1", attrWildcard, now + 500L).get()).getValue();
        Assert.assertArrayEquals(new byte[]{2, 3}, beforeAssign);

        List<Partition> allPartitions = IntStream.range(0, 3)
                .mapToObj(this::getPartition)
                .collect(Collectors.toList());
        view.assign(allPartitions);

        // The same expectations must hold once every partition has been assigned.
        Assert.assertTrue(view.get("key1", "wildcard.1", attrWildcard, now + 500L).isPresent());
        Assert.assertFalse(view.get("key1", "wildcard.2", attrWildcard, now + 500L).isPresent());
        Assert.assertFalse(view.get("key1", "wildcard.3", attrWildcard, now + 500L).isPresent());
        byte[] afterAssign = (byte[]) ((KeyValue) view.get("key1", "wildcard.1", attrWildcard, now + 500L).get()).getValue();
        Assert.assertArrayEquals(new byte[]{2, 3}, afterAssign);
    }

    /**
     * Writes one plain attribute, wildcard upserts and a wildcard delete for {@code key1}, then
     * expects a wildcard scan of the view to return two entries, both before and after all
     * three partitions are assigned.
     *
     * @throws InterruptedException if interrupted while awaiting the write callbacks
     */
    @Test(timeout=10000L)
    public void testCachedViewWriteAndList() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily((EntityDescriptor)this.entity, (URI)this.storageUri, LocalKafkaCommitLogDescriptorTest.partitionsCfg(3, FirstBytePartitioner.class)));
        CachedView view = (CachedView)accessor.getCachedView(this.context()).orElseThrow(() -> new IllegalStateException("Missing cached view"));
        long now = System.currentTimeMillis();
        // One latch count per element written below.
        CountDownLatch latch = new CountDownLatch(5);
        Stream.of(
                StreamElement.upsert((EntityDescriptor)this.entity, this.attr, (String)UUID.randomUUID().toString(), (String)"key1", (String)this.attr.getName(), (long)(now - 1000L), (byte[])new byte[]{1, 2}),
                StreamElement.upsert((EntityDescriptor)this.entity, this.attrWildcard, (String)UUID.randomUUID().toString(), (String)"key1", (String)"wildcard.1", (long)(now - 1000L), (byte[])new byte[]{1, 2}),
                StreamElement.deleteWildcard((EntityDescriptor)this.entity, this.attrWildcard, (String)UUID.randomUUID().toString(), (String)"key1", (long)(now - 500L)),
                StreamElement.upsert((EntityDescriptor)this.entity, this.attrWildcard, (String)UUID.randomUUID().toString(), (String)"key1", (String)"wildcard.2", (long)now, (byte[])new byte[]{1, 2}),
                StreamElement.upsert((EntityDescriptor)this.entity, this.attrWildcard, (String)UUID.randomUUID().toString(), (String)"key1", (String)"wildcard.3", (long)(now - 499L), (byte[])new byte[]{3, 4}))
                .forEach(update -> view.write(update, (succ, exc) -> {
                    // Include the exception in the message so a failed write is diagnosable.
                    Assert.assertTrue((String)("Exception: " + exc), (boolean)succ);
                    latch.countDown();
                }));
        latch.await();
        ArrayList res = new ArrayList();
        view.scanWildcard("key1", this.attrWildcard, res::add);
        Assert.assertEquals((long)2L, (long)res.size());
        view.assign((Collection)IntStream.range(0, 3).mapToObj(i -> this.getPartition(i)).collect(Collectors.toList()));
        res.clear();
        view.scanWildcard("key1", this.attrWildcard, res::add);
        Assert.assertEquals((long)2L, (long)res.size());
    }

    /**
     * Writes one plain attribute, wildcard upserts and a wildcard delete for {@code key1}, then
     * expects {@code scanWildcardAll} on the view to return three entries, both before and
     * after all three partitions are assigned.
     *
     * @throws InterruptedException if interrupted while awaiting the write callbacks
     */
    @Test(timeout=10000L)
    public void testCachedViewWriteAndListAll() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.createAccessor(
                direct,
                TestUtils.createTestFamily(
                        (EntityDescriptor) entity,
                        (URI) storageUri,
                        LocalKafkaCommitLogDescriptorTest.partitionsCfg(3, FirstBytePartitioner.class)));
        CachedView view = (CachedView) accessor.getCachedView(context())
                .orElseThrow(() -> new IllegalStateException("Missing cached view"));
        long now = System.currentTimeMillis();
        CountDownLatch committed = new CountDownLatch(5);

        Stream.of(
                StreamElement.upsert(
                        (EntityDescriptor) entity, attr, UUID.randomUUID().toString(),
                        "key1", attr.getName(), now - 2000L, new byte[]{0}),
                StreamElement.upsert(
                        (EntityDescriptor) entity, attrWildcard, UUID.randomUUID().toString(),
                        "key1", "wildcard.1", now - 1000L, new byte[]{1, 2}),
                StreamElement.deleteWildcard(
                        (EntityDescriptor) entity, attrWildcard, UUID.randomUUID().toString(),
                        "key1", now - 500L),
                StreamElement.upsert(
                        (EntityDescriptor) entity, attrWildcard, UUID.randomUUID().toString(),
                        "key1", "wildcard.2", now, new byte[]{1, 2}),
                StreamElement.upsert(
                        (EntityDescriptor) entity, attrWildcard, UUID.randomUUID().toString(),
                        "key1", "wildcard.3", now - 499L, new byte[]{3, 4}))
                .forEach(element -> view.write(element, (succ, exc) -> {
                    Assert.assertTrue("Exception: " + exc, succ);
                    committed.countDown();
                }));
        committed.await();

        ArrayList scanned = new ArrayList();
        view.scanWildcardAll("key1", scanned::add);
        Assert.assertEquals(3L, (long) scanned.size());

        List<Partition> allPartitions = IntStream.range(0, 3)
                .mapToObj(this::getPartition)
                .collect(Collectors.toList());
        view.assign(allPartitions);

        // Repeat the scan into the emptied list after the assignment.
        scanned.clear();
        view.scanWildcardAll("key1", scanned::add);
        Assert.assertEquals(3L, (long) scanned.size());
    }

    /**
     * Writes two keys through the view and then assigns all three partitions with an update
     * callback, expecting that callback to be invoked exactly twice.
     *
     * @throws InterruptedException if interrupted while awaiting the write callbacks
     */
    @Test(timeout=10000L)
    public void testCachedViewWritePreUpdate() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.createAccessor(
                direct,
                TestUtils.createTestFamily(
                        (EntityDescriptor) entity,
                        (URI) storageUri,
                        LocalKafkaCommitLogDescriptorTest.partitionsCfg(3, FirstBytePartitioner.class)));
        CachedView view = (CachedView) accessor.getCachedView(context())
                .orElseThrow(() -> new IllegalStateException("Missing cached view"));

        StreamElement forKey1 = StreamElement.upsert(
                (EntityDescriptor) entity,
                attr,
                UUID.randomUUID().toString(),
                "key1",
                attr.getName(),
                System.currentTimeMillis(),
                new byte[]{1, 2});
        StreamElement forKey2 = StreamElement.upsert(
                (EntityDescriptor) entity,
                attr,
                UUID.randomUUID().toString(),
                "key2",
                attr.getName(),
                System.currentTimeMillis(),
                new byte[]{2, 3});
        List<StreamElement> writes = Arrays.asList(forKey1, forKey2);

        CountDownLatch committed = new CountDownLatch(writes.size());
        for (StreamElement element : writes) {
            view.write(element, (succ, exc) -> {
                Assert.assertTrue("Exception: " + exc, succ);
                committed.countDown();
            });
        }
        committed.await();

        // Counts how often the callback handed to assign() is invoked.
        AtomicInteger invocations = new AtomicInteger();
        view.assign(
                (Collection)IntStream.range(0, 3).mapToObj(this::getPartition).collect(Collectors.toList()),
                (BiConsumer & Serializable)(e, c) -> invocations.incrementAndGet());
        Assert.assertEquals(2L, (long) invocations.get());
    }

    /**
     * Writes a wildcard upsert, a wildcard delete and another wildcard upsert for {@code key1}
     * through the view, then assigns all three partitions with an update callback and expects
     * that callback to be invoked three times.
     *
     * @throws InterruptedException if interrupted while awaiting the write callbacks
     */
    @Test(timeout=10000L)
    public void testCachedViewWritePreUpdateAndDeleteWildcard() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily((EntityDescriptor)this.entity, (URI)this.storageUri, LocalKafkaCommitLogDescriptorTest.partitionsCfg(3, KeyPartitioner.class)));
        CachedView view = (CachedView)accessor.getCachedView(this.context()).orElseThrow(() -> new IllegalStateException("Missing cached view"));
        long now = System.currentTimeMillis();
        List<StreamElement> updates = Arrays.asList(
                StreamElement.upsert((EntityDescriptor)this.entity, this.attrWildcard, (String)UUID.randomUUID().toString(), (String)"key1", (String)"wildcard.1", (long)now, (byte[])new byte[]{1, 2}),
                StreamElement.deleteWildcard((EntityDescriptor)this.entity, this.attrWildcard, (String)UUID.randomUUID().toString(), (String)"key1", (long)(now + 1000L)),
                StreamElement.upsert((EntityDescriptor)this.entity, this.attrWildcard, (String)UUID.randomUUID().toString(), (String)"key1", (String)"wildcard.2", (long)(now + 500L), (byte[])new byte[]{2, 3}));
        CountDownLatch latch = new CountDownLatch(updates.size());
        updates.forEach(update -> view.write(update, (succ, exc) -> {
            // Message prefix matches the one used by the other cached-view tests.
            Assert.assertTrue((String)("Exception: " + exc), (boolean)succ);
            latch.countDown();
        }));
        latch.await();
        // Counts how often the callback handed to assign() is invoked.
        AtomicInteger calls = new AtomicInteger();
        view.assign((Collection)IntStream.range(0, 3).mapToObj(i -> this.getPartition(i)).collect(Collectors.toList()), (BiConsumer & Serializable)(e, c) -> calls.incrementAndGet());
        Assert.assertEquals((long)3L, (long)calls.get());
    }

    /**
     * Writes two values for {@code key1} with the same stamp, assigns all partitions and
     * expects the second value; overwrites it through the view, closes the view (after which
     * the key is expected to be absent) and re-assigns, expecting the last written value.
     *
     * @throws InterruptedException if interrupted while awaiting the write callbacks
     * @throws IOException declared by the original signature
     */
    @Test(timeout=10000L)
    public void testRewriteAndPrefetch() throws InterruptedException, IOException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.createAccessor(
                direct,
                TestUtils.createTestFamily(
                        (EntityDescriptor) entity,
                        (URI) storageUri,
                        LocalKafkaCommitLogDescriptorTest.partitionsCfg(3, KeyPartitioner.class)));
        CachedView view = (CachedView) accessor.getCachedView(context())
                .orElseThrow(() -> new IllegalStateException("Missing cached view"));
        long now = System.currentTimeMillis();

        StreamElement firstValue = StreamElement.upsert(
                (EntityDescriptor) entity,
                attr,
                UUID.randomUUID().toString(),
                "key1",
                attr.getName(),
                now,
                new byte[]{1, 2});
        StreamElement secondValue = StreamElement.upsert(
                (EntityDescriptor) entity,
                attr,
                UUID.randomUUID().toString(),
                "key1",
                attr.getName(),
                now,
                new byte[]{2, 3});
        List<StreamElement> writes = Arrays.asList(firstValue, secondValue);

        CountDownLatch committed = new CountDownLatch(writes.size());
        for (StreamElement element : writes) {
            view.write(element, (succ, exc) -> {
                Assert.assertTrue("Exception: " + exc, succ);
                committed.countDown();
            });
        }
        committed.await();

        view.assign((Collection)IntStream.range(0, 3).mapToObj(this::getPartition).collect(Collectors.toList()));
        byte[] afterAssign = (byte[]) ((KeyValue) view.get("key1", attr).get()).getValue();
        Assert.assertArrayEquals(new byte[]{2, 3}, afterAssign);

        // This write is not awaited; the read follows immediately, as in the original flow.
        StreamElement thirdValue = StreamElement.upsert(
                (EntityDescriptor) entity,
                attr,
                UUID.randomUUID().toString(),
                "key1",
                attr.getName(),
                now,
                new byte[]{3, 4});
        view.write(thirdValue, (succ, exc) -> Assert.assertTrue(succ));
        byte[] afterRewrite = (byte[]) ((KeyValue) view.get("key1", attr).get()).getValue();
        Assert.assertArrayEquals(new byte[]{3, 4}, afterRewrite);

        view.close();
        Assert.assertFalse(view.get("key1", attr).isPresent());

        view.assign((Collection)IntStream.range(0, 3).mapToObj(this::getPartition).collect(Collectors.toList()));
        byte[] afterReassign = (byte[]) ((KeyValue) view.get("key1", attr).get()).getValue();
        Assert.assertArrayEquals(new byte[]{3, 4}, afterReassign);
    }

    /**
     * Runs {@code testSequentialConsumption(3)} and expects the returned maximal latency to
     * exceed 500 milliseconds (compared in nanoseconds).
     *
     * @throws InterruptedException propagated from the sequential consumption helper
     */
    @Test(timeout=5000L)
    public void testMaxBytesPerSec() throws InterruptedException {
        long observedNanos = testSequentialConsumption(3L);
        long thresholdNanos = TimeUnit.MILLISECONDS.toNanos(500L);
        String failureMessage = String.format("maxLatency should be greater than %d, got %d", thresholdNanos, observedNanos);
        Assert.assertTrue(failureMessage, observedNanos > thresholdNanos);
    }

    /**
     * Runs {@code testSequentialConsumption(Long.MAX_VALUE)} and expects the returned maximal
     * latency to stay below 500 milliseconds (compared in nanoseconds).
     *
     * @throws InterruptedException propagated from the sequential consumption helper
     */
    @Test(timeout=5000L)
    public void testNoMaxBytesPerSec() throws InterruptedException {
        long observedNanos = testSequentialConsumption(Long.MAX_VALUE);
        // 500 ms expressed in nanoseconds, i.e. 500000000.
        long limitNanos = TimeUnit.MILLISECONDS.toNanos(500L);
        boolean belowLimit = observedNanos < limitNanos;
        Assert.assertTrue(belowLimit);
    }

    /**
     * Replaces {@code this.kafka} with a descriptor whose accessors are configured with
     * {@code serializer-class = SingleAttrSerializer}, writes one string element and checks
     * the single record polled from a consumer: key {@code "key"}, topic {@code "topic"},
     * partition 0 and value {@code "this is test"}.
     *
     * @throws InterruptedException if interrupted while awaiting the write callback
     */
    @Test(timeout=10000L)
    public void testCustomElementSerializer() throws InterruptedException {
        this.kafka = new LocalKafkaCommitLogDescriptor(accessor -> new LocalKafkaCommitLogDescriptor.Accessor((LocalKafkaCommitLogDescriptor.Accessor)((Object)accessor), Collections.singletonMap("serializer-class", SingleAttrSerializer.class.getName())));
        LocalKafkaCommitLogDescriptor.Accessor accessor2 = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily((EntityDescriptor)this.entity, (URI)this.storageUri, LocalKafkaCommitLogDescriptorTest.partitionsCfg(1)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor2.newWriter();
        // The consumer is created before the write (as before) and is now closed when the
        // test finishes instead of being leaked.
        try (KafkaConsumer consumer = accessor2.createConsumerFactory().create()) {
            CountDownLatch latch = new CountDownLatch(1);
            // Explicit charset: the payload must not depend on the platform default encoding.
            byte[] payload = "this is test".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            writer.write(StreamElement.upsert((EntityDescriptor)this.entity, this.strAttr, (String)UUID.randomUUID().toString(), (String)"key", (String)this.attr.getName(), (long)System.currentTimeMillis(), payload), (succ, exc) -> {
                Assert.assertTrue((boolean)succ);
                Assert.assertNull((Object)exc);
                latch.countDown();
            });
            latch.await();
            ConsumerRecords polled = consumer.poll(Duration.ofMillis(1000L));
            Assert.assertEquals((long)1L, (long)polled.count());
            Assert.assertEquals((long)1L, (long)polled.partitions().size());
            TopicPartition partition = (TopicPartition)Iterators.getOnlyElement(polled.partitions().iterator());
            Assert.assertEquals((long)0L, (long)partition.partition());
            Assert.assertEquals((Object)"topic", (Object)partition.topic());
            int tested = 0;
            // Iterating the raw ConsumerRecords yields Object, hence the explicit cast.
            for (Object polledRecord : polled) {
                ConsumerRecord r = (ConsumerRecord)polledRecord;
                Assert.assertEquals((Object)"key", (Object)r.key());
                Assert.assertEquals((Object)"topic", (Object)r.topic());
                Assert.assertEquals((Object)"this is test", (Object)r.value());
                ++tested;
            }
            Assert.assertEquals((long)1L, (long)tested);
        }
    }

    @Test(timeout=10000L)
    public void testCustomWatermarkEstimator() throws InterruptedException {
        Map<String, Object> cfg = LocalKafkaCommitLogDescriptorTest.partitionsCfg(3);
        cfg.put("watermark.estimator-factory", FixedWatermarkEstimatorFactory.class.getName());
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily((EntityDescriptor)this.entity, (URI)this.storageUri, cfg));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context()).orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        long now = System.currentTimeMillis();
        UnaryFunction & Serializable update = (UnaryFunction & Serializable)pos -> StreamElement.upsert((EntityDescriptor)this.entity, this.attr, (String)UUID.randomUUID().toString(), (String)("key" + pos), (String)this.attr.getName(), (long)(now + (long)pos.intValue()), (byte[])new byte[]{1, 2});
        final AtomicLong watermark = new AtomicLong();
        final CountDownLatch latch = new CountDownLatch(100);
        reader.observe("test", Position.NEWEST, new LogObserver(){

            public boolean onNext(StreamElement ingest, LogObserver.OnNextContext context) {
                watermark.set(context.getWatermark());
                latch.countDown();
                return true;
            }

            public void onCompleted() {
                Assert.fail((String)"This should not be called");
            }

            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        }).waitUntilReady();
        for (int i = 0; i < 100; ++i) {
            writer.write((StreamElement)update.apply((Object)i), (succ, e) -> {});
        }
        latch.await();
        Assert.assertEquals((long)333L, (long)watermark.get());
    }

    /**
     * Checks that the idle policy named by the {@code watermark.idle-policy-factory} option
     * determines the watermark seen by the observer.
     *
     * <p>The same element (stamp {@code now + 2000}) is written twice with a two second pause in
     * between. Once the observer has counted both elements, the watermark recorded by the last
     * {@code onNext} call must equal {@link FixedWatermarkIdlePolicyFactory#FIXED_IDLE_WATERMARK}.
     *
     * @throws InterruptedException if interrupted while sleeping or waiting for the observer
     */
    @Test(timeout=10000L)
    public void testCustomIdlePolicy() throws InterruptedException {
        Map<String, Object> settings = LocalKafkaCommitLogDescriptorTest.and(
            LocalKafkaCommitLogDescriptorTest.partitionsCfg(3),
            LocalKafkaCommitLogDescriptorTest.cfg(Pair.of("poll.allowed-empty-before-watermark-move", "1000")));
        settings.put("watermark.idle-policy-factory", FixedWatermarkIdlePolicyFactory.class.getName());
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
            this.direct,
            TestUtils.createTestFamily((EntityDescriptor)this.entity, (URI)this.storageUri, settings));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context())
            .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        long now = System.currentTimeMillis();
        StreamElement update = StreamElement.upsert(
            (EntityDescriptor)this.entity,
            this.attr,
            UUID.randomUUID().toString(),
            "key",
            this.attr.getName(),
            now + 2000L,
            new byte[]{1, 2});
        final AtomicLong lastWatermark = new AtomicLong();
        final CountDownLatch bothSeen = new CountDownLatch(2);
        LogObserver observer = new LogObserver(){

            public boolean onNext(StreamElement ingest, LogObserver.OnNextContext context) {
                // keep only the watermark delivered with the most recent element
                lastWatermark.set(context.getWatermark());
                bothSeen.countDown();
                return true;
            }

            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        };
        reader.observe("test", Position.NEWEST, observer).waitUntilReady();
        writer.write(update, (ok, err) -> {});
        TimeUnit.SECONDS.sleep(2L);
        writer.write(update, (ok, err) -> {});
        bothSeen.await();
        Assert.assertEquals(FixedWatermarkIdlePolicyFactory.FIXED_IDLE_WATERMARK, lastWatermark.get());
    }

    /**
     * Writes two elements and measures the longest pause between consecutive {@code onNext}
     * calls of an observer reading from {@code Position.OLDEST}.
     *
     * <p>The family is configured with {@code assignment-timeout-ms = 1},
     * {@code bytes-per-sec-max = maxBytesPerSec} and {@code kafka.max.poll.records = 1}.
     *
     * @param maxBytesPerSec value stored under the {@code bytes-per-sec-max} option
     * @return the largest difference, in {@link System#nanoTime()} units, between two consecutive
     *     {@code onNext} invocations
     * @throws InterruptedException if interrupted while waiting for both elements
     */
    private long testSequentialConsumption(long maxBytesPerSec) throws InterruptedException {
        final int numElements = 2;
        // Marks "no onNext call seen yet". System.nanoTime() has an arbitrary origin and may
        // legitimately be negative, so the sign of the stored value cannot be used for that.
        final long notSeenYet = Long.MIN_VALUE;
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, TestUtils.createTestFamily((EntityDescriptor)this.entity, (URI)this.storageUri, LocalKafkaCommitLogDescriptorTest.cfg(Pair.of("assignment-timeout-ms", 1L), Pair.of("bytes-per-sec-max", maxBytesPerSec), Pair.of("kafka.max.poll.records", 1))));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.newWriter();
        CommitLogReader reader = (CommitLogReader)accessor.getCommitLogReader(this.context()).orElseThrow(() -> new IllegalStateException("Missing log reader"));
        final AtomicLong lastOnNext = new AtomicLong(notSeenYet);
        final AtomicLong maxLatency = new AtomicLong(0L);
        final CountDownLatch latch = new CountDownLatch(numElements);
        LogObserver observer = new LogObserver(){

            public boolean onNext(StreamElement ingest, LogObserver.OnNextContext context) {
                long now = System.nanoTime();
                long last = lastOnNext.getAndSet(now);
                if (last != notSeenYet) {
                    // the difference of two nanoTime readings is valid regardless of their sign
                    long latency = now - last;
                    maxLatency.getAndUpdate(old -> Math.max(old, latency));
                }
                latch.countDown();
                return true;
            }

            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        };
        reader.observe("dummy", Position.OLDEST, observer);
        for (int i = 0; i < numElements; ++i) {
            writer.write(StreamElement.upsert((EntityDescriptor)this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), LocalKafkaCommitLogDescriptorTest.emptyValue()), (succ, exc) -> {
                Assert.assertTrue(succ);
                Assert.assertNull(exc);
            });
        }
        latch.await();
        return maxLatency.get();
    }

    /**
     * Shorthand for the {@link Context} exposed by the {@code direct} field.
     *
     * @return the result of {@code direct.getContext()}
     */
    private Context context() {
        return direct.getContext();
    }

    /**
     * Builds a configuration with the given number of partitions and no explicit partitioner.
     *
     * @param partitions number of partitions to configure
     * @return the map produced by {@code partitionsCfg(partitions, null)}
     */
    private static Map<String, Object> partitionsCfg(int partitions) {
        Class<? extends Partitioner> noPartitioner = null;
        return partitionsCfg(partitions, noPartitioner);
    }

    /**
     * Builds a configuration map holding the partition count and, optionally, a partitioner.
     *
     * @param partitions value stored (as a decimal string) under {@code local-kafka-num-partitions}
     * @param partitioner class whose name is stored under {@code partitioner}; when {@code null}
     *     the key is left out
     * @return a map with one or two entries
     */
    private static Map<String, Object> partitionsCfg(int partitions, @Nullable Class<? extends Partitioner> partitioner) {
        Pair<String, Object> partitionCount = Pair.of("local-kafka-num-partitions", Integer.toString(partitions));
        if (partitioner == null) {
            return cfg(partitionCount);
        }
        Pair<String, Object> partitionerName = Pair.of("partitioner", partitioner.getName());
        return cfg(partitionCount, partitionerName);
    }

    /**
     * Collects key/value pairs into a map, ignoring {@code null} pairs.
     *
     * @param pairs entries to collect; individual elements may be {@code null}
     * @return map from each pair's first component to its second component
     * @throws IllegalStateException if two pairs share the same first component
     *     (behaviour of {@code Collectors.toMap} without a merge function)
     */
    @SafeVarargs
    private static Map<String, Object> cfg(Pair<String, Object> ... pairs) {
        return Arrays.stream(pairs)
            .filter(pair -> pair != null)
            .collect(Collectors.toMap(pair -> pair.getFirst(), pair -> pair.getSecond()));
    }

    /**
     * Merges two configuration maps into a newly created map.
     *
     * @param left first map; its entries are collected first
     * @param right second map
     * @return a new map containing the entries of both arguments
     * @throws IllegalStateException if a key is present in both maps
     *     (behaviour of {@code Collectors.toMap} without a merge function)
     */
    private static Map<String, Object> and(Map<String, Object> left, Map<String, Object> right) {
        return Stream.of(left, right)
            .flatMap(part -> part.entrySet().stream())
            .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
    }

    /**
     * Creates a zero-length payload.
     *
     * @return a newly allocated empty byte array
     */
    private static byte[] emptyValue() {
        return new byte[]{};
    }

    /**
     * {@link WatermarkIdlePolicyFactory} for tests: every policy it creates is a serializable
     * lambda that always yields {@link #FIXED_IDLE_WATERMARK}.
     */
    public static final class FixedWatermarkIdlePolicyFactory
    implements WatermarkIdlePolicyFactory {
        /** Value returned by every policy created by this factory. */
        public static final long FIXED_IDLE_WATERMARK = 555L;

        /**
         * Creates the fixed policy.
         *
         * @param cfg configuration; not read by this implementation
         * @return a policy whose single lambda-implemented method returns {@link #FIXED_IDLE_WATERMARK}
         */
        public WatermarkIdlePolicy create(Map<String, Object> cfg) {
            WatermarkIdlePolicy fixedPolicy = (WatermarkIdlePolicy & Serializable)() -> FIXED_IDLE_WATERMARK;
            return fixedPolicy;
        }
    }

    /**
     * {@link WatermarkEstimatorFactory} for tests: every estimator it creates reports
     * {@link #FIXED_WATERMARK} and ignores requests to set a minimal watermark.
     */
    public static final class FixedWatermarkEstimatorFactory
    implements WatermarkEstimatorFactory {
        /** Watermark reported by every estimator created by this factory. */
        public static final long FIXED_WATERMARK = 333L;

        /**
         * Creates the fixed estimator.
         *
         * @param cfg configuration; not read by this implementation
         * @param idlePolicyFactory idle policy factory; not used by this implementation
         * @return an estimator that always returns {@link #FIXED_WATERMARK}
         */
        public WatermarkEstimator create(Map<String, Object> cfg, WatermarkIdlePolicyFactory idlePolicyFactory) {
            return new WatermarkEstimator(){

                public long getWatermark() {
                    return FIXED_WATERMARK;
                }

                public void setMinWatermark(long minWatermark) {
                    // intentionally a no-op: the reported watermark never changes
                }
            };
        }
    }

    /**
     * {@link ElementSerializer} for tests that works with String keys and String values and
     * always produces elements of the single attribute {@code strAttr}.
     */
    public static final class SingleAttrSerializer
    implements ElementSerializer<String, String> {
        /** Descriptor of {@code strAttr}; assigned in {@link #setup(EntityDescriptor)}. */
        private AttributeDescriptor<?> attrDesc;

        /**
         * Looks up the {@code strAttr} attribute of the given entity.
         *
         * @param entityDescriptor entity to search
         * @throws IllegalStateException if the entity has no attribute {@code strAttr}
         */
        public void setup(EntityDescriptor entityDescriptor) {
            this.attrDesc = (AttributeDescriptor<?>)entityDescriptor.findAttribute("strAttr").orElseThrow(() -> new IllegalStateException("Missing attribute 'strAttr'"));
        }

        /**
         * Converts a consumed record into an upsert of {@code strAttr}.
         *
         * @param record record whose key, timestamp and value are used
         * @param entityDesc entity the element belongs to
         * @return the created element; this implementation never returns {@code null}
         */
        @Nullable
        public StreamElement read(ConsumerRecord<String, String> record, EntityDescriptor entityDesc) {
            // Encode explicitly as UTF-8; the no-arg getBytes() depends on the platform charset.
            byte[] payload = record.value().getBytes(java.nio.charset.StandardCharsets.UTF_8);
            // NOTE(review): the other StreamElement.upsert calls in this file pass
            // (uuid, key, attribute name) in that order, whereas here the attribute name comes
            // first, a random UUID second and the record key third. The order is left as it was;
            // confirm whether it is intended.
            return StreamElement.upsert(entityDesc, this.attrDesc, this.attrDesc.getName(), UUID.randomUUID().toString(), record.key(), record.timestamp(), payload);
        }

        /**
         * Converts an element into the key/value pair to be written.
         *
         * @param element element to serialize; its parsed value is expected to be a String
         * @return pair of the element's key and its parsed value
         */
        public Pair<String, String> write(StreamElement element) {
            return Pair.of(element.getKey(), (String)element.getParsed().get());
        }

        /** @return the String serde used for record keys */
        public Serde<String> keySerde() {
            return Serdes.String();
        }

        /** @return the String serde used for record values */
        public Serde<String> valueSerde() {
            return Serdes.String();
        }
    }

    /**
     * {@link Partitioner} for tests that derives the partition id from the first byte of the
     * element's value.
     */
    public static final class FirstBytePartitioner
    implements Partitioner {
        /**
         * @param element element to partition
         * @return {@code 0} for deletes, otherwise the first value byte widened to {@code int}
         *     (so it is negative for bytes above {@code 0x7f})
         */
        public int getPartitionId(StreamElement element) {
            return element.isDelete() ? 0 : element.getValue()[0];
        }
    }
}

