/*
 * Decompiled with CFR 0.152.
 */
package com.expediagroup.rhapsody.kafka.factory;

import com.expediagroup.rhapsody.api.Acknowledgeable;
import com.expediagroup.rhapsody.core.transformer.AutoAcknowledgementConfig;
import com.expediagroup.rhapsody.core.transformer.AutoAcknowledgingTransformer;
import com.expediagroup.rhapsody.kafka.acknowledgement.MultipleReceiverAcknowledgementStrategy;
import com.expediagroup.rhapsody.kafka.acknowledgement.OrderManagingReceiverAcknowledgementStrategy;
import com.expediagroup.rhapsody.kafka.acknowledgement.ReceiverAcknowledgementStrategy;
import com.expediagroup.rhapsody.kafka.extractor.ConsumerRecordExtraction;
import com.expediagroup.rhapsody.kafka.factory.KafkaConfigFactory;
import com.expediagroup.rhapsody.util.ConfigLoading;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverPartition;

public class KafkaFluxFactory<K, V> {
    public static final String POLL_TIMEOUT_CONFIG = "poll.timeout";
    public static final String CLOSE_TIMEOUT_CONFIG = "close.timeout";
    public static final String MAX_COMMIT_ATTEMPTS_CONFIG = "max.commit.attempts";
    public static final String BLOCK_REQUEST_ON_PARTITION_ASSIGNMENT_CONFIG = "block.request.on.partition.assignment";
    private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(30L);
    private static final boolean DEFAULT_BLOCK_REQUEST_ON_PARTITION_ASSIGNMENT = false;
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaFluxFactory.class);
    private static final Map<String, AtomicLong> REGISTRATION_COUNTS_BY_CLIENT_ID = new ConcurrentHashMap<String, AtomicLong>();
    private final KafkaConfigFactory configFactory;

    public KafkaFluxFactory(KafkaConfigFactory configFactory) {
        this.configFactory = configFactory;
    }

    public Flux<GroupedFlux<TopicPartition, ConsumerRecord<K, V>>> receiveAutoGroup(Collection<String> topics, AutoAcknowledgementConfig autoAcknowledgementConfig, Function<? super Flux<ConsumerRecord<K, V>>, ? extends Publisher<ConsumerRecord<K, V>>> pregroup) {
        return this.receiveAuto(topics, autoAcknowledgementConfig).transform(pregroup).groupBy(ConsumerRecordExtraction::extractTopicPartition);
    }

    public Flux<ConsumerRecord<K, V>> receiveAuto(Collection<String> topics, AutoAcknowledgementConfig autoAcknowledgementConfig) {
        return this.receive(topics, new MultipleReceiverAcknowledgementStrategy()).transformDeferred((Function)new AutoAcknowledgingTransformer(autoAcknowledgementConfig, KafkaFluxFactory::collectAcknowledgers, KafkaFluxFactory::acknowledge)).map(Acknowledgeable::get);
    }

    public Flux<GroupedFlux<TopicPartition, Acknowledgeable<ConsumerRecord<K, V>>>> receiveGroup(Collection<String> topics, ReceiverAcknowledgementStrategy acknowledgementStrategy, Function<? super Flux<Acknowledgeable<ConsumerRecord<K, V>>>, ? extends Publisher<Acknowledgeable<ConsumerRecord<K, V>>>> pregroup) {
        return this.receive(topics, acknowledgementStrategy).transform(pregroup).groupBy(KafkaFluxFactory::extractTopicPartition);
    }

    public Flux<Acknowledgeable<ConsumerRecord<K, V>>> receive(Collection<String> topics) {
        return this.receive(topics, new OrderManagingReceiverAcknowledgementStrategy());
    }

    public Flux<Acknowledgeable<ConsumerRecord<K, V>>> receive(Collection<String> topics, ReceiverAcknowledgementStrategy acknowledgementStrategy) {
        Map properties = (Map)this.configFactory.create();
        ReceiverOptions receiverOptions = ReceiverOptions.create((Map)properties);
        receiverOptions.pollTimeout((Duration)ConfigLoading.load((Map)properties, (String)POLL_TIMEOUT_CONFIG, Duration::parse, (Object)receiverOptions.pollTimeout()));
        receiverOptions.closeTimeout((Duration)ConfigLoading.load((Map)properties, (String)CLOSE_TIMEOUT_CONFIG, Duration::parse, (Object)DEFAULT_CLOSE_TIMEOUT));
        receiverOptions.commitInterval(ConfigLoading.load((Map)properties, (String)"auto.commit.interval.ms", Long::valueOf).map(Duration::ofMillis).orElse(receiverOptions.commitInterval()));
        receiverOptions.maxCommitAttempts(((Integer)ConfigLoading.load((Map)properties, (String)MAX_COMMIT_ATTEMPTS_CONFIG, Integer::valueOf, (Object)receiverOptions.maxCommitAttempts())).intValue());
        CompletableFuture assignedPartitions = (Boolean)ConfigLoading.load((Map)properties, (String)BLOCK_REQUEST_ON_PARTITION_ASSIGNMENT_CONFIG, Boolean::valueOf, (Object)false) != false ? new CompletableFuture() : CompletableFuture.completedFuture(Collections.emptyList());
        CompletionStage assignedPartitionPositions = assignedPartitions.thenAccept(partitions -> partitions.forEach(ReceiverPartition::position));
        receiverOptions.addAssignListener(assignedPartitions::complete);
        Scheduler scheduler = Schedulers.newElastic((String)(KafkaFluxFactory.class.getSimpleName() + "-" + KafkaFluxFactory.extractClientId(receiverOptions)));
        receiverOptions.schedulerSupplier(() -> scheduler);
        return Flux.defer(() -> KafkaReceiver.create(KafkaFluxFactory.createUniqueReceiverOptions(receiverOptions, topics)).receive()).transform(acknowledgementStrategy.createRecordTransformer(properties)).transform(arg_0 -> KafkaFluxFactory.lambda$receive$3((Future)((Object)assignedPartitionPositions), arg_0));
    }

    private static <K, V> ReceiverOptions<K, V> createUniqueReceiverOptions(ReceiverOptions<K, V> receiverOptions, Collection<String> topics) {
        ReceiverOptions uniqueReceiverOptions = ReceiverOptions.create((Map)receiverOptions.consumerProperties()).consumerProperty("client.id", (Object)KafkaFluxFactory.registerNewClient(KafkaFluxFactory.extractClientId(receiverOptions))).pollTimeout(receiverOptions.pollTimeout()).closeTimeout(receiverOptions.closeTimeout()).commitInterval(receiverOptions.commitInterval()).maxCommitAttempts(receiverOptions.maxCommitAttempts()).schedulerSupplier(receiverOptions.schedulerSupplier()).subscription(topics);
        receiverOptions.assignListeners().forEach(arg_0 -> ((ReceiverOptions)uniqueReceiverOptions).addAssignListener(arg_0));
        return uniqueReceiverOptions;
    }

    private static <T> Mono<T> blockRequestOn(Future<?> future) {
        return Mono.empty().doOnRequest(requested -> {
            try {
                future.get();
            }
            catch (Exception e) {
                LOGGER.error("Failed to block Request Thread on Future", (Throwable)e);
            }
        });
    }

    private static <K, V> Mono<Map<TopicPartition, Runnable>> collectAcknowledgers(Flux<Acknowledgeable<ConsumerRecord<K, V>>> acknowledgeables) {
        return acknowledgeables.collectMap(KafkaFluxFactory::extractTopicPartition, Acknowledgeable::getAcknowledger);
    }

    private static void acknowledge(Map<TopicPartition, Runnable> acknowledgers) {
        acknowledgers.values().forEach(Runnable::run);
    }

    private static <K, V> TopicPartition extractTopicPartition(Acknowledgeable<ConsumerRecord<K, V>> acknowledgeable) {
        return ConsumerRecordExtraction.extractTopicPartition((ConsumerRecord)((ConsumerRecord)acknowledgeable.get()));
    }

    private static String extractClientId(ReceiverOptions receiverOptions) {
        return Objects.toString(receiverOptions.consumerProperty("client.id"));
    }

    private static String registerNewClient(String clientId) {
        return clientId + "-" + REGISTRATION_COUNTS_BY_CLIENT_ID.computeIfAbsent(clientId, key -> new AtomicLong()).incrementAndGet();
    }

    private static /* synthetic */ Publisher lambda$receive$3(Future assignedPartitionPositions, Flux records) {
        return assignedPartitionPositions.isDone() ? records : records.mergeWith(KafkaFluxFactory.blockRequestOn(assignedPartitionPositions));
    }
}

