/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.kafka;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.kafka.KafkaAccessor;
import cz.o2.proxima.direct.kafka.KafkaLogReader;
import cz.o2.proxima.direct.kafka.KafkaWriter;
import cz.o2.proxima.direct.kafka.Utils;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.internal.shaded.com.google.common.collect.ImmutableMap;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.AdminClient;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.ConfigEntry;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.DescribeConfigsResult;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.KafkaFuture;
import cz.o2.proxima.repository.AttributeFamilyDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.AccessType;
import cz.o2.proxima.storage.StorageType;
import cz.o2.proxima.util.ExceptionUtils;
import cz.o2.proxima.util.TestUtils;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/**
 * Unit tests for {@link KafkaAccessor}.
 *
 * <p>Covers cleanup-policy verification against a mocked {@link AdminClient}, creation of
 * client properties, Java serialization of the reader/writer factories and parsing of the
 * {@code topicPattern} URI parameter by {@link Utils}.
 */
public class KafkaAccessorTest implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Storage URI shared by the tests that address a single, fixed topic. */
    private static final String TOPIC_URI = "kafka-test://dummy/topic";

    /** Storage URI that selects topics through the {@code topicPattern} query parameter. */
    private static final String PATTERN_URI = "kafka://broker/?topicPattern=pattern";

    private final Repository repo = Repository.ofTest(ConfigFactory.load("test-reference.conf").resolve());
    private final DirectDataOperator direct = repo.getOrCreateOperator(DirectDataOperator.class);
    private AdminClient adminClient;
    private AttributeFamilyDescriptor attrFmlDesc;
    private AccessType accessType;
    private List<cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.Config> cfgs;
    private KafkaAccessor kafkaAccessor;
    private List<ConfigEntry> cfgEtrs;

    /** Builds the minimal entity descriptor (named {@code "entity"}) used by every accessor. */
    private static EntityDescriptor testEntity() {
        return EntityDescriptor.newBuilder().setName("entity").build();
    }

    /**
     * Creates a plain (non-mocked) accessor for the test entity.
     *
     * @param uri storage URI of the accessor
     * @param cfg accessor configuration passed to the constructor as-is
     * @return newly created accessor
     */
    private static KafkaAccessor newAccessor(String uri, Map<String, Object> cfg) {
        return new KafkaAccessor(testEntity(), URI.create(uri), cfg);
    }

    /**
     * Wires the mocked {@link AdminClient} chain
     * {@code describeConfigs(..) -> all() -> get() -> values()} so that it yields {@link #cfgs},
     * makes {@link #attrFmlDesc} return the mocked {@link #accessType} and creates an accessor
     * whose {@code createAdmin()} returns the mocked client.
     *
     * <p>Called explicitly by the tests that need it; the remaining tests build a plain accessor.
     *
     * @throws ExecutionException declared by the stubbed {@code KafkaFuture.get()} call
     * @throws InterruptedException declared by the stubbed {@code KafkaFuture.get()} call
     */
    // The future and the map are mocked as raw types on purpose: their key type is not imported
    // by this file, and the stubbing below never inspects the keys.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void setupMocks() throws ExecutionException, InterruptedException {
        adminClient = Mockito.mock(AdminClient.class);
        attrFmlDesc = Mockito.mock(AttributeFamilyDescriptor.class);
        accessType = Mockito.mock(AccessType.class);
        cfgs = new ArrayList<>();
        cfgEtrs = new ArrayList<>();

        DescribeConfigsResult describeResult = Mockito.mock(DescribeConfigsResult.class);
        KafkaFuture allConfigs = Mockito.mock(KafkaFuture.class);
        Map configsByResource = Mockito.mock(HashMap.class);

        Mockito.when(adminClient.describeConfigs(ArgumentMatchers.anyCollection())).thenReturn(describeResult);
        Mockito.when(describeResult.all()).thenReturn(allConfigs);
        Mockito.when(allConfigs.get()).thenReturn(configsByResource);
        // cfgs is returned by reference, so tests can change the reported configs after setup.
        Mockito.when(configsByResource.values()).thenReturn(cfgs);
        Mockito.when(attrFmlDesc.getAccess()).thenReturn(accessType);

        kafkaAccessor = new KafkaAccessor(testEntity(), URI.create(TOPIC_URI), new HashMap<>()) {

            @Override
            AdminClient createAdmin() {
                // Qualified explicitly so the test's mock is used regardless of any
                // identically named member the superclass might declare.
                return KafkaAccessorTest.this.adminClient;
            }
        };
    }

    /** Intentionally empty; kept so the class's public surface stays unchanged. */
    @After
    public void tearDown() {
    }

    /**
     * A state-commit-log family is rejected when the reported config has no entries and accepted
     * once {@code cleanup.policy=delete,compact} is reported among several entries.
     */
    @Test
    public void testIsStateCommitLogCleanupCompactAndDeleteMultipleCfgs() {
        ExceptionUtils.unchecked(this::setupMocks);
        Mockito.when(accessType.isStateCommitLog()).thenReturn(true);

        // Config without any entries.
        cfgs.add(new cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.Config(new ArrayList<>()));
        Assert.assertFalse(kafkaAccessor.isAcceptable(attrFmlDesc));

        // Config with the cleanup policy surrounded by unrelated entries.
        cfgs.clear();
        cfgEtrs.add(new ConfigEntry("cleanup.policy", "delete,compact"));
        cfgEtrs.add(new ConfigEntry("delete.retention.ms", "300000"));
        cfgEtrs.add(new ConfigEntry("file.delete.delay.ms", "300000"));
        cfgs.add(new cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.Config(cfgEtrs));
        Assert.assertTrue(kafkaAccessor.isAcceptable(attrFmlDesc));
    }

    /**
     * {@code verifyCleanupPolicy} accepts {@code compact} and {@code delete,compact} and rejects
     * both an unrelated config entry and a {@code delete}-only policy.
     */
    @Test
    public void testIsAcceptableStateCommitLog() {
        ExceptionUtils.unchecked(this::setupMocks);
        Mockito.when(accessType.isStateCommitLog()).thenReturn(true);

        Assert.assertTrue(kafkaAccessor.verifyCleanupPolicy(new ConfigEntry("cleanup.policy", "compact")));
        Assert.assertTrue(kafkaAccessor.verifyCleanupPolicy(new ConfigEntry("cleanup.policy", "delete,compact")));
        Assert.assertFalse(kafkaAccessor.verifyCleanupPolicy(new ConfigEntry("random_config", "random_value")));
        Assert.assertFalse(kafkaAccessor.verifyCleanupPolicy(new ConfigEntry("cleanup.policy", "delete")));
    }

    /**
     * {@code isAcceptable} throws {@link IllegalStateException} for a state-commit-log family
     * whose storage URI uses {@code topicPattern}.
     */
    @Test
    public void testIsAcceptableFailsForRegexp() {
        EntityDescriptor entity = testEntity();
        URI storageUri = URI.create(PATTERN_URI);
        kafkaAccessor = new KafkaAccessor(entity, storageUri, new HashMap<>());
        AttributeFamilyDescriptor descriptor = AttributeFamilyDescriptor.newBuilder()
                .setName("test-state-commit-log")
                .setAccess(AccessType.from("state-commit-log"))
                .setType(StorageType.PRIMARY)
                .setStorageUri(storageUri)
                .setEntity(entity)
                .build();

        Assert.assertThrows(IllegalStateException.class, () -> kafkaAccessor.isAcceptable(descriptor));
    }

    /** With an empty configuration, {@code bootstrap.servers} equals the URI authority. */
    @Test
    public void testCreatePropsWithDefaultValues() {
        kafkaAccessor = newAccessor(TOPIC_URI, new HashMap<>());

        Properties props = kafkaAccessor.createProps();

        Assert.assertEquals("dummy", props.getProperty("bootstrap.servers"));
    }

    /**
     * Options prefixed with {@code kafka.} appear in the created properties without the prefix
     * and with their values rendered as strings.
     */
    @Test
    public void testCreatePropsWithSpecifiedOptions() {
        Map<String, Object> options = ImmutableMap.<String, Object>builder()
                .put("kafka.acks", 8)
                .put("kafka.batch.size", 333)
                .build();
        kafkaAccessor = newAccessor(TOPIC_URI, options);

        Properties props = kafkaAccessor.createProps();

        Assert.assertEquals("dummy", props.getProperty("bootstrap.servers"));
        Assert.assertEquals("8", props.getProperty("acks"));
        Assert.assertEquals("333", props.getProperty("batch.size"));
    }

    /**
     * The reader factory survives a serialization round trip and re-creates a reader with the
     * same URI.
     */
    @Test
    public void testReaderAsFactorySerializable() throws IOException, ClassNotFoundException {
        kafkaAccessor = newAccessor(TOPIC_URI, new HashMap<>());
        KafkaLogReader reader = kafkaAccessor.newReader(direct.getContext());

        byte[] bytes = TestUtils.serializeObject(reader.asFactory());
        CommitLogReader.Factory<?> factory = (CommitLogReader.Factory<?>) TestUtils.deserializeObject(bytes);

        Assert.assertEquals(reader.getUri(), ((KafkaLogReader) factory.apply(repo)).getUri());
    }

    /**
     * The writer factory survives a serialization round trip and re-creates a writer with the
     * same URI.
     */
    @Test
    public void testWriterAsFactorySerializable() throws IOException, ClassNotFoundException {
        kafkaAccessor = newAccessor(TOPIC_URI, new HashMap<>());
        KafkaWriter writer = kafkaAccessor.newWriter();

        byte[] bytes = TestUtils.serializeObject(writer.asFactory());
        AttributeWriterBase.Factory<?> factory = (AttributeWriterBase.Factory<?>) TestUtils.deserializeObject(bytes);

        Assert.assertEquals(writer.getUri(), ((KafkaWriter) factory.apply(repo)).getUri());
    }

    /** A {@code topicPattern} value of {@code "("} is rejected with {@link IllegalArgumentException}. */
    @Test(expected=IllegalArgumentException.class)
    public void testInvalidURIPatternParsing() {
        Utils.topicPattern(URI.create("kafka://broker/?topicPattern=("));
    }

    /** The {@code topicPattern} query parameter is returned as given in the URI. */
    @Test
    public void testValidURIPatternParsing() {
        Assert.assertEquals("pattern", Utils.topicPattern(URI.create(PATTERN_URI)));
    }
}

