/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.kafka;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigResolveOptions;
import com.typesafe.config.ConfigValueFactory;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.kafka.KafkaStreamElement;
import cz.o2.proxima.direct.kafka.TopicOffset;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Lists;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.NewTopic;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.KeyPartitioner;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.util.Optionals;
import cz.o2.proxima.util.Pair;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;

public class KafkaLogReaderIT {
    private static final long AWAIT_TIMEOUT_MS = 5000L;
    private static final String CONFIG_FORMAT = "entities {\n  entity {\n    attributes {\n      foo: { scheme: bytes }\n    }\n  }\n}\nattributeFamilies {\n  scalar-primary {\n    entity: entity\n    attributes: [\"foo\"]\n    storage: \"%s\"\n    type: primary\n    access: %s\n    watermark {\n      idle-policy-factory: cz.o2.proxima.direct.time.NotProgressingWatermarkIdlePolicy.Factory\n    }\n    assignment-timeout-ms: 1000\n  }\n}\n";
    @Rule
    public final EmbeddedKafkaRule rule = new EmbeddedKafkaRule(1);
    private DirectDataOperator operator;
    private EntityDescriptor entity;
    private AttributeDescriptor<byte[]> fooDescriptor;

    private static void await(CountDownLatch latch) throws InterruptedException {
        Assert.assertTrue((boolean)latch.await(5000L, TimeUnit.MILLISECONDS));
    }

    @Before
    public void setup() {
        this.initializeTestWithUri("kafka://\"${broker}\"/foo", "commit-log");
    }

    private void initializeTestWithUri(String uri, String access) {
        Repository repository = Repository.ofTest((Config)this.createConfig(uri, access), (Repository.Validate[])new Repository.Validate[0]);
        this.entity = repository.getEntity("entity");
        this.fooDescriptor = this.entity.getAttribute("foo");
        this.operator = (DirectDataOperator)repository.getOrCreateOperator(DirectDataOperator.class, new Consumer[0]);
    }

    @Test(timeout=30000L)
    @Ignore(value="https://github.com/O2-Czech-Republic/proxima-platform/issues/183")
    public void testReadFromCurrent() throws InterruptedException {
        EmbeddedKafkaBroker embeddedKafka = this.rule.getEmbeddedKafka();
        int numPartitions = 3;
        embeddedKafka.addTopics(new NewTopic[]{new NewTopic("foo", 3, 1)});
        CommitLogReader commitLogReader = (CommitLogReader)Optionals.get((Optional)this.operator.getCommitLogReader(new AttributeDescriptor[]{this.fooDescriptor}));
        TestLogObserver observer = new TestLogObserver();
        ObserveHandle handle = commitLogReader.observe("test-reader", Position.CURRENT, (LogObserver)observer);
        handle.waitUntilReady();
        int numElements = 100;
        this.writeElements(100);
        this.writePoisonedPills(3);
        KafkaLogReaderIT.await(observer.getCompleted());
        handle.close();
        Assert.assertEquals((long)100L, (long)observer.getNumReceivedElements());
        Assert.assertEquals((long)103L, (long)this.numCommittedElements(handle));
    }

    @Test(timeout=30000L)
    public void testReadFromOldest() throws InterruptedException {
        EmbeddedKafkaBroker embeddedKafka = this.rule.getEmbeddedKafka();
        int numPartitions = 3;
        embeddedKafka.addTopics(new NewTopic[]{new NewTopic("foo", 3, 1)});
        CommitLogReader commitLogReader = (CommitLogReader)Optionals.get((Optional)this.operator.getCommitLogReader(new AttributeDescriptor[]{this.fooDescriptor}));
        int numElements = 100;
        KafkaLogReaderIT.await(this.writeElements(100));
        KafkaLogReaderIT.await(this.writePoisonedPills(3));
        TestLogObserver firstObserver = new TestLogObserver();
        ObserveHandle firstHandle = commitLogReader.observe("test-reader", Position.OLDEST, (LogObserver)firstObserver);
        KafkaLogReaderIT.await(firstObserver.getCompleted());
        Assert.assertEquals((long)100L, (long)firstObserver.getNumReceivedElements());
        Assert.assertEquals((long)103L, (long)this.numCommittedElements(firstHandle));
        firstHandle.close();
        KafkaLogReaderIT.await(this.writePoisonedPills(3));
        TestLogObserver secondObserver = new TestLogObserver();
        ObserveHandle secondHandle = commitLogReader.observe("test-reader", Position.OLDEST, (LogObserver)secondObserver);
        KafkaLogReaderIT.await(secondObserver.getCompleted());
        Assert.assertEquals((long)0L, (long)secondObserver.getNumReceivedElements());
        Assert.assertEquals((long)106L, (long)this.numCommittedElements(secondHandle));
    }

    @Test(timeout=30000L)
    public void testReadFromRegexTopics() throws InterruptedException {
        Pattern pattern = Pattern.compile("(foo|bar)");
        Assert.assertTrue((boolean)pattern.matcher("foo").find());
        this.initializeTestWithUri("kafka://\"${broker}\"/?topicPattern=(foo%7Cbar)", "[commit-log, read-only]");
        EmbeddedKafkaBroker embeddedKafka = this.rule.getEmbeddedKafka();
        int numPartitions = 3;
        embeddedKafka.addTopics(new NewTopic[]{new NewTopic("foo", 3, 1), new NewTopic("bar", 3, 1)});
        CommitLogReader commitLogReader = (CommitLogReader)Optionals.get((Optional)this.operator.getCommitLogReader(new AttributeDescriptor[]{this.fooDescriptor}));
        TestLogObserver observer = new TestLogObserver();
        ObserveHandle handle = commitLogReader.observe("test-reader", Position.NEWEST, (LogObserver)observer);
        handle.waitUntilReady();
        int numElements = 100;
        this.writeUsingPublisher(100, Lists.newArrayList((Object[])new String[]{"foo", "bar"}));
        while (observer.getNumReceivedElements() < 100) {
            TimeUnit.MILLISECONDS.sleep(100L);
        }
        handle.close();
        Assert.assertEquals((long)100L, (long)observer.getNumReceivedElements());
        Assert.assertEquals((long)100L, (long)this.numCommittedElements(handle));
    }

