package ch.squaredesk.nova.comm.kafka;

import ch.squaredesk.nova.comm.retrieving.IncomingMessage;
import ch.squaredesk.nova.comm.retrieving.IncomingMessageDetails;
import ch.squaredesk.nova.comm.retrieving.MessageReceiver;
import ch.squaredesk.nova.comm.retrieving.MessageUnmarshaller;
import ch.squaredesk.nova.metrics.Metrics;
import ch.squaredesk.nova.tuples.Pair;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ch/squaredesk/nova/comm/kafka/KafkaMessageReceiver.class */
/**
 * Receives messages from Kafka and exposes them as per-topic {@link Flowable}s.
 *
 * <p>A single shared stream ({@code allMessagesStream}) owns one {@link KafkaConsumer}. Each turn
 * of that stream aligns the consumer's topic subscription with the keys of
 * {@code topicToSubscriptionCount}, polls once and hands the polled batch to
 * {@link #unmarshall(ConsumerRecords)}. {@link #messages(String)} filters the shared stream by
 * topic and maintains the per-topic subscriber count.
 *
 * <p>The consumer is created, polled and closed only from within the generator callbacks of the
 * shared stream.
 *
 * @param <InternalMessageType> the type produced by the configured {@link MessageUnmarshaller}
 */
public class KafkaMessageReceiver<InternalMessageType> extends MessageReceiver<String, InternalMessageType, String, KafkaSpecificInfo> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageReceiver.class);

    /** Timeout of a single poll; also used as the sleep interval while idle or backing off. */
    private static final long POLL_TIMEOUT = 1;
    private static final TimeUnit POLL_TIMEOUT_UNIT = TimeUnit.SECONDS;

    private final Flowable<IncomingMessage<InternalMessageType, String, KafkaSpecificInfo>> allMessagesStream;
    private final Scheduler scheduler = Schedulers.io();
    /** Topic -> number of active subscribers; the key set is the set of topics the consumer should follow. */
    private final Map<String, AtomicInteger> topicToSubscriptionCount = new ConcurrentHashMap<>();
    /** Set by {@link #shutdown()}; makes the polling stream complete so that the consumer gets closed. */
    private final AtomicBoolean shutdown = new AtomicBoolean(false);

    /**
     * Creates the receiver. No connection is opened here: the {@link KafkaConsumer} is only
     * created when the shared stream is subscribed to.
     *
     * @param str                 identifier handed to the {@link MessageReceiver} super constructor
     * @param properties          configuration used to create the {@link KafkaConsumer}
     * @param messageUnmarshaller converts the record value into the internal message type
     * @param metrics             metrics handed to the {@link MessageReceiver} super constructor
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaMessageReceiver(String str, Properties properties, MessageUnmarshaller<String, InternalMessageType> messageUnmarshaller, Metrics metrics) {
        super(str, messageUnmarshaller, metrics);
        this.allMessagesStream = Flowable
                .<ConsumerRecords<String, String>, Pair<KafkaConsumer<String, String>, HashSet<String>>>generate(
                        () -> {
                            logger.info("Opening connection to Kafka broker");
                            // State = the consumer plus the topic set it is currently subscribed to.
                            return new Pair<>(new KafkaConsumer<String, String>(properties), new HashSet<String>());
                        },
                        (state, emitter) -> {
                            if (this.shutdown.get()) {
                                emitter.onComplete();
                                return;
                            }
                            // Wait until at least one topic is wanted. Shutdown and interruption end
                            // the stream instead of being ignored, so the consumer can be closed.
                            while (!syncSubscriptions(state)) {
                                logger.trace("No topic subscribed yet, sleeping {} {}", POLL_TIMEOUT, POLL_TIMEOUT_UNIT);
                                if (!pause() || this.shutdown.get()) {
                                    emitter.onComplete();
                                    return;
                                }
                            }
                            ConsumerRecords<String, String> records = pollForRecords(state._1);
                            if (records == null) {
                                emitter.onComplete();
                            } else {
                                logger.debug("Read consumer records, size = {}", records.count());
                                emitter.onNext(records);
                            }
                        },
                        state -> closeConsumer(state._1))
                .subscribeOn(this.scheduler)
                .concatMap(this::unmarshall)
                .map(this::toIncomingMessage)
                .share();
    }

    /**
     * Aligns the consumer's subscription with the topics that currently have subscribers.
     *
     * @param state the consumer and the topic set it was last subscribed to
     * @return {@code true} if at least one topic is subscribed after the call
     */
    private boolean syncSubscriptions(Pair<KafkaConsumer<String, String>, HashSet<String>> state) {
        // Work on a snapshot so that compare, subscribe and bookkeeping all see the same topics.
        HashSet<String> wanted = new HashSet<>(this.topicToSubscriptionCount.keySet());
        HashSet<String> active = state._2;
        if (!active.equals(wanted)) {
            logger.debug("Changing topic subscriptions to {}", wanted);
            state._1.subscribe(wanted);
            active.clear();
            active.addAll(wanted);
        }
        return !active.isEmpty();
    }

    /**
     * Polls the consumer until it returns a batch.
     *
     * <p>Failures are logged and retried after a pause rather than silently retried in a tight
     * loop. Empty batches are returned as well, which gives the caller a chance to re-evaluate
     * the topic subscriptions between polls.
     *
     * @param consumer the consumer to poll
     * @return the polled batch, or {@code null} if polling was stopped by shutdown or interruption
     */
    private ConsumerRecords<String, String> pollForRecords(KafkaConsumer<String, String> consumer) {
        while (!this.shutdown.get()) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_UNIT.toMillis(POLL_TIMEOUT));
                if (records != null) {
                    if (records.isEmpty()) {
                        logger.trace("Ignoring empty consumer records");
                    }
                    return records;
                }
            } catch (Exception e) {
                // NOTE(review): assumes an interrupted poll leaves the thread's interrupt flag set
                // (see the KafkaConsumer documentation on InterruptException) - confirm for the
                // client version in use.
                if (Thread.currentThread().isInterrupted()) {
                    logger.debug("Interrupted while polling Kafka, stopping", e);
                    return null;
                }
                logger.warn("Error polling Kafka, retrying", e);
                if (!pause()) {
                    return null;
                }
            }
        }
        return null;
    }

    /**
     * Sleeps for one poll interval.
     *
     * @return {@code false} if the thread was interrupted (the interrupt flag is restored)
     */
    private boolean pause() {
        try {
            Thread.sleep(POLL_TIMEOUT_UNIT.toMillis(POLL_TIMEOUT));
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Closes the consumer, logging (not propagating) any failure. */
    private void closeConsumer(KafkaConsumer<String, String> consumer) {
        logger.info("Shutting down connection to Kafka broker");
        // Clear a pending interrupt for the duration of close() and restore it afterwards.
        // NOTE(review): presumably close() fails fast on an interrupted thread - confirm.
        boolean interrupted = Thread.interrupted();
        try {
            consumer.close();
        } catch (Exception e) {
            logger.info("An error occurred trying to close KafkaConsumer", e);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Wraps a (topic, message) pair into an {@link IncomingMessage} whose destination is the topic.
     */
    // NOTE(review): the generic signatures of IncomingMessage / IncomingMessageDetails.Builder are
    // not visible from this file, so the raw construction of the original code is kept here.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private IncomingMessage<InternalMessageType, String, KafkaSpecificInfo> toIncomingMessage(Pair<String, InternalMessageType> topicAndMessage) {
        return new IncomingMessage(
                topicAndMessage._2,
                new IncomingMessageDetails.Builder()
                        .withDestination(topicAndMessage._1)
                        .withTransportSpecificDetails(new KafkaSpecificInfo())
                        .build());
    }

    /**
     * Unmarshals every record of a polled batch.
     *
     * <p>A record that cannot be processed is reported to the metrics collector, logged and
     * skipped; it does not terminate the returned stream.
     *
     * @param consumerRecords the polled batch
     * @return a stream of (topic, unmarshalled message) pairs that completes after the last record
     */
    Flowable<Pair<String, InternalMessageType>> unmarshall(ConsumerRecords<String, String> consumerRecords) {
        return Flowable.create(flowableEmitter -> {
            consumerRecords.forEach(consumerRecord -> {
                try {
                    flowableEmitter.onNext(new Pair<>(consumerRecord.topic(), this.messageUnmarshaller.unmarshal(consumerRecord.value())));
                } catch (Throwable th) {
                    this.metricsCollector.unparsableMessageReceived(consumerRecord.topic());
                    logger.error("Unable to parse incoming message " + consumerRecord, th);
                }
            });
            flowableEmitter.onComplete();
        }, BackpressureStrategy.BUFFER);
    }

    /**
     * Returns the messages received on the given topic.
     *
     * <p>Subscribing to the returned stream increments the topic's subscriber count and
     * terminating or cancelling it decrements the count again; both updates are scheduled on
     * this receiver's scheduler.
     *
     * @param str the topic to receive messages from
     * @return the shared message stream, filtered to messages whose destination equals {@code str}
     * @throws NullPointerException if {@code str} or the configured unmarshaller is {@code null}
     */
    public Flowable<IncomingMessage<InternalMessageType, String, KafkaSpecificInfo>> messages(String str) {
        Objects.requireNonNull(str, "destination must not be null");
        Objects.requireNonNull(this.messageUnmarshaller, "unmarshaller must not be null");
        return this.allMessagesStream
                .filter(incomingMessage -> str.equals(incomingMessage.details.destination))
                .doOnSubscribe(subscription -> this.scheduler.scheduleDirect(() -> registerSubscription(str)))
                .doFinally(() -> this.scheduler.scheduleDirect(() -> releaseSubscription(str)));
    }

    /** Increments the subscriber count of the topic, creating the counter if necessary. */
    private void registerSubscription(String topic) {
        // The increment happens inside compute() so that it cannot interleave with the removal in
        // releaseSubscription() (per-key atomic on the backing ConcurrentHashMap).
        int count = this.topicToSubscriptionCount.compute(topic, (key, counter) -> {
            AtomicInteger current = counter == null ? new AtomicInteger(0) : counter;
            current.incrementAndGet();
            return current;
        }).get();
        logger.info("Subscribing to topic {}, current subscription count is  {}", topic, count);
        this.metricsCollector.subscriptionCreated(topic);
    }

    /** Decrements the subscriber count of the topic and drops the topic once nobody is left. */
    private void releaseSubscription(String topic) {
        this.metricsCollector.subscriptionDestroyed(topic);
        final int[] remaining = {Integer.MIN_VALUE}; // stays MIN_VALUE if there is no counter
        this.topicToSubscriptionCount.computeIfPresent(topic, (key, counter) -> {
            remaining[0] = counter.decrementAndGet();
            return remaining[0] <= 0 ? null : counter; // returning null removes the mapping
        });
        if (remaining[0] == Integer.MIN_VALUE) {
            if (this.shutdown.get()) {
                // shutdown() cleared all counters, so a missing one is expected here.
                logger.debug("Unsubscribed from topic {} after shutdown", topic);
            } else {
                logger.error("WTF! Unsubscribing topic {} but the counter is gone?!?!?", topic);
            }
        } else if (remaining[0] > 0) {
            logger.info("Unsubscribed from topic {}, current subscription count is  {}", topic, remaining[0]);
        } else {
            logger.info("Unsubscribed last subscription to topic {}", topic);
        }
    }

    /**
     * Shuts the receiver down: forgets all topic subscriptions and flags the polling stream to
     * stop. The stream completes on its next turn (after at most one poll or sleep interval),
     * at which point the consumer is closed.
     */
    public void shutdown() {
        logger.info("Shutting down, currently subscribed to {}", this.topicToSubscriptionCount.keySet());
        this.shutdown.set(true);
        this.topicToSubscriptionCount.clear();
    }
}
