package net.corda.node.services.messaging;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import kotlin.Lazy;
import kotlin.LazyKt;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.Unit;
import kotlin.concurrent.TimersKt;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.PropertyReference1Impl;
import kotlin.jvm.internal.Reflection;
import kotlin.reflect.KProperty;
import net.corda.core.utilities.KotlinUtilsKt;
import net.corda.node.internal.LifecycleSupport;
import net.corda.node.internal.artemis.ReactiveArtemisConsumer;
import net.corda.node.services.config.NodeConfigurationImpl;
import net.corda.node.services.messaging.P2PMessagingConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

/* JADX INFO: Access modifiers changed from: package-private */
/* compiled from: P2PMessagingClient.kt */
@Metadata(mv = {NodeConfigurationImpl.Defaults.lazyBridgeStart, NodeConfigurationImpl.Defaults.lazyBridgeStart, 11}, bv = {NodeConfigurationImpl.Defaults.lazyBridgeStart, 0, 2}, k = NodeConfigurationImpl.Defaults.lazyBridgeStart, d1 = {"��d\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\"\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000b\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0010#\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0006\b\u0002\u0018�� '2\u00020\u0001:\u0001'BQ\u0012\f\u0010\u0002\u001a\b\u0012\u0004\u0012\u00020\u00040\u0003\u0012\f\u0010\u0005\u001a\b\u0012\u0004\u0012\u00020\u00070\u0006\u0012\f\u0010\b\u001a\b\u0012\u0004\u0012\u00020\t0\u0006\u0012\u0018\u0010\n\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\t\u0012\u0004\u0012\u00020\t0\f0\u000b\u0012\u0006\u0010\r\u001a\u00020\u000e¢\u0006\u0002\u0010\u000fJ\b\u0010!\u001a\u00020\"H\u0002J\b\u0010#\u001a\u00020\"H\u0016J\b\u0010$\u001a\u00020\"H\u0016J\u0018\u0010%\u001a\u00020\t*\u000e\u0012\u0004\u0012\u00020\t\u0012\u0004\u0012\u00020\t0\fH\u0002J\u0018\u0010&\u001a\u00020\t*\u000e\u0012\u0004\u0012\u00020\t\u0012\u0004\u0012\u00020\t0\fH\u0002R \u0010\n\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\t\u0012\u0004\u0012\u00020\t0\f0\u000bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0010\u001a\u00020\u0011X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0012\u001a\u00020\u0011X\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\b\u001a\b\u0012\u0004\u0012\u00020\t0\u0006X\u0082\u0004¢\u0006\u0002\n��R\u0017\u0010\u0013\u001a\b\u0012\u0004\u0012\u00020\u00150\u0014¢\u0006\b\n��\u001a\u0004\b\u0016\u0010\u0017R\u000e\u0010\r\u001a\u00020\u000eX\u0082\u0004¢\u0006\u0002\n��R\u0010\u0010\u0018\u001a\u0004\u0018\u00010\u0019X\u0082\u000e¢\u0006\u0002\n��R\u0014\u0010\u001a\u001a\u00020\t8VX\u0096\u0004¢\u0006\u0006\u001a\u0004\b\u001b\u0010\u001cR\u000e\u0010\u001d\u001a\u00020\tX\u0082\u000e¢\u0006\u0002\n��R\u0014\u0010\u001e\u001a\b\u0012\u0004\u0012\u00020 0\u001fX\u0082\u0004¢\u0006\u0002\n��¨\u0006("}, d2 = {"Lnet/corda/node/services/messaging/P2PMessagingConsumer;", "Lnet/corda/node/internal/LifecycleSupport;", "queueNames", "", "", "createSession", "Lkotlin/Function0;", "Lorg/apache/activemq/artemis/api/core/client/ClientSession;", "isDrainingModeOn", "", "drainingModeWasChangedEvents", "Lrx/Observable;", "Lkotlin/Pair;", "metricsRegistry", "Lcom/codahale/metrics/MetricRegistry;", "(Ljava/util/Set;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function0;Lrx/Observable;Lcom/codahale/metrics/MetricRegistry;)V", "existingOnlyConsumer", "Lnet/corda/node/internal/artemis/ReactiveArtemisConsumer;", "initialAndExistingConsumer", "messages", "Lrx/subjects/PublishSubject;", "Lorg/apache/activemq/artemis/api/core/client/ClientMessage;", "getMessages", "()Lrx/subjects/PublishSubject;", "notificationTimer", "Ljava/util/Timer;", "started", "getStarted", "()Z", "startedFlag", "subscriptions", "", "Lrx/Subscription;", "scheduleDrainNotificationTimer", "", "start", "stop", "switchedOff", "switchedOn", "Companion", "node"})
/* loaded from: input_file:net/corda/node/services/messaging/P2PMessagingConsumer.class */
public final class P2PMessagingConsumer implements LifecycleSupport {
    private boolean startedFlag;

