package cz.o2.proxima.storage.kafka;

import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.Node;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.PartitionInfo;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.TopicPartition;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.record.TimestampType;
import cz.o2.proxima.repository.Context;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.AttributeWriterBase;
import cz.o2.proxima.storage.CommitCallback;
import cz.o2.proxima.storage.DataAccessor;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StorageDescriptor;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.BulkLogObserver;
import cz.o2.proxima.storage.commitlog.Cancellable;
import cz.o2.proxima.storage.commitlog.CommitLogReader;
import cz.o2.proxima.storage.commitlog.LogObserver;
import cz.o2.proxima.util.Pair;
import cz.o2.proxima.view.PartitionedLogObserver;
import cz.o2.proxima.view.PartitionedView;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.shadow.com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptor.class */
public class LocalKafkaCommitLogDescriptor extends StorageDescriptor {
    public static final String CFG_NUM_PARTITIONS = "local-kafka-num-partitions";
    private final int id;
    private static final Logger log = LoggerFactory.getLogger(LocalKafkaCommitLogDescriptor.class);
    private static final Map<Integer, Map<URI, Accessor>> ACCESSORS = Collections.synchronizedMap(new HashMap());

    /* loaded from: input_file:cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptor$Accessor.class */
    public static class Accessor extends KafkaAccessor {
        final int descriptorId;
        final int numPartitions;
        transient Map<String, ConsumerGroup> consumerGroups;
        transient List<List<StreamElement>> written;
        transient Map<Pair<String, Integer>, List<Pair<Integer, AtomicInteger>>> consumerOffsets;
        transient Map<Pair<String, Integer>, AtomicInteger> committedOffsets;

        public Accessor(EntityDescriptor entityDescriptor, URI uri, Map<String, Object> map, int i) {
            super(entityDescriptor, uri, map);
            this.descriptorId = i;
            this.consumerOffsets = Collections.synchronizedMap(new HashMap());
            this.written = Collections.synchronizedList(new ArrayList());
            this.consumerGroups = Collections.synchronizedMap(new HashMap());
            this.committedOffsets = Collections.synchronizedMap(new HashMap());
            this.numPartitions = ((Integer) Optional.ofNullable(map.get(LocalKafkaCommitLogDescriptor.CFG_NUM_PARTITIONS)).filter(obj -> {
                return obj != null;
            }).map(obj2 -> {
                return Integer.valueOf(obj2.toString());
            }).orElse(1)).intValue();
            for (int i2 = 0; i2 < this.numPartitions; i2++) {
                this.written.add(Collections.synchronizedList(new ArrayList()));
            }
            LocalKafkaCommitLogDescriptor.log.info("Created accessor with URI {} and {} partitions", uri, Integer.valueOf(this.numPartitions));
        }

