package cz.o2.proxima.direct.kafka;

import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.serialization.Serde;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.serialization.Serdes;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.Pair;
import java.util.Optional;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/direct/kafka/KafkaStreamElement.class */
public class KafkaStreamElement extends StreamElement {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaStreamElement.class);
    private final int partition;
    private final long offset;

    /* loaded from: input_file:cz/o2/proxima/direct/kafka/KafkaStreamElement$KafkaStreamElementSerializer.class */
    public static class KafkaStreamElementSerializer implements ElementSerializer<String, byte[]> {
        @Override // cz.o2.proxima.direct.kafka.ElementSerializer
        @Nullable
        public StreamElement read(ConsumerRecord<String, byte[]> consumerRecord, EntityDescriptor entityDescriptor) {
            String key = consumerRecord.key();
            byte[] value = consumerRecord.value();
            int lastIndexOf = key.lastIndexOf(35);
            if (lastIndexOf < 0 || lastIndexOf >= key.length()) {
                KafkaStreamElement.log.error("Invalid key in kafka topic: {}", key);
                return null;
            }
            String substring = key.substring(0, lastIndexOf);
            String substring2 = key.substring(lastIndexOf + 1);
            Optional findAttribute = entityDescriptor.findAttribute(substring2, true);
            if (findAttribute.isPresent()) {
                return new KafkaStreamElement(entityDescriptor, (AttributeDescriptor) findAttribute.get(), String.valueOf(consumerRecord.topic() + "#" + consumerRecord.partition() + "#" + consumerRecord.offset()), substring, substring2, consumerRecord.timestamp(), value, consumerRecord.partition(), consumerRecord.offset());
            }
            KafkaStreamElement.log.error("Invalid attribute {} in kafka key {}", substring2, key);
            return null;
        }

        @Override // cz.o2.proxima.direct.kafka.ElementSerializer
        public Pair<String, byte[]> write(StreamElement streamElement) {
            return Pair.of(streamElement.getKey() + "#" + streamElement.getAttribute(), streamElement.getValue());
        }

        @Override // cz.o2.proxima.direct.kafka.ElementSerializer
        public Serde<String> keySerde() {
            return Serdes.String();
        }

        @Override // cz.o2.proxima.direct.kafka.ElementSerializer
        public Serde<byte[]> valueSerde() {
            return Serdes.ByteArray();
        }
    }

    KafkaStreamElement(EntityDescriptor entityDescriptor, AttributeDescriptor<?> attributeDescriptor, String str, String str2, String str3, long j, byte[] bArr, int i, long j2) {
        super(entityDescriptor, attributeDescriptor, str, str2, str3, j, false, bArr);
        this.partition = i;
        this.offset = j2;
    }

    public String toString() {
        return "KafkaStreamElement(entityDesc=" + getEntityDescriptor() + ", attributeDesc=" + getAttributeDescriptor() + ", uuid=" + getUuid() + ", key=" + getKey() + ", attribute=" + getAttribute() + ", stamp=" + getStamp() + ", value.length=" + (getValue() == null ? 0 : getValue().length) + ", partition=" + this.partition + ", offset=" + this.offset + ")";
    }

    public int getPartition() {
        return this.partition;
    }

    public long getOffset() {
        return this.offset;
    }
}