    @NotNull
    private final PublishSubject<ClientMessage> messages;
    private final ReactiveArtemisConsumer existingOnlyConsumer;
    private final ReactiveArtemisConsumer initialAndExistingConsumer;
    private final Set<Subscription> subscriptions;
    private Timer notificationTimer;
    private final Function0<Boolean> isDrainingModeOn;
    private final Observable<Pair<Boolean, Boolean>> drainingModeWasChangedEvents;
    private final MetricRegistry metricsRegistry;
    private static final String initialSessionMessages = "corda_p2p_message_type<>'session_init'";

    @Deprecated
    public static final Companion Companion = new Companion(null);
    private static final Lazy logger$delegate = LazyKt.lazy(new Function0<Logger>() { // from class: net.corda.node.services.messaging.P2PMessagingConsumer$Companion$logger$2
        @NotNull
        public final Logger invoke() {
            Logger logger = LoggerFactory.getLogger(P2PMessagingClient.class);
            Intrinsics.checkExpressionValueIsNotNull(logger, "LoggerFactory.getLogger(T::class.java)");
            return logger;
        }
    });

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: P2PMessagingClient.kt */
    @Metadata(mv = {NodeConfigurationImpl.Defaults.lazyBridgeStart, NodeConfigurationImpl.Defaults.lazyBridgeStart, 11}, bv = {NodeConfigurationImpl.Defaults.lazyBridgeStart, 0, 2}, k = NodeConfigurationImpl.Defaults.lazyBridgeStart, d1 = {"��\u001a\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n\u0002\b\u0005\b\u0082\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082T¢\u0006\u0002\n��R\u001b\u0010\u0005\u001a\u00020\u00068BX\u0082\u0084\u0002¢\u0006\f\n\u0004\b\t\u0010\n\u001a\u0004\b\u0007\u0010\b¨\u0006\u000b"}, d2 = {"Lnet/corda/node/services/messaging/P2PMessagingConsumer$Companion;", "", "()V", "initialSessionMessages", "", "logger", "Lorg/slf4j/Logger;", "getLogger", "()Lorg/slf4j/Logger;", "logger$delegate", "Lkotlin/Lazy;", "node"})
    /* loaded from: input_file:net/corda/node/services/messaging/P2PMessagingConsumer$Companion.class */
    public static final class Companion {
        static final /* synthetic */ KProperty[] $$delegatedProperties = {(KProperty) Reflection.property1(new PropertyReference1Impl(Reflection.getOrCreateKotlinClass(Companion.class), "logger", "getLogger()Lorg/slf4j/Logger;"))};

        /* JADX INFO: Access modifiers changed from: private */
        public final Logger getLogger() {
            Lazy lazy = P2PMessagingConsumer.logger$delegate;
            KProperty kProperty = $$delegatedProperties[0];
            return (Logger) lazy.getValue();
        }

        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    @NotNull
    public final PublishSubject<ClientMessage> getMessages() {
        return this.messages;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void scheduleDrainNotificationTimer() {
        long millis = KotlinUtilsKt.getSeconds(10).toMillis();
        long millis2 = KotlinUtilsKt.getMinutes(1).toMillis();
        Timer timer = TimersKt.timer("DrainNotificationTimer", true);
        timer.schedule(new TimerTask() { // from class: net.corda.node.services.messaging.P2PMessagingConsumer$scheduleDrainNotificationTimer$$inlined$timer$1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                P2PMessagingConsumer.Companion companion;
                Logger logger;
                MetricRegistry metricRegistry;
                companion = P2PMessagingConsumer.Companion;
                logger = companion.getLogger();
                StringBuilder append = new StringBuilder().append("Node is currently in draining mode, new flows will not be processed! Flows in flight: ");
                metricRegistry = P2PMessagingConsumer.this.metricsRegistry;
                Gauge gauge = (Gauge) metricRegistry.getGauges().get("Flows.InFlight");
                logger.warn(append.append(gauge != null ? gauge.getValue() : null).toString());
            }
        }, millis, millis2);
        this.notificationTimer = timer;
    }

