/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.io.kafka;

import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.core.storage.commitlog.Partitioner;
import cz.o2.proxima.direct.core.AbstractOnlineAttributeWriter;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.io.kafka.ElementSerializer;
import cz.o2.proxima.direct.io.kafka.KafkaAccessor;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import java.io.Serializable;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaWriter<K, V>
extends AbstractOnlineAttributeWriter {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaWriter.class);
    final KafkaAccessor accessor;
    private final Partitioner partitioner;
    private final String topic;
    private final ElementSerializer<K, V> serializer;
    @Nullable
    private transient KafkaProducer<K, V> producer;

    KafkaWriter(KafkaAccessor accessor) {
        super(accessor.getEntityDescriptor(), accessor.getUri());
        this.accessor = accessor;
        this.partitioner = accessor.getPartitioner();
        this.topic = accessor.getTopic();
        this.serializer = accessor.getSerializer();
    }

    public void write(StreamElement data, CommitCallback callback) {
        try {
            if (this.producer == null) {
                this.producer = this.createProducer();
            }
            int partition = (this.partitioner.getPartitionId(data) & Integer.MAX_VALUE) % this.producer.partitionsFor(this.topic).size();
            ProducerRecord<K, V> toWrite = this.serializer.write(this.topic, partition, data);
            this.producer.send(toWrite, (metadata, exception) -> {
                if (metadata != null) {
                    log.debug("Written {} to topic {} offset {} and partition {}", new Object[]{data, metadata.topic(), metadata.offset(), metadata.partition()});
                }
                callback.commit(exception == null, (Throwable)exception);
            });
        }
        catch (Exception ex) {
            log.warn("Failed to write ingest {}", (Object)data, (Object)ex);
            callback.commit(false, (Throwable)ex);
        }
    }

    public OnlineAttributeWriter.Factory<?> asFactory() {
        KafkaAccessor accessor = this.accessor;
        return (OnlineAttributeWriter.Factory & Serializable)repo -> new KafkaWriter(accessor);
    }

    private KafkaProducer<K, V> createProducer() {
        return new KafkaProducer<K, V>(this.accessor.createProps(), this.serializer.keySerde().serializer(), this.serializer.valueSerde().serializer());
    }

    public void close() {
        if (this.producer != null) {
            this.producer.close();
            this.producer = null;
        }
    }

    @Generated
    public KafkaAccessor getAccessor() {
        return this.accessor;
    }
}

