package cz.o2.proxima.storage.kafka;

import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.producer.ProducerConfig;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.serialization.Serdes;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.serialization.Serializer;
import cz.o2.proxima.storage.AbstractOnlineAttributeWriter;
import cz.o2.proxima.storage.CommitCallback;
import cz.o2.proxima.storage.StreamElement;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/storage/kafka/KafkaWriter.class */
/**
 * Online attribute writer that publishes each {@link StreamElement} as a record
 * to a single Kafka topic.
 *
 * <p>The underlying {@link KafkaProducer} is created on the first call to
 * {@link #write(StreamElement, CommitCallback)} and released by {@link #close()}.
 * The lazy initialization is not synchronized.
 */
public class KafkaWriter extends AbstractOnlineAttributeWriter {
    private static final Logger log = LoggerFactory.getLogger(KafkaWriter.class);

    /** Accessor supplying the entity descriptor, URI, partitioner, topic and base producer properties. */
    final KafkaAccessor accessor;

    /** Maps an element's key, attribute and value to a (possibly negative) partition hash. */
    private final Partitioner partitioner;

    /** Name of the topic all records are sent to. */
    private final String topic;

    /** Lazily created producer; {@code null} until the first write and again after {@link #close()}. */
    private KafkaProducer<String, byte[]> producer;

    /**
     * Creates a writer bound to the topic and partitioner of the given accessor.
     *
     * <p>NOTE(review): the decompiler reported that this constructor was originally
     * package-private; visibility is kept as found in this file.
     *
     * @param kafkaAccessor accessor providing the entity descriptor, URI, partitioner and topic
     */
    public KafkaWriter(KafkaAccessor kafkaAccessor) {
        super(kafkaAccessor.getEntityDescriptor(), kafkaAccessor.getURI());
        this.accessor = kafkaAccessor;
        this.partitioner = kafkaAccessor.getPartitioner();
        this.topic = kafkaAccessor.getTopic();
    }

    /**
     * Sends the element to the configured topic and reports the outcome through the callback.
     *
     * <p>The record key is {@code key + "#" + attribute}, the record timestamp is the
     * element's stamp and the payload is the element's value. The target partition is the
     * partitioner's result, forced non-negative, modulo the partition count the producer
     * reports for the topic.
     *
     * @param streamElement element to write
     * @param commitCallback invoked with {@code (true, null)} when the send completes without
     *     error, or with {@code (false, error)} when the send fails or when any exception is
     *     thrown while preparing or submitting the record
     */
    public void write(StreamElement streamElement, CommitCallback commitCallback) {
        try {
            if (this.producer == null) {
                this.producer = createProducer();
            }
            // Clear the sign bit so the modulo below can never yield a negative partition.
            int nonNegativeHash = this.partitioner.getPartitionId(
                    streamElement.getKey(),
                    streamElement.getAttribute(),
                    streamElement.getValue()) & Integer.MAX_VALUE;
            int partitionCount = this.producer.partitionsFor(this.topic).size();
            Integer partition = Integer.valueOf(nonNegativeHash % partitionCount);
            Long timestamp = Long.valueOf(streamElement.getStamp());
            String recordKey = streamElement.getKey() + "#" + streamElement.getAttribute();

            this.producer.send(
                    new ProducerRecord<>(
                            this.topic, partition, timestamp, recordKey, streamElement.getValue()),
                    (recordMetadata, exc) -> commitCallback.commit(exc == null, exc));
        } catch (Exception e) {
            // Synchronous failures (producer creation, metadata lookup, send) are reported
            // to the caller through the callback instead of being propagated.
            log.warn("Failed to write ingest {}", streamElement, e);
            commitCallback.commit(false, e);
        }
    }

    /**
     * Builds a producer from the accessor's base properties.
     *
     * <p>{@code acks}, {@code bootstrap.servers} (taken from the authority part of this
     * writer's URI) and {@code batch.size} are always set here and therefore override any
     * values for those keys present in the accessor-provided properties.
     *
     * @return a new producer with String keys and byte-array values
     */
    private KafkaProducer<String, byte[]> createProducer() {
        Properties createProps = this.accessor.createProps();
        createProps.put(ProducerConfig.ACKS_CONFIG, "all");
        createProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getURI().getAuthority());
        createProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Serdes already yields correctly parameterized serializers; no raw casts needed.
        return new KafkaProducer<>(
                createProps, Serdes.String().serializer(), Serdes.ByteArray().serializer());
    }

    /**
     * Closes the producer if one was created. The field is cleared even when closing
     * throws, so a subsequent write creates a fresh producer instead of reusing this one.
     */
    public void close() {
        if (this.producer != null) {
            try {
                this.producer.close();
            } finally {
                this.producer = null;
            }
        }
    }

    /**
     * Returns the accessor this writer was created from.
     *
     * @return the accessor passed to the constructor
     */
    public KafkaAccessor getAccessor() {
        return this.accessor;
    }
}