    private long numCommittedElements(ObserveHandle handle) {
        return handle.getCommittedOffsets().stream().mapToLong(offset -> ((TopicOffset)offset).getOffset()).sum();
    }

    private CountDownLatch writeElements(int numElements) {
        OnlineAttributeWriter writer = (OnlineAttributeWriter)Optionals.get((Optional)this.operator.getWriter(this.fooDescriptor));
        CountDownLatch done = new CountDownLatch(numElements);
        for (int i = 0; i < numElements; ++i) {
            StreamElement element = StreamElement.upsert((EntityDescriptor)this.entity, this.fooDescriptor, (String)UUID.randomUUID().toString(), (String)String.format("element-%d", i), (String)this.fooDescriptor.getName(), (long)i, (byte[])"value".getBytes(StandardCharsets.UTF_8));
            writer.write(element, (success, error) -> {
                if (success) {
                    done.countDown();
                }
            });
        }
        return done;
    }

    private CountDownLatch writeUsingPublisher(int numElements, List<String> topics) {
        CountDownLatch done = new CountDownLatch(numElements);
        KafkaStreamElement.KafkaStreamElementSerializer serializer = new KafkaStreamElement.KafkaStreamElementSerializer();
        Properties props = new Properties();
        props.put("bootstrap.servers", this.getConnectString(this.rule.getEmbeddedKafka()));
        KafkaProducer producer = new KafkaProducer(props, serializer.keySerde().serializer(), serializer.valueSerde().serializer());
        Random r = new Random();
        for (int i = 0; i < numElements; ++i) {
            StreamElement element = StreamElement.upsert((EntityDescriptor)this.entity, this.fooDescriptor, (String)UUID.randomUUID().toString(), (String)String.format("element-%d", i), (String)this.fooDescriptor.getName(), (long)i, (byte[])"value".getBytes(StandardCharsets.UTF_8));
            Pair toWrite = serializer.write(element);
            String topic = topics.get(r.nextInt(topics.size()));
            producer.send(new ProducerRecord(topic, toWrite.getFirst(), toWrite.getSecond()), (meta, exc) -> {
                Assert.assertNull((Object)exc);
                done.countDown();
            });
        }
        return done;
    }

    private CountDownLatch writePoisonedPills(int numPartitions) {
        OnlineAttributeWriter writer = (OnlineAttributeWriter)Optionals.get((Optional)this.operator.getWriter(this.fooDescriptor));
        KeyPartitioner keyPartitioner = new KeyPartitioner();
        HashSet<Integer> poisonedPartitions = new HashSet<Integer>();
        CountDownLatch done = new CountDownLatch(numPartitions);
        int i = 0;
        while (poisonedPartitions.size() < numPartitions) {
            StreamElement poisonedPill = StreamElement.upsert((EntityDescriptor)this.entity, this.fooDescriptor, (String)UUID.randomUUID().toString(), (String)String.format("poisoned-pill-%d", i), (String)this.fooDescriptor.getName(), (long)Long.MAX_VALUE, (byte[])"value".getBytes(StandardCharsets.UTF_8));
            int partition = (keyPartitioner.getPartitionId(poisonedPill) & Integer.MAX_VALUE) % numPartitions;
            if (poisonedPartitions.add(partition)) {
                writer.write(poisonedPill, (success, error) -> {
                    if (success) {
                        done.countDown();
                    }
                });
            }
            ++i;
        }
        return done;
    }

    private Config createConfig(String uri, String access) {
        EmbeddedKafkaBroker embeddedKafka = this.rule.getEmbeddedKafka();
        String connectionString = this.getConnectString(embeddedKafka);
        return ConfigFactory.parseString((String)String.format(CONFIG_FORMAT, uri, access)).resolveWith(ConfigFactory.empty().withValue("broker", ConfigValueFactory.fromAnyRef((Object)connectionString)), ConfigResolveOptions.noSystem());
    }

    private String getConnectString(EmbeddedKafkaBroker embeddedKafka) {
        return Arrays.stream(embeddedKafka.getBrokerAddresses()).map(ba -> ba.getHost() + ":" + ba.getPort()).collect(Collectors.joining(","));
    }

    private static class TestLogObserver
    implements LogObserver {
        private final AtomicInteger numReceivedElements = new AtomicInteger();
        private final CountDownLatch completed = new CountDownLatch(1);

        private TestLogObserver() {
        }

        public void onCompleted() {
            this.completed.countDown();
        }

        public boolean onError(Throwable error) {
            return false;
        }

        public boolean onNext(StreamElement ingest, LogObserver.OnNextContext context) {
            if (!ingest.getKey().startsWith("poisoned-pill")) {
                this.numReceivedElements.incrementAndGet();
            }
            context.confirm();
            return true;
        }

        CountDownLatch getCompleted() {
            return this.completed;
        }

        int getNumReceivedElements() {
            return this.numReceivedElements.get();
        }
    }
}

