package cz.o2.proxima.direct.kafka;

import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.internal.shaded.com.google.common.collect.ImmutableMap;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.AdminClient;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.Config;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.ConfigEntry;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.DescribeConfigsResult;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.KafkaFuture;
import cz.o2.proxima.repository.AttributeFamilyDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.AccessType;
import cz.o2.proxima.storage.StorageType;
import cz.o2.proxima.util.ExceptionUtils;
import cz.o2.proxima.util.TestUtils;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:cz/o2/proxima/direct/kafka/KafkaAccessorTest.class */
public class KafkaAccessorTest implements Serializable {
    private final Repository repo = Repository.ofTest(ConfigFactory.load("test-reference.conf").resolve(), new Repository.Validate[0]);
    private final DirectDataOperator direct = this.repo.getOrCreateOperator(DirectDataOperator.class, new Consumer[0]);
    private AdminClient adminClient;
    private AttributeFamilyDescriptor attrFmlDesc;
    private AccessType accessType;
    private List<Config> cfgs;
    private KafkaAccessor kafkaAccessor;
    private List<ConfigEntry> cfgEtrs;

    private void setupMocks() throws ExecutionException, InterruptedException {
        this.adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        KafkaFuture kafkaFuture = (KafkaFuture) Mockito.mock(KafkaFuture.class);
        DescribeConfigsResult describeConfigsResult = (DescribeConfigsResult) Mockito.mock(DescribeConfigsResult.class);
        Map map = (Map) Mockito.mock(HashMap.class);
        this.attrFmlDesc = (AttributeFamilyDescriptor) Mockito.mock(AttributeFamilyDescriptor.class);
        this.accessType = (AccessType) Mockito.mock(AccessType.class);
        this.cfgs = new ArrayList();
        this.cfgEtrs = new ArrayList();
        Mockito.when(this.adminClient.describeConfigs(ArgumentMatchers.anyCollection())).thenReturn(describeConfigsResult);
        Mockito.when(describeConfigsResult.all()).thenReturn(kafkaFuture);
        Mockito.when(kafkaFuture.get()).thenReturn(map);
        Mockito.when(map.values()).thenReturn(this.cfgs);
        Mockito.when(this.attrFmlDesc.getAccess()).thenReturn(this.accessType);
        this.kafkaAccessor = new KafkaAccessor(EntityDescriptor.newBuilder().setName("entity").build(), URI.create("kafka-test://dummy/topic"), new HashMap()) { // from class: cz.o2.proxima.direct.kafka.KafkaAccessorTest.1
            AdminClient createAdmin() {
                return KafkaAccessorTest.this.adminClient;
            }
        };
    }

    @After
    public void tearDown() {
    }

    @Test
    public void testIsStateCommitLogCleanupCompactAndDeleteMultipleCfgs() {
        ExceptionUtils.unchecked(this::setupMocks);
        Mockito.when(Boolean.valueOf(this.accessType.isStateCommitLog())).thenReturn(true);
        this.cfgs.add(new Config(new ArrayList()));
        Assert.assertFalse(this.kafkaAccessor.isAcceptable(this.attrFmlDesc));
        this.cfgs.clear();
        this.cfgEtrs.add(new ConfigEntry("cleanup.policy", "delete,compact"));
        this.cfgEtrs.add(new ConfigEntry("delete.retention.ms", "300000"));
        this.cfgEtrs.add(new ConfigEntry("file.delete.delay.ms", "300000"));
        this.cfgs.add(new Config(this.cfgEtrs));
        Assert.assertTrue(this.kafkaAccessor.isAcceptable(this.attrFmlDesc));
    }

    @Test
    public void testIsAcceptableStateCommitLog() {
        ExceptionUtils.unchecked(this::setupMocks);
        Mockito.when(Boolean.valueOf(this.accessType.isStateCommitLog())).thenReturn(true);
        Assert.assertTrue(this.kafkaAccessor.verifyCleanupPolicy(new ConfigEntry("cleanup.policy", "compact")));
        Assert.assertTrue(this.kafkaAccessor.verifyCleanupPolicy(new ConfigEntry("cleanup.policy", "delete,compact")));
        Assert.assertFalse(this.kafkaAccessor.verifyCleanupPolicy(new ConfigEntry("random_config", "random_value")));
        Assert.assertFalse(this.kafkaAccessor.verifyCleanupPolicy(new ConfigEntry("cleanup.policy", "delete")));
    }

