package cz.o2.proxima.direct.io.kafka;

import cz.o2.proxima.core.repository.AttributeDescriptor;
import cz.o2.proxima.core.repository.EntityDescriptor;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.internal.com.google.common.base.MoreObjects;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.header.Header;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.header.internals.RecordHeader;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.serialization.Serde;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.serialization.Serdes;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.utils.ByteUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/direct/io/kafka/KafkaStreamElement.class */
/**
 * A {@link StreamElement} that additionally remembers the Kafka partition and offset it was
 * created with.
 */
public class KafkaStreamElement extends StreamElement {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamElement.class);
    private static final long serialVersionUID = 1;

    /** Partition passed to the constructor (the record's partition when created by the serializer). */
    private final int partition;

    /** Offset passed to the constructor (the record's offset when created by the serializer). */
    private final long offset;

    /**
     * {@link ElementSerializer} that stores the entity key and attribute name in the Kafka record
     * key as {@code <entityKey>#<attribute>}, the payload as the raw record value, and either the
     * sequential id or the uuid of the element in a record header.
     */
    public static class KafkaStreamElementSerializer implements ElementSerializer<String, byte[]> {
        private static final long serialVersionUID = 1;

        /** Separates the entity key from the attribute name inside the Kafka record key. */
        private static final char KEY_ATTRIBUTE_SEPARATOR = '#';

        /**
         * Converts a consumed Kafka record into a {@link KafkaStreamElement}.
         *
         * <p>The record key is split at the <em>last</em> {@code '#'}: the part before it is the
         * entity key, the part after it is the attribute name. When the record carries a readable
         * sequence-id header, the element is created with that sequential id; otherwise it is
         * created with the uuid header value, or with {@code topic#partition#offset} when that
         * header is missing or empty.
         *
         * @param consumerRecord the record read from Kafka
         * @param entityDescriptor descriptor of the entity used to resolve the attribute
         * @return the parsed element, or {@code null} when the key is missing, has no separator,
         *     or names an attribute unknown to {@code entityDescriptor}
         */
        @Override
        @Nullable
        public StreamElement read(ConsumerRecord<String, byte[]> consumerRecord, EntityDescriptor entityDescriptor) {
            String key = consumerRecord.key();
            byte[] value = consumerRecord.value();
            // A record without a key cannot be parsed; treat it like any other malformed key
            // instead of failing with a NullPointerException.
            int separatorPos = key == null ? -1 : key.lastIndexOf(KEY_ATTRIBUTE_SEPARATOR);
            if (separatorPos < 0) {
                log.error("Invalid key in kafka topic: {}", key);
                return null;
            }
            String entityKey = key.substring(0, separatorPos);
            String attribute = key.substring(separatorPos + 1);
            Optional<? extends AttributeDescriptor<?>> attrDesc = entityDescriptor.findAttribute(attribute, true);
            if (!attrDesc.isPresent()) {
                log.error("Invalid attribute {} in kafka key {} for entity {}", attribute, key, entityDescriptor);
                return null;
            }

            Header sequenceIdHeader = consumerRecord.headers().lastHeader(KafkaAccessor.SEQUENCE_ID_HEADER);
            // A header without a value is handled the same way as a missing header.
            if (sequenceIdHeader != null && sequenceIdHeader.value() != null) {
                try {
                    long sequentialId = asLong(sequenceIdHeader.value());
                    return new KafkaStreamElement(
                            entityDescriptor,
                            attrDesc.get(),
                            sequentialId,
                            entityKey,
                            attribute,
                            consumerRecord.timestamp(),
                            value,
                            consumerRecord.partition(),
                            consumerRecord.offset());
                } catch (IOException e) {
                    // Unreadable sequence id: fall through to the uuid-based element below.
                    log.warn("Failed to deserialize sequenceId from {}", sequenceIdHeader, e);
                }
            }

            // The fallback uuid is built lazily, only when the header is absent, null or empty.
            String uuid = Optional.ofNullable(consumerRecord.headers().lastHeader(KafkaAccessor.UUID_HEADER))
                    .map(Header::value)
                    .map(bytes -> new String(bytes, StandardCharsets.UTF_8))
                    .filter(headerUuid -> !headerUuid.isEmpty())
                    .orElseGet(() -> consumerRecord.topic() + "#" + consumerRecord.partition() + "#" + consumerRecord.offset());
            return new KafkaStreamElement(
                    entityDescriptor,
                    attrDesc.get(),
                    uuid,
                    entityKey,
                    attribute,
                    consumerRecord.timestamp(),
                    value,
                    consumerRecord.partition(),
                    consumerRecord.offset());
        }

        /**
         * Converts an element into a Kafka producer record.
         *
         * <p>The record key is {@code <key>#<attribute>}, the record timestamp is the element
         * stamp and the record value is the element value. Exactly one header is attached: the
         * varlong-encoded sequential id when the element has one, the UTF-8 encoded uuid
         * otherwise.
         *
         * @param str first argument of the created {@link ProducerRecord} (the target topic)
         * @param i target partition; a negative value results in a {@code null} partition in the
         *     created record
         * @param streamElement the element to write
         * @return the record to be sent to Kafka
         */
        @Override
        public ProducerRecord<String, byte[]> write(String str, int i, StreamElement streamElement) {
            List<Header> headers = new ArrayList<>(1);
            if (streamElement.hasSequentialId()) {
                headers.add(new RecordHeader(KafkaAccessor.SEQUENCE_ID_HEADER, asBytes(streamElement.getSequentialId())));
            } else {
                headers.add(new RecordHeader(KafkaAccessor.UUID_HEADER, streamElement.getUuid().getBytes(StandardCharsets.UTF_8)));
            }
            Integer partition = i >= 0 ? Integer.valueOf(i) : null;
            String recordKey = streamElement.getKey() + KEY_ATTRIBUTE_SEPARATOR + streamElement.getAttribute();
            return new ProducerRecord<>(
                    str,
                    partition,
                    Long.valueOf(streamElement.getStamp()),
                    recordKey,
                    streamElement.getValue(),
                    headers);
        }

        /** Returns the serde used for record keys, {@link Serdes#String()}. */
        @Override
        public Serde<String> keySerde() {
            return Serdes.String();
        }

        /** Returns the serde used for record values, {@link Serdes#ByteArray()}. */
        @Override
        public Serde<byte[]> valueSerde() {
            return Serdes.ByteArray();
        }

        /** Always {@code true}: {@link #write} puts the sequential id into a record header. */
        @Override
        public boolean storesSequentialId() {
            return true;
        }

        /**
         * Encodes a {@code long} with {@link ByteUtils#writeVarlong}.
         *
         * @param j the value to encode
         * @return the encoded bytes
         * @throws IllegalStateException wrapping any {@link IOException} raised while writing
         */
        byte[] asBytes(long j) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                ByteUtils.writeVarlong(j, out);
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
            return buffer.toByteArray();
        }

        /**
         * Decodes a {@code long} with {@link ByteUtils#readVarlong}; inverse of {@link #asBytes}.
         *
         * @param bArr the encoded bytes, must not be {@code null}
         * @return the decoded value
         * @throws IOException if the value cannot be read from {@code bArr}
         */
        long asLong(byte[] bArr) throws IOException {
            try (ByteArrayInputStream bytes = new ByteArrayInputStream(bArr);
                    DataInputStream in = new DataInputStream(bytes)) {
                return ByteUtils.readVarlong(in);
            }
        }
    }

    /**
     * Creates an element identified by a uuid.
     *
     * <p>NOTE(review): the {@code false} passed to the super constructor is presumably the
     * "delete wildcard" flag of {@link StreamElement} — confirm against that class.
     *
     * @param entityDescriptor descriptor of the entity
     * @param attributeDescriptor descriptor of the attribute
     * @param str uuid of the element
     * @param str2 entity key
     * @param str3 attribute name
     * @param j timestamp of the element
     * @param bArr value of the element
     * @param i partition the element was read from
     * @param j2 offset the element was read from
     */
    KafkaStreamElement(EntityDescriptor entityDescriptor, AttributeDescriptor<?> attributeDescriptor, String str, String str2, String str3, long j, byte[] bArr, int i, long j2) {
        super(entityDescriptor, attributeDescriptor, str, str2, str3, j, false, bArr);
        this.partition = i;
        this.offset = j2;
    }

    /**
     * Creates an element identified by a sequential id.
     *
     * <p>NOTE(review): the {@code false} passed to the super constructor is presumably the
     * "delete wildcard" flag of {@link StreamElement} — confirm against that class.
     *
     * @param entityDescriptor descriptor of the entity
     * @param attributeDescriptor descriptor of the attribute
     * @param j sequential id of the element
     * @param str entity key
     * @param str2 attribute name
     * @param j2 timestamp of the element
     * @param bArr value of the element
     * @param i partition the element was read from
     * @param j3 offset the element was read from
     */
    KafkaStreamElement(EntityDescriptor entityDescriptor, AttributeDescriptor<?> attributeDescriptor, long j, String str, String str2, long j2, byte[] bArr, int i, long j3) {
        super(entityDescriptor, attributeDescriptor, j, str, str2, j2, false, bArr);
        this.partition = i;
        this.offset = j3;
    }

    /**
     * Renders the descriptors, uuid, key, attribute, stamp, value length ({@code -1} for a
     * {@code null} value), partition and offset of this element.
     */
    @Override
    public String toString() {
        byte[] value = getValue();
        return MoreObjects.toStringHelper(this)
                .add("entityDesc", getEntityDescriptor())
                .add("attributeDesc", getAttributeDescriptor())
                .add(KafkaAccessor.UUID_HEADER, getUuid())
                .add("key", getKey())
                .add("attribute", getAttribute())
                .add("stamp", getStamp())
                .add("value.length", value == null ? -1 : value.length)
                .add("partition", this.partition)
                .add("offset", this.offset)
                .toString();
    }

    /** Returns the partition this element was created with. */
    @Generated
    public int getPartition() {
        return this.partition;
    }

    /** Returns the offset this element was created with. */
    @Generated
    public long getOffset() {
        return this.offset;
    }
}
