package com.expediagroup.rhapsody.kafka.factory;

import com.expediagroup.rhapsody.api.Acknowledgeable;
import com.expediagroup.rhapsody.core.transformer.AutoAcknowledgementConfig;
import com.expediagroup.rhapsody.core.transformer.AutoAcknowledgingTransformer;
import com.expediagroup.rhapsody.kafka.acknowledgement.MultipleReceiverAcknowledgementStrategy;
import com.expediagroup.rhapsody.kafka.acknowledgement.OrderManagingReceiverAcknowledgementStrategy;
import com.expediagroup.rhapsody.kafka.acknowledgement.ReceiverAcknowledgementStrategy;
import com.expediagroup.rhapsody.kafka.extractor.ConsumerRecordExtraction;
import com.expediagroup.rhapsody.util.ConfigLoading;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

/* loaded from: input_file:com/expediagroup/rhapsody/kafka/factory/KafkaFluxFactory.class */
public class KafkaFluxFactory<K, V> {
    public static final String POLL_TIMEOUT_CONFIG = "poll.timeout";
    public static final String CLOSE_TIMEOUT_CONFIG = "close.timeout";
    public static final String MAX_COMMIT_ATTEMPTS_CONFIG = "max.commit.attempts";
    public static final String BLOCK_REQUEST_ON_PARTITION_ASSIGNMENT_CONFIG = "block.request.on.partition.assignment";
    private static final boolean DEFAULT_BLOCK_REQUEST_ON_PARTITION_ASSIGNMENT = false;
    private final KafkaConfigFactory configFactory;
    private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(30);
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaFluxFactory.class);
    private static final Map<String, AtomicLong> REGISTRATION_COUNTS_BY_CLIENT_ID = new ConcurrentHashMap();

    public KafkaFluxFactory(KafkaConfigFactory kafkaConfigFactory) {
        this.configFactory = kafkaConfigFactory;
    }

    public Flux<GroupedFlux<TopicPartition, ConsumerRecord<K, V>>> receiveAutoGroup(Collection<String> collection, AutoAcknowledgementConfig autoAcknowledgementConfig, Function<? super Flux<ConsumerRecord<K, V>>, ? extends Publisher<ConsumerRecord<K, V>>> function) {
        return receiveAuto(collection, autoAcknowledgementConfig).transform(function).groupBy(ConsumerRecordExtraction::extractTopicPartition);
    }

    public Flux<ConsumerRecord<K, V>> receiveAuto(Collection<String> collection, AutoAcknowledgementConfig autoAcknowledgementConfig) {
        return receive(collection, new MultipleReceiverAcknowledgementStrategy()).transformDeferred(new AutoAcknowledgingTransformer(autoAcknowledgementConfig, KafkaFluxFactory::collectAcknowledgers, KafkaFluxFactory::acknowledge)).map((v0) -> {
            return v0.get();
        });
    }

    public Flux<GroupedFlux<TopicPartition, Acknowledgeable<ConsumerRecord<K, V>>>> receiveGroup(Collection<String> collection, ReceiverAcknowledgementStrategy receiverAcknowledgementStrategy, Function<? super Flux<Acknowledgeable<ConsumerRecord<K, V>>>, ? extends Publisher<Acknowledgeable<ConsumerRecord<K, V>>>> function) {
        return receive(collection, receiverAcknowledgementStrategy).transform(function).groupBy(KafkaFluxFactory::extractTopicPartition);
    }

    public Flux<Acknowledgeable<ConsumerRecord<K, V>>> receive(Collection<String> collection) {
        return receive(collection, new OrderManagingReceiverAcknowledgementStrategy());
    }

    public Flux<Acknowledgeable<ConsumerRecord<K, V>>> receive(Collection<String> collection, ReceiverAcknowledgementStrategy receiverAcknowledgementStrategy) {
        Map<String, ?> map = (Map) this.configFactory.create();
        ReceiverOptions create = ReceiverOptions.create(map);
        create.pollTimeout((Duration) ConfigLoading.load(map, POLL_TIMEOUT_CONFIG, (v0) -> {
            return Duration.parse(v0);
        }, create.pollTimeout()));
        create.closeTimeout((Duration) ConfigLoading.load(map, CLOSE_TIMEOUT_CONFIG, (v0) -> {
            return Duration.parse(v0);
        }, DEFAULT_CLOSE_TIMEOUT));
        create.commitInterval((Duration) ConfigLoading.load(map, "auto.commit.interval.ms", Long::valueOf).map((v0) -> {
            return Duration.ofMillis(v0);
        }).orElse(create.commitInterval()));
        create.maxCommitAttempts(((Integer) ConfigLoading.load(map, MAX_COMMIT_ATTEMPTS_CONFIG, Integer::valueOf, Integer.valueOf(create.maxCommitAttempts()))).intValue());
        CompletableFuture completableFuture = ((Boolean) ConfigLoading.load(map, BLOCK_REQUEST_ON_PARTITION_ASSIGNMENT_CONFIG, Boolean::valueOf, false)).booleanValue() ? new CompletableFuture() : CompletableFuture.completedFuture(Collections.emptyList());
        CompletableFuture<Void> thenAccept = completableFuture.thenAccept(collection2 -> {
            collection2.forEach((v0) -> {
                v0.position();
            });
        });
        completableFuture.getClass();
        create.addAssignListener((v1) -> {
            r1.complete(v1);
        });
        Scheduler newElastic = Schedulers.newElastic(KafkaFluxFactory.class.getSimpleName() + "-" + extractClientId(create));
        create.schedulerSupplier(() -> {
            return newElastic;
        });
        return Flux.defer(() -> {
            return KafkaReceiver.create(createUniqueReceiverOptions(create, collection)).receive();
        }).transform(receiverAcknowledgementStrategy.createRecordTransformer(map)).transform(flux -> {
            return thenAccept.isDone() ? flux : flux.mergeWith(blockRequestOn(thenAccept));
        });
    }

    private static <K, V> ReceiverOptions<K, V> createUniqueReceiverOptions(ReceiverOptions<K, V> receiverOptions, Collection<String> collection) {
        ReceiverOptions<K, V> subscription = ReceiverOptions.create(receiverOptions.consumerProperties()).consumerProperty("client.id", registerNewClient(extractClientId(receiverOptions))).pollTimeout(receiverOptions.pollTimeout()).closeTimeout(receiverOptions.closeTimeout()).commitInterval(receiverOptions.commitInterval()).maxCommitAttempts(receiverOptions.maxCommitAttempts()).schedulerSupplier(receiverOptions.schedulerSupplier()).subscription(collection);
        List assignListeners = receiverOptions.assignListeners();
        subscription.getClass();
        assignListeners.forEach(subscription::addAssignListener);
        return subscription;
    }

    private static <T> Mono<T> blockRequestOn(Future<?> future) {
        return Mono.empty().doOnRequest(j -> {
            try {
                future.get();
            } catch (Exception e) {
                LOGGER.error("Failed to block Request Thread on Future", e);
            }
        });
    }

    private static <K, V> Mono<Map<TopicPartition, Runnable>> collectAcknowledgers(Flux<Acknowledgeable<ConsumerRecord<K, V>>> flux) {
        return flux.collectMap(KafkaFluxFactory::extractTopicPartition, (v0) -> {
            return v0.getAcknowledger();
        });
    }

    private static void acknowledge(Map<TopicPartition, Runnable> map) {
        map.values().forEach((v0) -> {
            v0.run();
        });
    }

    private static <K, V> TopicPartition extractTopicPartition(Acknowledgeable<ConsumerRecord<K, V>> acknowledgeable) {
        return ConsumerRecordExtraction.extractTopicPartition((ConsumerRecord) acknowledgeable.get());
    }

    private static String extractClientId(ReceiverOptions receiverOptions) {
        return Objects.toString(receiverOptions.consumerProperty("client.id"));
    }

    private static String registerNewClient(String str) {
        return str + "-" + REGISTRATION_COUNTS_BY_CLIENT_ID.computeIfAbsent(str, str2 -> {
            return new AtomicLong();
        }).incrementAndGet();
    }
}
