package no.skatteetaten.fastsetting.formueinntekt.felles.feed.client.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import no.skatteetaten.fastsetting.formueinntekt.felles.feed.api.FeedEndpoint;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;

/* loaded from: input_file:no/skatteetaten/fastsetting/formueinntekt/felles/feed/client/kafka/KafkaFeedEndpoint.class */
public class KafkaFeedEndpoint<KEY, VALUE> implements FeedEndpoint<KafkaLocation, KafkaFeedEntry<KEY, VALUE>, KafkaFeedPage<KEY, VALUE>>, AutoCloseable {
    private final Consumer<KEY, VALUE> consumer;
    private final String topic;
    private final Duration timeout;
    private final Set<Integer> partitions = ConcurrentHashMap.newKeySet();
    private boolean unsubscribed = true;

    public KafkaFeedEndpoint(Consumer<KEY, VALUE> consumer, String str, Duration duration) {
        this.consumer = consumer;
        this.topic = str;
        this.timeout = duration;
    }

    public static <KEY, VALUE> KafkaFeedEndpoint<KEY, VALUE> of(String str, String str2, Duration duration, Deserializer<KEY> deserializer, Deserializer<VALUE> deserializer2) {
        return of(str, str2, duration, deserializer, deserializer2, Collections.emptyMap());
    }

    public static <KEY, VALUE> KafkaFeedEndpoint<KEY, VALUE> of(String str, String str2, Duration duration, Deserializer<KEY> deserializer, Deserializer<VALUE> deserializer2, Map<String, Object> map) {
        HashMap hashMap = new HashMap(map);
        if (!hashMap.containsKey("group.id")) {
            hashMap.put("group.id", "feed-" + UUID.randomUUID());
        }
        hashMap.put("bootstrap.servers", str);
        hashMap.put("auto.offset.reset", "latest");
        hashMap.put("enable.auto.commit", "false");
        hashMap.put("isolation.level", "read_committed");
        return new KafkaFeedEndpoint<>(new KafkaConsumer(hashMap, deserializer, deserializer2), str2, duration);
    }

    private void initialize() {
        if (this.unsubscribed) {
            this.consumer.subscribe(Collections.singleton(this.topic), new ConsumerRebalanceListener() { // from class: no.skatteetaten.fastsetting.formueinntekt.felles.feed.client.kafka.KafkaFeedEndpoint.1
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    synchronized (KafkaFeedEndpoint.this) {
                        collection.stream().filter(topicPartition -> {
                            return topicPartition.topic().equals(KafkaFeedEndpoint.this.topic);
                        }).forEach(topicPartition2 -> {
                            KafkaFeedEndpoint.this.partitions.remove(Integer.valueOf(topicPartition2.partition()));
                        });
                    }
                }

                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    synchronized (KafkaFeedEndpoint.this) {
                        collection.stream().filter(topicPartition -> {
                            return topicPartition.topic().equals(KafkaFeedEndpoint.this.topic);
                        }).forEach(topicPartition2 -> {
                            KafkaFeedEndpoint.this.partitions.add(Integer.valueOf(topicPartition2.partition()));
                        });
                    }
                }
            });
            this.consumer.poll(this.timeout);
            this.unsubscribed = false;
        }
    }

    public synchronized Optional<KafkaFeedPage<KEY, VALUE>> getFirstPage() {
        initialize();
        List list = (List) this.partitions.stream().map(num -> {
            return new TopicPartition(this.topic, num.intValue());
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            return Optional.empty();
        }
        this.consumer.seekToBeginning(list);
        return Optional.of(new KafkaFeedPage(this.consumer.poll(this.timeout), Collections.emptyMap()));
    }

    public synchronized Optional<KafkaFeedPage<KEY, VALUE>> getLastPage() {
        throw new UnsupportedOperationException("Cannot read last page for Kafka log");
    }

    public synchronized Optional<KafkaLocation> getLastLocation() {
        initialize();
        List list = (List) this.partitions.stream().map(num -> {
            return new TopicPartition(this.topic, num.intValue());
        }).collect(Collectors.toList());
        return list.isEmpty() ? Optional.empty() : Optional.of(new KafkaLocation((Map) this.consumer.endOffsets(list).entrySet().stream().collect(Collectors.toMap(entry -> {
            return Integer.valueOf(((TopicPartition) entry.getKey()).partition());
        }, (v0) -> {
            return v0.getValue();
        }))));
    }

    public synchronized Optional<KafkaFeedPage<KEY, VALUE>> getPage(KafkaLocation kafkaLocation) {
        if (kafkaLocation.getOffsets().isEmpty()) {
            throw new IllegalArgumentException("Supplied location does not define any offsets");
        }
        initialize();
        ArrayList arrayList = new ArrayList();
        this.partitions.stream().map(num -> {
            return new TopicPartition(this.topic, num.intValue());
        }).forEach(topicPartition -> {
            if (kafkaLocation.getOffsets().containsKey(Integer.valueOf(topicPartition.partition()))) {
                this.consumer.seek(topicPartition, kafkaLocation.getOffsets().get(Integer.valueOf(topicPartition.partition())).longValue() + 1);
            } else {
                arrayList.add(topicPartition);
            }
        });
        if (!arrayList.isEmpty()) {
            this.consumer.seekToBeginning(arrayList);
        }
        return Optional.of(new KafkaFeedPage(this.consumer.poll(this.timeout), kafkaLocation.getOffsets())).filter((v0) -> {
            return v0.isActive();
        });
    }

    @Override // java.lang.AutoCloseable
    public synchronized void close() {
        this.consumer.close();
    }
}
