package cz.o2.proxima.direct.kafka;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigResolveOptions;
import com.typesafe.config.ConfigValueFactory;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.kafka.KafkaStreamElement;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Lists;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.NewTopic;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.KeyPartitioner;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.util.Optionals;
import cz.o2.proxima.util.Pair;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;

/* loaded from: input_file:cz/o2/proxima/direct/kafka/KafkaLogReaderIT.class */
public class KafkaLogReaderIT {
    private static final long AWAIT_TIMEOUT_MS = 5000;
    private static final String CONFIG_FORMAT = "entities {\n  entity {\n    attributes {\n      foo: { scheme: bytes }\n    }\n  }\n}\nattributeFamilies {\n  scalar-primary {\n    entity: entity\n    attributes: [\"foo\"]\n    storage: \"%s\"\n    type: primary\n    access: %s\n    watermark {\n      idle-policy-factory: cz.o2.proxima.direct.time.NotProgressingWatermarkIdlePolicy.Factory\n    }\n    assignment-timeout-ms: 1000\n  }\n}\n";

    @Rule
    public final EmbeddedKafkaRule rule = new EmbeddedKafkaRule(1);
    private DirectDataOperator operator;
    private EntityDescriptor entity;
    private AttributeDescriptor<byte[]> fooDescriptor;

    /* loaded from: input_file:cz/o2/proxima/direct/kafka/KafkaLogReaderIT$TestLogObserver.class */
    private static class TestLogObserver implements LogObserver {
        private final AtomicInteger numReceivedElements = new AtomicInteger();
        private final CountDownLatch completed = new CountDownLatch(1);

        private TestLogObserver() {
        }

        public void onCompleted() {
            this.completed.countDown();
        }

        public boolean onError(Throwable th) {
            return false;
        }

        public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
            if (!streamElement.getKey().startsWith("poisoned-pill")) {
                this.numReceivedElements.incrementAndGet();
            }
            onNextContext.confirm();
            return true;
        }

        CountDownLatch getCompleted() {
            return this.completed;
        }