    @Override // net.corda.node.internal.Startable
    public void start() {
        synchronized (this) {
            if (!(!this.startedFlag)) {
                throw new IllegalArgumentException("Must not already be started".toString());
            }
            this.drainingModeWasChangedEvents.filter(new Func1<Pair<? extends Boolean, ? extends Boolean>, Boolean>() { // from class: net.corda.node.services.messaging.P2PMessagingConsumer$start$$inlined$synchronized$lambda$1
                public /* bridge */ /* synthetic */ Object call(Object obj) {
                    return Boolean.valueOf(call((Pair<Boolean, Boolean>) obj));
                }

                public final boolean call(Pair<Boolean, Boolean> pair) {
                    boolean switchedOn;
                    P2PMessagingConsumer p2PMessagingConsumer = P2PMessagingConsumer.this;
                    Intrinsics.checkExpressionValueIsNotNull(pair, "change");
                    switchedOn = p2PMessagingConsumer.switchedOn(pair);
                    return switchedOn;
                }
            }).doOnNext(new Action1<Pair<? extends Boolean, ? extends Boolean>>() { // from class: net.corda.node.services.messaging.P2PMessagingConsumer$start$$inlined$synchronized$lambda$2
                public final void call(Pair<Boolean, Boolean> pair) {
                    ReactiveArtemisConsumer reactiveArtemisConsumer;
                    ReactiveArtemisConsumer reactiveArtemisConsumer2;
                    reactiveArtemisConsumer = P2PMessagingConsumer.this.initialAndExistingConsumer;
                    reactiveArtemisConsumer2 = P2PMessagingConsumer.this.existingOnlyConsumer;
                    P2PMessagingClientKt.switchTo(reactiveArtemisConsumer, reactiveArtemisConsumer2);
                    P2PMessagingConsumer.this.scheduleDrainNotificationTimer();
                }
            }).subscribe();
            this.drainingModeWasChangedEvents.filter(new Func1<Pair<? extends Boolean, ? extends Boolean>, Boolean>() { // from class: net.corda.node.services.messaging.P2PMessagingConsumer$start$$inlined$synchronized$lambda$3
                public /* bridge */ /* synthetic */ Object call(Object obj) {
                    return Boolean.valueOf(call((Pair<Boolean, Boolean>) obj));
                }

                public final boolean call(Pair<Boolean, Boolean> pair) {
                    boolean switchedOff;
                    P2PMessagingConsumer p2PMessagingConsumer = P2PMessagingConsumer.this;
                    Intrinsics.checkExpressionValueIsNotNull(pair, "change");
                    switchedOff = p2PMessagingConsumer.switchedOff(pair);
                    return switchedOff;
                }
            }).doOnNext(new Action1<Pair<? extends Boolean, ? extends Boolean>>() { // from class: net.corda.node.services.messaging.P2PMessagingConsumer$start$$inlined$synchronized$lambda$4
                public final void call(Pair<Boolean, Boolean> pair) {
                    ReactiveArtemisConsumer reactiveArtemisConsumer;
                    ReactiveArtemisConsumer reactiveArtemisConsumer2;
                    Timer timer;
                    reactiveArtemisConsumer = P2PMessagingConsumer.this.existingOnlyConsumer;
                    reactiveArtemisConsumer2 = P2PMessagingConsumer.this.initialAndExistingConsumer;
                    P2PMessagingClientKt.switchTo(reactiveArtemisConsumer, reactiveArtemisConsumer2);
                    timer = P2PMessagingConsumer.this.notificationTimer;
                    if (timer != null) {
                        timer.cancel();
                    }
                }
            }).subscribe();
            Set<Subscription> set = this.subscriptions;
            Observable<ClientMessage> mo110getMessages = this.existingOnlyConsumer.mo110getMessages();
            final P2PMessagingConsumer$start$1$6 p2PMessagingConsumer$start$1$6 = new P2PMessagingConsumer$start$1$6(this.messages);
            set.add(mo110getMessages.doOnNext(new Action1() { // from class: net.corda.node.services.messaging.P2PMessagingClientKt$sam$i$rx_functions_Action1$0
                public final /* synthetic */ void call(Object obj) {
                    Intrinsics.checkExpressionValueIsNotNull(p2PMessagingConsumer$start$1$6.invoke(obj), "invoke(...)");
                }
            }).subscribe());
            Set<Subscription> set2 = this.subscriptions;
            Observable<ClientMessage> mo110getMessages2 = this.initialAndExistingConsumer.mo110getMessages();
            final P2PMessagingConsumer$start$1$7 p2PMessagingConsumer$start$1$7 = new P2PMessagingConsumer$start$1$7(this.messages);
            set2.add(mo110getMessages2.doOnNext(new Action1() { // from class: net.corda.node.services.messaging.P2PMessagingClientKt$sam$i$rx_functions_Action1$0
                public final /* synthetic */ void call(Object obj) {
                    Intrinsics.checkExpressionValueIsNotNull(p2PMessagingConsumer$start$1$7.invoke(obj), "invoke(...)");
                }
            }).subscribe());
            if (((Boolean) this.isDrainingModeOn.invoke()).booleanValue()) {
                this.existingOnlyConsumer.start();
                scheduleDrainNotificationTimer();
            } else {
                this.initialAndExistingConsumer.start();
            }
            this.startedFlag = true;
            Unit unit = Unit.INSTANCE;
        }
    }

