/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.io.kafka;

import cz.o2.proxima.core.repository.AttributeFamilyDescriptor;
import cz.o2.proxima.core.repository.EntityDescriptor;
import cz.o2.proxima.core.storage.AbstractStorage;
import cz.o2.proxima.core.storage.commitlog.KeyPartitioner;
import cz.o2.proxima.core.storage.commitlog.Partitioner;
import cz.o2.proxima.core.util.Classpath;
import cz.o2.proxima.core.util.ExceptionUtils;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.DataAccessor;
import cz.o2.proxima.direct.core.commitlog.CommitLogReader;
import cz.o2.proxima.direct.core.view.CachedView;
import cz.o2.proxima.direct.core.view.LocalCachedPartitionedView;
import cz.o2.proxima.direct.io.kafka.ElementSerializer;
import cz.o2.proxima.direct.io.kafka.KafkaConsumerFactory;
import cz.o2.proxima.direct.io.kafka.KafkaLogReader;
import cz.o2.proxima.direct.io.kafka.KafkaStreamElement;
import cz.o2.proxima.direct.io.kafka.KafkaWatermarkConfiguration;
import cz.o2.proxima.direct.io.kafka.KafkaWriter;
import cz.o2.proxima.direct.io.kafka.Utils;
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.com.google.common.base.Preconditions;
import cz.o2.proxima.internal.com.google.common.base.Strings;
import cz.o2.proxima.internal.com.google.common.collect.Iterables;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.AdminClient;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.Config;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.ConfigEntry;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.DescribeConfigsResult;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.config.ConfigResource;
import java.io.Serializable;
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaAccessor
extends AbstractStorage.SerializableAbstractStorage
implements DataAccessor {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaAccessor.class);
    private static final long serialVersionUID = 1L;
    public static final String KAFKA_CONFIG_PREFIX = "kafka.";
    public static final String POLL_INTERVAL_CFG = "poll.interval";
    public static final String PARTITIONER_CLASS = "partitioner";
    public static final String SERIALIZER_CLASS = "serializer-class";
    public static final String MAX_BYTES_PER_SEC = "bytes-per-sec-max";
    public static final String MAX_POLL_RECORDS = "kafka.max.poll.records";
    public static final String AUTO_COMMIT_INTERVAL_MS = "commit.auto-interval-ms";
    public static final String LOG_STALE_COMMIT_INTERVAL_MS = "commit.log-stale-interval-ms";
    public static final String ASSIGNMENT_TIMEOUT_MS = "assignment-timeout-ms";
    public static final String EMPTY_POLL_TIME = "poll.allowed-empty-before-watermark-move";
    public static final String SEQUENCE_ID_HEADER = "seqId";
    public static final String UUID_HEADER = "uuid";
    @Nullable
    private final String topic;
    @Nullable
    private final String topicPattern;
    private final Map<String, Object> cfg;
    private Partitioner partitioner = new KeyPartitioner();
    private long consumerPollInterval = 100L;
    private long maxBytesPerSec = Long.MAX_VALUE;
    private int maxPollRecords = 500;
    private long autoCommitIntervalMs = Long.MAX_VALUE;
    private long logStaleCommitIntervalMs = 60000L;
    private long assignmentTimeoutMillis = 10000L;
    private KafkaWatermarkConfiguration watermarkConfiguration;
    Class<ElementSerializer<?, ?>> serializerClass;

    public KafkaAccessor(EntityDescriptor entity, URI uri, Map<String, Object> cfg) {
        super(entity, uri);
        if (Strings.isNullOrEmpty((String)uri.getAuthority())) {
            throw new IllegalArgumentException("Specify brokers by authority in URI");
        }
        this.cfg = cfg;
        this.topic = Utils.topic(uri);
        this.topicPattern = Utils.topicPattern(uri);
        Preconditions.checkArgument((boolean)(this.topic == null ^ this.topicPattern == null), (String)"Please specify EITHER topic directly as path OR specify topic regex pattern via %s in URI %s", (Object)"topicPattern", (Object)uri);
        this.configure(cfg);
    }

    private void configure(Map<String, Object> cfg) {
        this.consumerPollInterval = Optional.ofNullable(cfg.get(POLL_INTERVAL_CFG)).map(v -> Long.valueOf(v.toString())).orElse(this.consumerPollInterval);
        this.partitioner = Optional.ofNullable((String)cfg.get(PARTITIONER_CLASS)).map(cls -> (Partitioner)Classpath.newInstance((String)cls, Partitioner.class)).orElse(this.partitioner);
        this.maxBytesPerSec = Optional.ofNullable(cfg.get(MAX_BYTES_PER_SEC)).map(v -> Long.valueOf(v.toString())).orElse(this.maxBytesPerSec);
        this.maxPollRecords = Optional.ofNullable(cfg.get(MAX_POLL_RECORDS)).map(v -> Integer.valueOf(v.toString())).orElse(this.maxPollRecords);
        this.autoCommitIntervalMs = Optional.ofNullable(cfg.get(AUTO_COMMIT_INTERVAL_MS)).map(v -> Long.parseLong(v.toString())).orElse(this.autoCommitIntervalMs);
        this.logStaleCommitIntervalMs = Optional.ofNullable(cfg.get(LOG_STALE_COMMIT_INTERVAL_MS)).map(v -> Long.parseLong(v.toString())).orElse(this.logStaleCommitIntervalMs);
        this.assignmentTimeoutMillis = Optional.ofNullable(cfg.get(ASSIGNMENT_TIMEOUT_MS)).map(v -> Long.parseLong(v.toString())).orElse(this.assignmentTimeoutMillis);
        Class<KafkaStreamElement.KafkaStreamElementSerializer> serializer = Optional.ofNullable(cfg.get(SERIALIZER_CLASS)).map(Object::toString).map(c -> Classpath.findClass((String)c, ElementSerializer.class)).orElse(KafkaStreamElement.KafkaStreamElementSerializer.class);
        this.serializerClass = serializer;
        this.watermarkConfiguration = new KafkaWatermarkConfiguration(cfg);
        log.info("Configured accessor with consumerPollInterval {},partitionerClass {}, maxBytesPerSec {}, watermarkConfiguration {}, maxPollRecords {}, autoCommitIntervalNs {}, logStaleCommitIntervalMs {}, serializerClass {},for URI {}", new Object[]{this.consumerPollInterval, this.partitioner.getClass(), this.maxBytesPerSec, this.watermarkConfiguration, this.maxPollRecords, this.autoCommitIntervalMs, this.logStaleCommitIntervalMs, this.serializerClass, this.getUri()});
    }

    Properties createProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.getUri().getAuthority());
        for (Map.Entry<String, Object> e : this.cfg.entrySet()) {
            if (!e.getKey().startsWith(KAFKA_CONFIG_PREFIX)) continue;
            props.put(e.getKey().substring(KAFKA_CONFIG_PREFIX.length()), e.getValue().toString());
        }
        return props;
    }

    @VisibleForTesting
    AdminClient createAdmin() {
        return AdminClient.create(this.createProps());
    }

    public <K, V> KafkaConsumerFactory<K, V> createConsumerFactory() {
        ElementSerializer<K, V> serializer = this.getSerializer();
        return new KafkaConsumerFactory<K, V>(this.getUri(), this.createProps(), serializer.keySerde(), serializer.valueSerde());
    }

    public boolean isAcceptable(AttributeFamilyDescriptor familyDescriptor) {
        if (familyDescriptor.getAccess().isStateCommitLog()) {
            Preconditions.checkState((this.topic != null ? 1 : 0) != 0, (Object)"State commit log is not supported on topics specified by regexp.");
            try (AdminClient adminClient = this.createAdmin();){
                DescribeConfigsResult configsResult = adminClient.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, this.topic)));
                Config config = (Config)ExceptionUtils.uncheckedFactory((ExceptionUtils.ThrowingFactory & Serializable)() -> (Config)Iterables.getOnlyElement(configsResult.all().get().values()));
                ConfigEntry cleanupPolicy = config.get("cleanup.policy");
                boolean bl = this.verifyCleanupPolicy(cleanupPolicy);
                return bl;
            }
        }
        return true;
    }

    @VisibleForTesting
    public boolean verifyCleanupPolicy(ConfigEntry cleanupPolicy) {
        if (cleanupPolicy != null && cleanupPolicy.value().contains("compact")) {
            return true;
        }
        log.warn("Missing option [cleanup.policy=compact] of kafka topic [{}] with access type [state-commit-log].", (Object)this.topic);
        return false;
    }

    public Optional<AttributeWriterBase> getWriter(Context context) {
        return Optional.of(this.newWriter());
    }

    public Optional<CommitLogReader> getCommitLogReader(Context context) {
        return Optional.of(this.newReader(context));
    }

    public Optional<CachedView> getCachedView(Context context) {
        return Optional.of(new LocalCachedPartitionedView(this.getEntityDescriptor(), (CommitLogReader)this.newReader(context), this.newWriter()));
    }

    KafkaWriter<?, ?> newWriter() {
        return new KafkaWriter(this);
    }

    KafkaLogReader newReader(Context context) {
        return new KafkaLogReader(this, context);
    }

    public <K, V> ElementSerializer<K, V> getSerializer() {
        ElementSerializer res = (ElementSerializer)Classpath.newInstance(this.serializerClass);
        res.setup(this.getEntityDescriptor());
        return res;
    }

    boolean isTopicRegex() {
        return this.topicPattern != null;
    }

    @Nullable
    @Generated
    public String getTopic() {
        return this.topic;
    }

    @Nullable
    @Generated
    public String getTopicPattern() {
        return this.topicPattern;
    }

    @Generated
    public Map<String, Object> getCfg() {
        return this.cfg;
    }

    @Generated
    public Partitioner getPartitioner() {
        return this.partitioner;
    }

    @Generated
    public long getConsumerPollInterval() {
        return this.consumerPollInterval;
    }

    @Generated
    public long getMaxBytesPerSec() {
        return this.maxBytesPerSec;
    }

    @Generated
    public int getMaxPollRecords() {
        return this.maxPollRecords;
    }

    @Generated
    public long getAutoCommitIntervalMs() {
        return this.autoCommitIntervalMs;
    }

    @Generated
    public long getLogStaleCommitIntervalMs() {
        return this.logStaleCommitIntervalMs;
    }

    @Generated
    public long getAssignmentTimeoutMillis() {
        return this.assignmentTimeoutMillis;
    }

    @Generated
    public KafkaWatermarkConfiguration getWatermarkConfiguration() {
        return this.watermarkConfiguration;
    }

    @Generated
    public Class<ElementSerializer<?, ?>> getSerializerClass() {
        return this.serializerClass;
    }
}