        int getNumReceivedElements() {
            return this.numReceivedElements.get();
        }
    }

    private static void await(CountDownLatch countDownLatch) throws InterruptedException {
        Assert.assertTrue(countDownLatch.await(AWAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
    }

    @Before
    public void setup() {
        initializeTestWithUri("kafka://\"${broker}\"/foo", "commit-log");
    }

    private void initializeTestWithUri(String str, String str2) {
        Repository ofTest = Repository.ofTest(createConfig(str, str2), new Repository.Validate[0]);
        this.entity = ofTest.getEntity("entity");
        this.fooDescriptor = this.entity.getAttribute("foo");
        this.operator = ofTest.getOrCreateOperator(DirectDataOperator.class, new Consumer[0]);
    }

    @Test(timeout = 30000)
    @Ignore("https://github.com/O2-Czech-Republic/proxima-platform/issues/183")
    public void testReadFromCurrent() throws InterruptedException {
        this.rule.getEmbeddedKafka().addTopics(new NewTopic[]{new NewTopic("foo", 3, (short) 1)});
        CommitLogReader commitLogReader = (CommitLogReader) Optionals.get(this.operator.getCommitLogReader(new AttributeDescriptor[]{this.fooDescriptor}));
        TestLogObserver testLogObserver = new TestLogObserver();
        ObserveHandle observe = commitLogReader.observe("test-reader", Position.CURRENT, testLogObserver);
        observe.waitUntilReady();
        writeElements(100);
        writePoisonedPills(3);
        await(testLogObserver.getCompleted());
        observe.close();
        Assert.assertEquals(100L, testLogObserver.getNumReceivedElements());
        Assert.assertEquals(103L, numCommittedElements(observe));
    }

    @Test(timeout = 30000)
    public void testReadFromOldest() throws InterruptedException {
        this.rule.getEmbeddedKafka().addTopics(new NewTopic[]{new NewTopic("foo", 3, (short) 1)});
        CommitLogReader commitLogReader = (CommitLogReader) Optionals.get(this.operator.getCommitLogReader(new AttributeDescriptor[]{this.fooDescriptor}));
        await(writeElements(100));
        await(writePoisonedPills(3));
        TestLogObserver testLogObserver = new TestLogObserver();
        ObserveHandle observe = commitLogReader.observe("test-reader", Position.OLDEST, testLogObserver);
        await(testLogObserver.getCompleted());
        Assert.assertEquals(100L, testLogObserver.getNumReceivedElements());
        Assert.assertEquals(103L, numCommittedElements(observe));
        observe.close();
        await(writePoisonedPills(3));
        TestLogObserver testLogObserver2 = new TestLogObserver();
        ObserveHandle observe2 = commitLogReader.observe("test-reader", Position.OLDEST, testLogObserver2);
        await(testLogObserver2.getCompleted());
        Assert.assertEquals(0L, testLogObserver2.getNumReceivedElements());
        Assert.assertEquals(106L, numCommittedElements(observe2));
    }

    @Test(timeout = 30000)
    public void testReadFromRegexTopics() throws InterruptedException {
        Assert.assertTrue(Pattern.compile("(foo|bar)").matcher("foo").find());
        initializeTestWithUri("kafka://\"${broker}\"/?topicPattern=(foo%7Cbar)", "[commit-log, read-only]");
        this.rule.getEmbeddedKafka().addTopics(new NewTopic[]{new NewTopic("foo", 3, (short) 1), new NewTopic("bar", 3, (short) 1)});
        CommitLogReader commitLogReader = (CommitLogReader) Optionals.get(this.operator.getCommitLogReader(new AttributeDescriptor[]{this.fooDescriptor}));
        TestLogObserver testLogObserver = new TestLogObserver();
        ObserveHandle observe = commitLogReader.observe("test-reader", Position.NEWEST, testLogObserver);
        observe.waitUntilReady();
        writeUsingPublisher(100, Lists.newArrayList(new String[]{"foo", "bar"}));
        while (testLogObserver.getNumReceivedElements() < 100) {
            TimeUnit.MILLISECONDS.sleep(100L);
        }
        observe.close();
        Assert.assertEquals(100L, testLogObserver.getNumReceivedElements());
        Assert.assertEquals(100L, numCommittedElements(observe));
    }

    private long numCommittedElements(ObserveHandle observeHandle) {
        return observeHandle.getCommittedOffsets().stream().mapToLong(offset -> {
            return ((TopicOffset) offset).getOffset();
        }).sum();
    }

    private CountDownLatch writeElements(int i) {
        OnlineAttributeWriter onlineAttributeWriter = (OnlineAttributeWriter) Optionals.get(this.operator.getWriter(this.fooDescriptor));
        CountDownLatch countDownLatch = new CountDownLatch(i);
        for (int i2 = 0; i2 < i; i2++) {
            onlineAttributeWriter.write(StreamElement.upsert(this.entity, this.fooDescriptor, UUID.randomUUID().toString(), String.format("element-%d", Integer.valueOf(i2)), this.fooDescriptor.getName(), i2, "value".getBytes(StandardCharsets.UTF_8)), (z, th) -> {
                if (z) {
                    countDownLatch.countDown();
                }
            });
        }
        return countDownLatch;
    }

    private CountDownLatch writeUsingPublisher(int i, List<String> list) {
        CountDownLatch countDownLatch = new CountDownLatch(i);
        KafkaStreamElement.KafkaStreamElementSerializer kafkaStreamElementSerializer = new KafkaStreamElement.KafkaStreamElementSerializer();
        Properties properties = new Properties();
        properties.put("bootstrap.servers", getConnectString(this.rule.getEmbeddedKafka()));
        KafkaProducer kafkaProducer = new KafkaProducer(properties, kafkaStreamElementSerializer.keySerde().serializer(), kafkaStreamElementSerializer.valueSerde().serializer());
        Random random = new Random();
        for (int i2 = 0; i2 < i; i2++) {
            Pair write = kafkaStreamElementSerializer.write(StreamElement.upsert(this.entity, this.fooDescriptor, UUID.randomUUID().toString(), String.format("element-%d", Integer.valueOf(i2)), this.fooDescriptor.getName(), i2, "value".getBytes(StandardCharsets.UTF_8)));
            kafkaProducer.send(new ProducerRecord(list.get(random.nextInt(list.size())), (String) write.getFirst(), (byte[]) write.getSecond()), (recordMetadata, exc) -> {
                Assert.assertNull(exc);
                countDownLatch.countDown();
            });
        }
        return countDownLatch;
    }

    private CountDownLatch writePoisonedPills(int i) {
        OnlineAttributeWriter onlineAttributeWriter = (OnlineAttributeWriter) Optionals.get(this.operator.getWriter(this.fooDescriptor));
        KeyPartitioner keyPartitioner = new KeyPartitioner();
        HashSet hashSet = new HashSet();
        CountDownLatch countDownLatch = new CountDownLatch(i);
        int i2 = 0;
        while (hashSet.size() < i) {
            StreamElement upsert = StreamElement.upsert(this.entity, this.fooDescriptor, UUID.randomUUID().toString(), String.format("poisoned-pill-%d", Integer.valueOf(i2)), this.fooDescriptor.getName(), Long.MAX_VALUE, "value".getBytes(StandardCharsets.UTF_8));
            if (hashSet.add(Integer.valueOf((keyPartitioner.getPartitionId(upsert) & Integer.MAX_VALUE) % i))) {
                onlineAttributeWriter.write(upsert, (z, th) -> {
                    if (z) {
                        countDownLatch.countDown();
                    }
                });
            }
            i2++;
        }
        return countDownLatch;
    }

    private Config createConfig(String str, String str2) {
        return ConfigFactory.parseString(String.format(CONFIG_FORMAT, str, str2)).resolveWith(ConfigFactory.empty().withValue("broker", ConfigValueFactory.fromAnyRef(getConnectString(this.rule.getEmbeddedKafka()))), ConfigResolveOptions.noSystem());
    }

    private String getConnectString(EmbeddedKafkaBroker embeddedKafkaBroker) {
        return (String) Arrays.stream(embeddedKafkaBroker.getBrokerAddresses()).map(brokerAddress -> {
            return brokerAddress.getHost() + ":" + brokerAddress.getPort();
        }).collect(Collectors.joining(","));
    }
}
