package cz.o2.proxima.direct.kafka;

import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.DataAccessor;
import cz.o2.proxima.direct.kafka.KafkaStreamElement;
import cz.o2.proxima.direct.view.CachedView;
import cz.o2.proxima.direct.view.LocalCachedPartitionedView;
import cz.o2.proxima.internal.shaded.com.google.common.base.Strings;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.AbstractStorage;
import cz.o2.proxima.storage.commitlog.KeyPartitioner;
import cz.o2.proxima.storage.commitlog.Partitioner;
import cz.o2.proxima.util.Classpath;
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/direct/kafka/KafkaAccessor.class */
public class KafkaAccessor extends AbstractStorage implements DataAccessor {
    public static final String POLL_INTERVAL_CFG = "poll.interval";
    public static final String PARTITIONER_CLASS = "partitioner";
    public static final String SERIALIZER_CLASS = "serializer-class";
    public static final String MAX_BYTES_PER_SEC = "bytes-per-sec-max";
    public static final String TIMESTAMP_SKEW = "timestamp-skew";
    public static final String EMPTY_POLLS = "poll.count-for-empty";
    public static final String MAX_POLL_RECORDS = "kafka.max.poll.records";
    public static final String AUTO_COMMIT_INTERVAL_MS = "commit.auto-interval-ms";
    public static final String LOG_STALE_COMMIT_INTERVAL_MS = "commit.log-stale-interval-ms";
    public static final String EMPTY_POLL_TIME = "poll.allowed-empty-before-watermark-move";
    private final String topic;
    private final Map<String, Object> cfg;
    private Partitioner partitioner;
    private long consumerPollInterval;
    private long maxBytesPerSec;
    private long timestampSkew;
    private int emptyPolls;
    private int maxPollRecords;
    private long autoCommitIntervalNs;
    private long logStaleCommitIntervalNs;
    Class<ElementSerializer<?, ?>> serializerClass;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaAccessor.class);
    public static final String WRITER_CONFIG_PREFIX = "kafka.";
    private static final int PRODUCE_CONFIG_PREFIX_LENGTH = WRITER_CONFIG_PREFIX.length();

    public KafkaAccessor(EntityDescriptor entityDescriptor, URI uri, Map<String, Object> map) {
        super(entityDescriptor, uri);
        this.partitioner = new KeyPartitioner();
        this.consumerPollInterval = 100L;
        this.maxBytesPerSec = Long.MAX_VALUE;
        this.timestampSkew = 100L;
        this.emptyPolls = (int) (1000 / this.consumerPollInterval);
        this.maxPollRecords = 500;
        this.autoCommitIntervalNs = Long.MAX_VALUE;
        this.logStaleCommitIntervalNs = Long.MAX_VALUE;
        if (uri.getPath().length() <= 1) {
            throw new IllegalArgumentException("Specify topic by path in URI");
        }
        if (Strings.isNullOrEmpty(uri.getAuthority())) {
            throw new IllegalArgumentException("Specify brokers by authority in URI");
        }
        this.cfg = map;
        this.topic = Utils.topic(uri);
        configure(map);
    }

    private void configure(Map<String, Object> map) {
        this.consumerPollInterval = ((Long) Optional.ofNullable(map.get(POLL_INTERVAL_CFG)).map(obj -> {
            return Long.valueOf(obj.toString());
        }).orElse(Long.valueOf(this.consumerPollInterval))).longValue();
        this.partitioner = (Partitioner) Optional.ofNullable((String) map.get(PARTITIONER_CLASS)).map(str -> {
            return (Partitioner) Classpath.newInstance(str, Partitioner.class);
        }).orElse(this.partitioner);
        this.maxBytesPerSec = ((Long) Optional.ofNullable(map.get(MAX_BYTES_PER_SEC)).map(obj2 -> {
            return Long.valueOf(obj2.toString());
        }).orElse(Long.valueOf(this.maxBytesPerSec))).longValue();
        this.timestampSkew = ((Long) Optional.ofNullable(map.get(TIMESTAMP_SKEW)).map(obj3 -> {
            return Long.valueOf(obj3.toString());
        }).orElse(Long.valueOf(this.timestampSkew))).longValue();
        this.emptyPolls = ((Integer) Optional.ofNullable(map.get(EMPTY_POLLS)).map(obj4 -> {
            return Integer.valueOf(obj4.toString());
        }).orElse(Integer.valueOf((int) (1000 / this.consumerPollInterval)))).intValue();
        this.maxPollRecords = ((Integer) Optional.ofNullable(map.get(MAX_POLL_RECORDS)).map(obj5 -> {
            return Integer.valueOf(obj5.toString());
        }).orElse(Integer.valueOf(this.maxPollRecords))).intValue();
        this.autoCommitIntervalNs = ((Long) Optional.ofNullable(map.get(AUTO_COMMIT_INTERVAL_MS)).map(obj6 -> {
            return Long.valueOf(Long.valueOf(obj6.toString()).longValue() * 1000000);
        }).orElse(Long.valueOf(this.autoCommitIntervalNs))).longValue();
        this.logStaleCommitIntervalNs = ((Long) Optional.ofNullable(map.get(LOG_STALE_COMMIT_INTERVAL_MS)).map(obj7 -> {
            return Long.valueOf(Long.valueOf(obj7.toString()).longValue() * 1000000);
        }).orElse(Long.valueOf(this.logStaleCommitIntervalNs))).longValue();
        this.serializerClass = (Class) Optional.ofNullable(map.get(SERIALIZER_CLASS)).map((v0) -> {
            return v0.toString();
        }).map(str2 -> {
            return Classpath.findClass(str2, ElementSerializer.class);
        }).orElse(KafkaStreamElement.KafkaStreamElementSerializer.class);
        log.info("Configured accessor with consumerPollInterval {},partitionerClass {}, maxBytesPerSec {}, timestampSkew {}, emptyPolls {}, maxPollRecords {}, autoCommitIntervalNs {}, logStaleCommitIntervalNs {}, serializerClass {},for URI {}", Long.valueOf(this.consumerPollInterval), this.partitioner.getClass(), Long.valueOf(this.maxBytesPerSec), Long.valueOf(this.timestampSkew), Integer.valueOf(this.emptyPolls), Integer.valueOf(this.maxPollRecords), Long.valueOf(this.autoCommitIntervalNs), Long.valueOf(this.logStaleCommitIntervalNs), this.serializerClass, getUri());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Properties createProps() {
        Properties properties = new Properties();
        for (Map.Entry<String, Object> entry : this.cfg.entrySet()) {
            if (entry.getKey().startsWith(WRITER_CONFIG_PREFIX)) {
                properties.put(entry.getKey().substring(PRODUCE_CONFIG_PREFIX_LENGTH), entry.getValue().toString());
            }
        }
        properties.put(MAX_POLL_RECORDS, Integer.valueOf(this.maxPollRecords));
        return properties;
    }

    public <K, V> KafkaConsumerFactory<K, V> createConsumerFactory() {
        ElementSerializer<K, V> serializer = getSerializer();
        return new KafkaConsumerFactory<>(getUri(), createProps(), serializer.keySerde(), serializer.valueSerde());
    }

    public Optional<AttributeWriterBase> getWriter(Context context) {
        return Optional.of(newWriter());
    }

    public Optional<CommitLogReader> getCommitLogReader(Context context) {
        return Optional.of(newReader(context));
    }

    public Optional<CachedView> getCachedView(Context context) {
        return Optional.of(new LocalCachedPartitionedView(getEntityDescriptor(), newReader(context), newWriter()));
    }

    KafkaWriter newWriter() {
        return new KafkaWriter(this);
    }

    KafkaLogReader newReader(Context context) {
        return new KafkaLogReader(this, context);
    }

    public <K, V> ElementSerializer<K, V> getSerializer() {
        ElementSerializer<K, V> elementSerializer = (ElementSerializer) Classpath.newInstance(this.serializerClass);
        elementSerializer.setup(getEntityDescriptor());
        return elementSerializer;
    }

    public String getTopic() {
        return this.topic;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Partitioner getPartitioner() {
        return this.partitioner;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long getConsumerPollInterval() {
        return this.consumerPollInterval;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long getMaxBytesPerSec() {
        return this.maxBytesPerSec;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long getTimestampSkew() {
        return this.timestampSkew;
    }

    int getEmptyPolls() {
        return this.emptyPolls;
    }

    int getMaxPollRecords() {
        return this.maxPollRecords;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long getAutoCommitIntervalNs() {
        return this.autoCommitIntervalNs;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long getLogStaleCommitIntervalNs() {
        return this.logStaleCommitIntervalNs;
    }
}