    @Override // net.corda.node.internal.Stoppable
    public void stop() {
        synchronized (this) {
            if (this.startedFlag) {
                this.existingOnlyConsumer.stop();
                this.initialAndExistingConsumer.stop();
                Iterator<T> it = this.subscriptions.iterator();
                while (it.hasNext()) {
                    ((Subscription) it.next()).unsubscribe();
                }
                this.subscriptions.clear();
                this.startedFlag = false;
            }
            this.messages.onCompleted();
            Unit unit = Unit.INSTANCE;
        }
    }

    @Override // net.corda.node.internal.Startable
    public boolean getStarted() {
        return this.startedFlag;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final boolean switchedOff(@NotNull Pair<Boolean, Boolean> pair) {
        return ((Boolean) pair.getFirst()).booleanValue() && !((Boolean) pair.getSecond()).booleanValue();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final boolean switchedOn(@NotNull Pair<Boolean, Boolean> pair) {
        return !((Boolean) pair.getFirst()).booleanValue() && ((Boolean) pair.getSecond()).booleanValue();
    }

    public P2PMessagingConsumer(@NotNull Set<String> set, @NotNull Function0<? extends ClientSession> function0, @NotNull Function0<Boolean> function02, @NotNull Observable<Pair<Boolean, Boolean>> observable, @NotNull MetricRegistry metricRegistry) {
        Intrinsics.checkParameterIsNotNull(set, "queueNames");
        Intrinsics.checkParameterIsNotNull(function0, "createSession");
        Intrinsics.checkParameterIsNotNull(function02, "isDrainingModeOn");
        Intrinsics.checkParameterIsNotNull(observable, "drainingModeWasChangedEvents");
        Intrinsics.checkParameterIsNotNull(metricRegistry, "metricsRegistry");
        this.isDrainingModeOn = function02;
        this.drainingModeWasChangedEvents = observable;
        this.metricsRegistry = metricRegistry;
        PublishSubject<ClientMessage> create = PublishSubject.create();
        Intrinsics.checkExpressionValueIsNotNull(create, "PublishSubject.create<ClientMessage>()");
        this.messages = create;
        this.existingOnlyConsumer = ReactiveArtemisConsumer.Companion.multiplex(set, function0, initialSessionMessages);
        this.initialAndExistingConsumer = ReactiveArtemisConsumer.Companion.multiplex$default(ReactiveArtemisConsumer.Companion, set, function0, null, 4, null);
        this.subscriptions = new LinkedHashSet();
    }

    @Override // net.corda.node.internal.Stoppable, java.lang.AutoCloseable
    public void close() {
        LifecycleSupport.DefaultImpls.close(this);
    }
}