    @Test
    public void testIsAcceptableFailsForRegexp() {
        EntityDescriptor build = EntityDescriptor.newBuilder().setName("entity").build();
        URI create = URI.create("kafka://broker/?topicPattern=pattern");
        this.kafkaAccessor = new KafkaAccessor(build, create, new HashMap());
        AttributeFamilyDescriptor build2 = AttributeFamilyDescriptor.newBuilder().setName("test-state-commit-log").setAccess(AccessType.from("state-commit-log")).setType(StorageType.PRIMARY).setStorageUri(create).setEntity(build).build();
        Assert.assertThrows(IllegalStateException.class, () -> {
            this.kafkaAccessor.isAcceptable(build2);
        });
    }

    @Test
    public void testCreatePropsWithDefaultValues() {
        this.kafkaAccessor = new KafkaAccessor(EntityDescriptor.newBuilder().setName("entity").build(), URI.create("kafka-test://dummy/topic"), new HashMap());
        Assert.assertEquals("dummy", this.kafkaAccessor.createProps().getProperty("bootstrap.servers"));
    }

    @Test
    public void testCreatePropsWithSpecifiedOptions() {
        this.kafkaAccessor = new KafkaAccessor(EntityDescriptor.newBuilder().setName("entity").build(), URI.create("kafka-test://dummy/topic"), ImmutableMap.builder().put("kafka.acks", 8).put("kafka.batch.size", 333).build());
        Properties createProps = this.kafkaAccessor.createProps();
        Assert.assertEquals("dummy", createProps.getProperty("bootstrap.servers"));
        Assert.assertEquals("8", createProps.getProperty("acks"));
        Assert.assertEquals("333", createProps.get("batch.size"));
    }

    @Test
    public void testReaderAsFactorySerializable() throws IOException, ClassNotFoundException {
        this.kafkaAccessor = new KafkaAccessor(EntityDescriptor.newBuilder().setName("entity").build(), URI.create("kafka-test://dummy/topic"), new HashMap());
        KafkaLogReader newReader = this.kafkaAccessor.newReader(this.direct.getContext());
        Assert.assertEquals(newReader.getUri(), ((KafkaLogReader) ((CommitLogReader.Factory) TestUtils.deserializeObject(TestUtils.serializeObject(newReader.asFactory()))).apply(this.repo)).getUri());
    }

    @Test
    public void testWriterAsFactorySerializable() throws IOException, ClassNotFoundException {
        this.kafkaAccessor = new KafkaAccessor(EntityDescriptor.newBuilder().setName("entity").build(), URI.create("kafka-test://dummy/topic"), new HashMap());
        KafkaWriter newWriter = this.kafkaAccessor.newWriter();
        Assert.assertEquals(newWriter.getUri(), ((KafkaWriter) ((AttributeWriterBase.Factory) TestUtils.deserializeObject(TestUtils.serializeObject(newWriter.asFactory()))).apply(this.repo)).getUri());
    }

    @Test(expected = IllegalArgumentException.class)
    public void testInvalidURIPatternParsing() {
        Utils.topicPattern(URI.create("kafka://broker/?topicPattern=("));
    }

    @Test
    public void testValidURIPatternParsing() {
        Assert.assertEquals("pattern", Utils.topicPattern(URI.create("kafka://broker/?topicPattern=pattern")));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1053741996:
                if (implMethodName.equals("setupMocks")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/util/ExceptionUtils$ThrowingRunnable") && serializedLambda.getFunctionalInterfaceMethodName().equals("run") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/KafkaAccessorTest") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    KafkaAccessorTest kafkaAccessorTest = (KafkaAccessorTest) serializedLambda.getCapturedArg(0);
                    return kafkaAccessorTest::setupMocks;
                }
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/util/ExceptionUtils$ThrowingRunnable") && serializedLambda.getFunctionalInterfaceMethodName().equals("run") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/KafkaAccessorTest") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    KafkaAccessorTest kafkaAccessorTest2 = (KafkaAccessorTest) serializedLambda.getCapturedArg(0);
                    return kafkaAccessorTest2::setupMocks;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