        public KafkaConsumerFactory createConsumerFactory() {
            return new KafkaConsumerFactory(getURI(), new Properties()) { // from class: cz.o2.proxima.storage.kafka.LocalKafkaCommitLogDescriptor.Accessor.1
                public KafkaConsumer<String, byte[]> create() {
                    return create(allPartitions());
                }

                public KafkaConsumer<String, byte[]> create(Collection<Partition> collection) {
                    String str = "unnamed-consumer-" + Math.round(Math.random() * 2.147483647E9d);
                    ConsumerGroup consumerGroup = new ConsumerGroup(str, Accessor.this.getTopic(), Accessor.this.numPartitions);
                    return Accessor.this.mockKafkaConsumer(str, Pair.of(str, Integer.valueOf(consumerGroup.add(collection))), consumerGroup);
                }

                public KafkaConsumer<String, byte[]> create(String str) {
                    return create(str, null);
                }

                public KafkaConsumer<String, byte[]> create(String str, @Nullable ConsumerRebalanceListener consumerRebalanceListener) {
                    KafkaConsumer<String, byte[]> mockKafkaConsumer;
                    synchronized (LocalKafkaCommitLogDescriptor.class) {
                        ConsumerGroup consumerGroup = Accessor.this.consumerGroups.get(str);
                        if (consumerGroup == null) {
                            consumerGroup = new ConsumerGroup(str, Accessor.this.getTopic(), Accessor.this.numPartitions);
                            Accessor.this.consumerGroups.put(str, consumerGroup);
                        }
                        mockKafkaConsumer = Accessor.this.mockKafkaConsumer(str, Pair.of(str, Integer.valueOf(consumerGroup.add(consumerRebalanceListener))), consumerGroup);
                    }
                    return mockKafkaConsumer;
                }

                private List<Partition> allPartitions() {
                    ArrayList arrayList = new ArrayList();
                    for (int i = 0; i < Accessor.this.numPartitions; i++) {
                        int i2 = i;
                        arrayList.add(() -> {
                            return i2;
                        });
                    }
                    return arrayList;
                }

                private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                    String implMethodName = serializedLambda.getImplMethodName();
                    boolean z = -1;
                    switch (implMethodName.hashCode()) {
                        case -152851600:
                            if (implMethodName.equals("lambda$allPartitions$44ec7e33$1")) {
                                z = false;
                                break;
                            }
                            break;
                    }
                    switch (z) {
                        case false:
                            if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptor$Accessor$1") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                                int intValue = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                                return () -> {
                                    return intValue;
                                };
                            }
                            break;
                    }
                    throw new IllegalArgumentException("Invalid lambda deserialization");
                }
            };
        }

        /* JADX WARN: Multi-variable type inference failed */
        protected KafkaConsumer<String, byte[]> mockKafkaConsumer(String str, Pair<String, Integer> pair, ConsumerGroup consumerGroup) {
            for (int i = 0; i < this.numPartitions; i++) {
                this.committedOffsets.putIfAbsent(Pair.of(str, Integer.valueOf(i)), new AtomicInteger(this.written.get(i).size() - 1));
            }
            LocalKafkaCommitLogDescriptor.log.info("Creating mock kafka consumer name {}, with committed offsets {}", str, this.committedOffsets);
            KafkaConsumer<String, byte[]> kafkaConsumer = (KafkaConsumer) Mockito.mock(KafkaConsumer.class);
            this.consumerOffsets.put(pair, consumerGroup.getAssignment(((Integer) pair.getSecond()).intValue()).stream().map(partition -> {
                return Pair.of(Integer.valueOf(partition.getId()), new AtomicInteger(this.committedOffsets.get(Pair.of(str, Integer.valueOf(partition.getId()))).get()));
            }).collect(Collectors.toList()));
            ((KafkaConsumer) Mockito.doAnswer(invocationOnMock -> {
                return pollConsumer(consumerGroup, ((Long) invocationOnMock.getArguments()[0]).longValue(), pair);
            }).when(kafkaConsumer)).poll(Matchers.anyLong());
            ((KafkaConsumer) Mockito.doAnswer(invocationOnMock2 -> {
                return getEndOffsets(str, (Collection) invocationOnMock2.getArguments()[0]);
            }).when(kafkaConsumer)).endOffsets((Collection) Matchers.any());
            ((KafkaConsumer) Mockito.doAnswer(invocationOnMock3 -> {
                Collection collection = (Collection) invocationOnMock3.getArguments()[0];
                Map map = (Map) collection.stream().collect(Collectors.toMap(topicPartition -> {
                    return topicPartition;
                }, topicPartition2 -> {
                    return 0L;
                }));
                LocalKafkaCommitLogDescriptor.log.debug("Consumer {} beginningOffsets {}: {}", new Object[]{str, collection, map});
                return map;
            }).when(kafkaConsumer)).beginningOffsets((Collection) Matchers.any());
            Mockito.when(kafkaConsumer.assignment()).thenReturn(Collections.singleton(new TopicPartition(getTopic(), 0)));
            ((KafkaConsumer) Mockito.doAnswer(invocationOnMock4 -> {
                seekConsumerToBeginning(pair, (Collection) invocationOnMock4.getArguments()[0]);
                return null;
            }).when(kafkaConsumer)).seekToBeginning((Collection) Matchers.any());
            ((KafkaConsumer) Mockito.doAnswer(invocationOnMock5 -> {
                commitConsumer(str, (Map) invocationOnMock5.getArguments()[0]);
                return null;
            }).when(kafkaConsumer)).commitSync((Map) Matchers.any());
            ((KafkaConsumer) Mockito.doAnswer(invocationOnMock6 -> {
                return consumerGroup.getAssignment(((Integer) pair.getSecond()).intValue()).stream().map(partition2 -> {
                    return new TopicPartition(getTopic(), partition2.getId());
                }).collect(Collectors.toSet());
            }).when(kafkaConsumer)).assignment();
            Mockito.when(kafkaConsumer.partitionsFor((String) Mockito.eq(consumerGroup.getTopic()))).thenReturn(IntStream.range(0, consumerGroup.getNumPartitions()).mapToObj(i2 -> {
                return new PartitionInfo(consumerGroup.getTopic(), i2, (Node) null, (Node[]) null, (Node[]) null);
            }).collect(Collectors.toList()));
            ((KafkaConsumer) Mockito.doAnswer(invocationOnMock7 -> {
                consumerGroup.remove(((Integer) pair.getSecond()).intValue());
                return null;
            }).when(kafkaConsumer)).close();
            return kafkaConsumer;
        }

        private void commitConsumer(String str, Map<TopicPartition, OffsetAndMetadata> map) {
            map.entrySet().forEach(entry -> {
                this.committedOffsets.get(Pair.of(str, Integer.valueOf(((TopicPartition) entry.getKey()).partition()))).set((int) ((OffsetAndMetadata) entry.getValue()).offset());
            });
            LocalKafkaCommitLogDescriptor.log.debug("Consumer {} committed offsets {}", str, map);
        }

        private void seekConsumerToBeginning(Pair<String, Integer> pair, Collection<TopicPartition> collection) {
            List<Pair<Integer, AtomicInteger>> list = this.consumerOffsets.get(pair);
            for (TopicPartition topicPartition : collection) {
                for (Pair<Integer, AtomicInteger> pair2 : list) {
                    if (((Integer) pair2.getFirst()).intValue() == topicPartition.partition()) {
                        ((AtomicInteger) pair2.getSecond()).set(0);
                    }
                }
            }
            LocalKafkaCommitLogDescriptor.log.debug("Consumer {} seeked to beginning of {}", pair.getFirst(), collection);
        }

        private Map<TopicPartition, Long> getEndOffsets(String str, Collection<TopicPartition> collection) {
            HashMap hashMap = new HashMap();
            Iterator<TopicPartition> it = collection.iterator();
            while (it.hasNext()) {
                hashMap.put(new TopicPartition(getTopic(), it.next().partition()), Long.valueOf(this.written.get(r0.partition()).size()));
            }
            LocalKafkaCommitLogDescriptor.log.debug("Consumer {} endOffsets {}: {}", new Object[]{str, collection, hashMap});
            return hashMap;
        }

        private ConsumerRecords<String, byte[]> pollConsumer(ConsumerGroup consumerGroup, long j, Pair<String, Integer> pair) throws InterruptedException {
            String str = (String) pair.getFirst();
            Thread.sleep(j);
            HashMap hashMap = new HashMap();
            Collection<Partition> assignment = consumerGroup.getAssignment(((Integer) pair.getSecond()).intValue());
            List<Pair<Integer, AtomicInteger>> list = this.consumerOffsets.get(pair);
            if (LocalKafkaCommitLogDescriptor.log.isDebugEnabled()) {
                LocalKafkaCommitLogDescriptor.log.debug("Polling consumerId {}.{} with assignment {}", new Object[]{Integer.valueOf(this.descriptorId), pair, assignment.stream().map((v0) -> {
                    return v0.getId();
                }).collect(Collectors.toList())});
            }
            for (Partition partition : assignment) {
                int id = partition.getId();
                List<StreamElement> list2 = this.written.get(id);
                int size = list2.size();
                ArrayList arrayList = new ArrayList();
                int intValue = ((Integer) list.stream().filter(pair2 -> {
                    return ((Integer) pair2.getFirst()).intValue() == id;
                }).map(pair3 -> {
                    return Integer.valueOf(((AtomicInteger) pair3.getSecond()).get());
                }).findAny().orElse(Integer.valueOf(this.committedOffsets.get(Pair.of(str, Integer.valueOf(id))).get()))).intValue();
                LocalKafkaCommitLogDescriptor.log.trace("Partition {} has last {}, reading from {}", new Object[]{Integer.valueOf(id), Integer.valueOf(size), Integer.valueOf(intValue)});
                while (intValue < size) {
                    if (intValue >= 0) {
                        arrayList.add(toConsumerRecord(list2.get(intValue), partition.getId(), intValue));
                    }
                    intValue++;
                }
                boolean z = false;
                Iterator<Pair<Integer, AtomicInteger>> it = list.iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    Pair<Integer, AtomicInteger> next = it.next();
                    if (((Integer) next.getFirst()).intValue() == id) {
                        ((AtomicInteger) next.getSecond()).set(intValue);
                        LocalKafkaCommitLogDescriptor.log.trace("Advanced offset of consumer ID {} on partition {} to {}", new Object[]{pair, Integer.valueOf(id), Integer.valueOf(intValue)});
                        z = true;
                        break;
                    }
                }
                if (!z) {
                    list.add(Pair.of(Integer.valueOf(id), new AtomicInteger(intValue)));
                }
                if (!arrayList.isEmpty()) {
                    hashMap.put(new TopicPartition(getTopic(), id), arrayList);
                }
            }
            LocalKafkaCommitLogDescriptor.log.debug("Consumer {} id {} polled records {}", new Object[]{str, pair, hashMap});
            return new ConsumerRecords<>(hashMap);
        }

        private ConsumerRecord<String, byte[]> toConsumerRecord(StreamElement streamElement, int i, int i2) {
            return new ConsumerRecord<>(getTopic(), i, i2, System.currentTimeMillis(), TimestampType.LOG_APPEND_TIME, 0L, -1, -1, streamElement.getKey() + "#" + streamElement.getAttribute(), streamElement.getValue());
        }

        public boolean allConsumed() {
            return allConsumed((List) this.written.stream().map((v0) -> {
                return v0.size();
            }).collect(Collectors.toList()));
        }

        public boolean allConsumed(List<Integer> list) {
            return ((Boolean) this.consumerGroups.keySet().stream().map(str -> {
                int i = 0;
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    int intValue = ((Integer) it.next()).intValue();
                    int i2 = i;
                    if (this.consumerOffsets.get(Pair.of(str, Integer.valueOf(i2))).stream().filter(pair -> {
                        return ((Integer) pair.getFirst()).intValue() == i2;
                    }).filter(pair2 -> {
                        return ((AtomicInteger) pair2.getSecond()).get() < intValue;
                    }).findAny().isPresent()) {
                        return false;
                    }
                    i++;
                }
                return true;
            }).reduce(true, (bool, bool2) -> {
                return Boolean.valueOf(bool.booleanValue() && bool2.booleanValue());
            })).booleanValue();
        }

        public Optional<PartitionedView> getPartitionedView(Context context) {
            return Optional.of(newReader(context));
        }

        public Optional<CommitLogReader> getCommitLogReader(Context context) {
            return Optional.of(newReader(context));
        }

        public Optional<AttributeWriterBase> getWriter(Context context) {
            return Optional.of(newWriter());
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public LocalKafkaWriter newWriter() {
            return new LocalKafkaWriter(this, this.numPartitions, this.descriptorId);
        }

        LocalKafkaLogReader newReader(Context context) {
            return new LocalKafkaLogReader(this, context);
        }

        private void writeObject(ObjectOutputStream objectOutputStream) throws IOException {
            objectOutputStream.defaultWriteObject();
        }

        private void readObject(ObjectInputStream objectInputStream) throws ClassNotFoundException, IOException {
            objectInputStream.defaultReadObject();
            Accessor accessor = (Accessor) ((Map) LocalKafkaCommitLogDescriptor.ACCESSORS.get(Integer.valueOf(this.descriptorId))).get(getURI());
            this.committedOffsets = accessor.committedOffsets;
            this.consumerGroups = accessor.consumerGroups;
            this.consumerOffsets = accessor.consumerOffsets;
            this.written = accessor.written;
        }
    }

    /* loaded from: input_file:cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptor$LocalKafkaLogReader.class */
    public static class LocalKafkaLogReader extends KafkaLogReader {
        private static final Logger log = LoggerFactory.getLogger(LocalKafkaLogReader.class);

        public LocalKafkaLogReader(KafkaAccessor kafkaAccessor, Context context) {
            super(kafkaAccessor, context);
        }

        public Cancellable observePartitions(Collection<Partition> collection, CommitLogReader.Position position, boolean z, LogObserver logObserver) {
            Cancellable observePartitions = super.observePartitions((String) null, collection, position, z, logObserver, (ConsumerRebalanceListener) null);
            log.debug("Started to observe partitions {} of LocalKafkaCommitLog URI {}", collection, getURI());
            return observePartitions;
        }

        public Cancellable observe(String str, CommitLogReader.Position position, LogObserver logObserver) {
            Cancellable observe = super.observe(str, position, logObserver);
            log.debug("Started to observe LocalKafkaCommitLog with URI {} by consumer {}", getURI(), str);
            return observe;
        }

        protected Cancellable observePartitions(String str, Collection<Partition> collection, CommitLogReader.Position position, boolean z, LogObserver logObserver, ConsumerRebalanceListener consumerRebalanceListener) {
            Cancellable observePartitions = super.observePartitions(str, collection, position, z, logObserver, consumerRebalanceListener);
            log.debug("Started to observe partitions {} of LocalKafkaCommitLog with URI {} by consumer {}", new Object[]{collection, getURI(), str});
            return observePartitions;
        }

        public <T> Dataset<T> observe(Flow flow, String str, PartitionedLogObserver<T> partitionedLogObserver) {
            Dataset<T> observe = super.observe(flow, str, partitionedLogObserver);
            log.debug("Started to observe view of LocalKafkaCommitLog with URI {} by consumer name {}", getURI(), str);
            return observe;
        }

        public <T> Dataset<T> observePartitions(Flow flow, Collection<Partition> collection, PartitionedLogObserver<T> partitionedLogObserver) {
            Dataset<T> observePartitions = super.observePartitions(flow, collection, partitionedLogObserver);
            log.debug("Started to observe partitions {} of view of LocalKafkaCommitLog with URI {}", collection.stream().map((v0) -> {
                return v0.getId();
            }).collect(Collectors.toList()), getURI());
            return observePartitions;
        }

        public Cancellable observeBulk(String str, CommitLogReader.Position position, BulkLogObserver bulkLogObserver) {
            Cancellable observeBulk = super.observeBulk(str, position, bulkLogObserver);
            log.debug("Started to bulk observe LocalKafkaCommitLog with URI {} by {}", getURI(), str);
            return observeBulk;
        }

        /* renamed from: getAccessor, reason: merged with bridge method [inline-methods] */
        public Accessor m4getAccessor() {
            return (Accessor) this.accessor;
        }
    }

    /* loaded from: input_file:cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptor$LocalKafkaWriter.class */
    public static class LocalKafkaWriter extends KafkaWriter {
        private final int numPartitions;
        private final int descriptorId;

        public LocalKafkaWriter(Accessor accessor, int i, int i2) {
            super(accessor);
            this.numPartitions = i;
            this.descriptorId = i2;
        }

        public void write(StreamElement streamElement, CommitCallback commitCallback) {
            int partitionId = (this.accessor.getPartitioner().getPartitionId(streamElement.getKey(), streamElement.getAttribute(), streamElement.getValue()) & Integer.MAX_VALUE) % this.numPartitions;
            LocalKafkaCommitLogDescriptor.log.debug("Written data {} to LocalKafkaCommitLog descriptorId {} URI {}, partition {}", new Object[]{streamElement, Integer.valueOf(this.descriptorId), getURI(), Integer.valueOf(partitionId)});
            ((Accessor) this.accessor).written.get(partitionId).add(streamElement);
            commitCallback.commit(true, (Throwable) null);
        }

        /* renamed from: getAccessor, reason: merged with bridge method [inline-methods] */
        public Accessor m5getAccessor() {
            return (Accessor) this.accessor;
        }
    }

    public LocalKafkaCommitLogDescriptor() {
        super(Arrays.asList("kafka-test"));
        this.id = System.identityHashCode(this);
        ACCESSORS.put(Integer.valueOf(this.id), Collections.synchronizedMap(new HashMap()));
    }

    public Accessor getAccessor(EntityDescriptor entityDescriptor, URI uri, Map<String, Object> map) {
        Map<URI, Accessor> map2 = ACCESSORS.get(Integer.valueOf(this.id));
        Accessor accessor = new Accessor(entityDescriptor, uri, map, this.id);
        Preconditions.checkArgument(map2.putIfAbsent(uri, accessor) == null, "URI " + uri + " is already registered!");
        return accessor;
    }

    /* renamed from: getAccessor, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ DataAccessor m2getAccessor(EntityDescriptor entityDescriptor, URI uri, Map map) {
        return getAccessor(entityDescriptor, uri, (Map<String, Object>) map);
    }
}
