/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.io.kafka;

import cz.o2.proxima.core.repository.AttributeDescriptor;
import cz.o2.proxima.core.repository.EntityDescriptor;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.direct.io.kafka.ElementSerializer;
import cz.o2.proxima.internal.com.google.common.base.MoreObjects;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.header.Header;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.header.internals.RecordHeader;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.serialization.Serde;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.serialization.Serdes;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.utils.ByteUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.invoke.CallSite;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Optional;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaStreamElement
extends StreamElement {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamElement.class);
    private static final long serialVersionUID = 1L;
    private final int partition;
    private final long offset;

    KafkaStreamElement(EntityDescriptor entityDesc, AttributeDescriptor<?> attributeDesc, String uuid, String key, String attribute, long stamp, byte[] value, int partition, long offset) {
        super(entityDesc, attributeDesc, uuid, key, attribute, stamp, false, value);
        this.partition = partition;
        this.offset = offset;
    }

    KafkaStreamElement(EntityDescriptor entityDesc, AttributeDescriptor<?> attributeDesc, long sequenceId, String key, String attribute, long stamp, byte[] value, int partition, long offset) {
        super(entityDesc, attributeDesc, sequenceId, key, attribute, stamp, false, value);
        this.partition = partition;
        this.offset = offset;
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)((Object)this)).add("entityDesc", (Object)this.getEntityDescriptor()).add("attributeDesc", (Object)this.getAttributeDescriptor()).add("uuid", (Object)this.getUuid()).add("key", (Object)this.getKey()).add("attribute", (Object)this.getAttribute()).add("stamp", this.getStamp()).add("value.length", this.getValue() == null ? -1 : this.getValue().length).add("partition", this.partition).add("offset", this.offset).toString();
    }

    @Generated
    public int getPartition() {
        return this.partition;
    }

    @Generated
    public long getOffset() {
        return this.offset;
    }

    public static class KafkaStreamElementSerializer
    implements ElementSerializer<String, byte[]> {
        private static final long serialVersionUID = 1L;

        @Override
        @Nullable
        public StreamElement read(ConsumerRecord<String, byte[]> record, EntityDescriptor entityDesc) {
            String key = record.key();
            byte[] value = record.value();
            int hashPos = key.lastIndexOf(35);
            if (hashPos < 0 || hashPos >= key.length()) {
                log.error("Invalid key in kafka topic: {}", (Object)key);
            } else {
                String entityKey = key.substring(0, hashPos);
                String attribute = key.substring(hashPos + 1);
                Optional attr = entityDesc.findAttribute(attribute, true);
                if (!attr.isPresent()) {
                    log.error("Invalid attribute {} in kafka key {} for entity {}", new Object[]{attribute, key, entityDesc});
                } else {
                    Header sequenceIdHeader = record.headers().lastHeader("seqId");
                    String uuid = Optional.ofNullable(record.headers().lastHeader("uuid")).map(v -> new String(v.value(), StandardCharsets.UTF_8)).filter(s -> !s.isEmpty()).orElse(record.topic() + "#" + record.partition() + "#" + record.offset());
                    if (sequenceIdHeader != null) {
                        try {
                            long seqId = this.asLong(sequenceIdHeader.value());
                            return new KafkaStreamElement(entityDesc, (AttributeDescriptor)attr.get(), seqId, entityKey, attribute, record.timestamp(), value, record.partition(), record.offset());
                        }
                        catch (IOException ex) {
                            log.warn("Failed to deserialize sequenceId from {}", (Object)sequenceIdHeader, (Object)ex);
                        }
                    }
                    return new KafkaStreamElement(entityDesc, (AttributeDescriptor)attr.get(), uuid, entityKey, attribute, record.timestamp(), value, record.partition(), record.offset());
                }
            }
            return null;
        }

        @Override
        public ProducerRecord<String, byte[]> write(String topic, int partition, StreamElement data) {
            ArrayList<Header> headers = new ArrayList<Header>();
            if (data.hasSequentialId()) {
                headers.add(new RecordHeader("seqId", this.asBytes(data.getSequentialId())));
            } else {
                headers.add(new RecordHeader("uuid", data.getUuid().getBytes(StandardCharsets.UTF_8)));
            }
            return new ProducerRecord<CallSite, byte[]>(topic, partition >= 0 ? Integer.valueOf(partition) : null, data.getStamp(), (CallSite)((Object)(data.getKey() + "#" + data.getAttribute())), data.getValue(), (Iterable<Header>)headers);
        }

        @Override
        public Serde<String> keySerde() {
            return Serdes.String();
        }

        @Override
        public Serde<byte[]> valueSerde() {
            return Serdes.ByteArray();
        }

        @Override
        public boolean storesSequentialId() {
            return true;
        }

        byte[] asBytes(long sequentialId) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (DataOutputStream dout = new DataOutputStream(baos);){
                ByteUtils.writeVarlong(sequentialId, dout);
            }
            catch (IOException e) {
                throw new IllegalStateException(e);
            }
            return baos.toByteArray();
        }

        long asLong(byte[] serializedSeqId) throws IOException {
            try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedSeqId);){
                long l;
                try (DataInputStream dis = new DataInputStream(bais);){
                    l = ByteUtils.readVarlong(dis);
                }
                return l;
            }
        }
    }
}

