package net.corda.node.internal.artemis;

import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.internal.Intrinsics;
import net.corda.node.internal.artemis.ReactiveArtemisConsumer;
import net.corda.node.services.config.shell.ShellSafetyConfigKt;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import rx.subjects.PublishSubject;

/* compiled from: ReactiveArtemisConsumer.kt */
@Metadata(mv = {1, 1, 11}, bv = {1, 0, 2}, k = 1, d1 = {"��F\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\"\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u000b\n\u0002\b\u0005\n\u0002\u0010#\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0007\n\u0002\u0010\u0002\n\u0002\b\u0004\b\u0002\u0018��2\u00020\u0001B+\u0012\f\u0010\u0002\u001a\b\u0012\u0004\u0012\u00020\u00040\u0003\u0012\f\u0010\u0005\u001a\b\u0012\u0004\u0012\u00020\u00070\u0006\u0012\b\u0010\b\u001a\u0004\u0018\u00010\u0004¢\u0006\u0002\u0010\tJ\b\u0010\u001c\u001a\u00020\u001dH\u0016J\b\u0010\u001e\u001a\u00020\u001dH\u0016J\b\u0010\u001f\u001a\u00020\u001dH\u0016J\b\u0010 \u001a\u00020\u001dH\u0016R\u001a\u0010\n\u001a\u00020\u000bX\u0096\u000e¢\u0006\u000e\n��\u001a\u0004\b\f\u0010\r\"\u0004\b\u000e\u0010\u000fR\u0014\u0010\u0010\u001a\b\u0012\u0004\u0012\u00020\u00120\u0011X\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0005\u001a\b\u0012\u0004\u0012\u00020\u00070\u0006X\u0082\u0004¢\u0006\u0002\n��R\u0010\u0010\b\u001a\u0004\u0018\u00010\u0004X\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\u0013\u001a\b\u0012\u0004\u0012\u00020\u00150\u0014X\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u0016\u0010\u0017R\u0014\u0010\u0002\u001a\b\u0012\u0004\u0012\u00020\u00040\u0003X\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0018\u001a\b\u0012\u0004\u0012\u00020\u00070\u0011X\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0019\u001a\u00020\u000b8VX\u0096\u0004¢\u0006\u0006\u001a\u0004\b\u001a\u0010\rR\u000e\u0010\u001b\u001a\u00020\u000bX\u0082\u000e¢\u0006\u0002\n��¨\u0006!"}, d2 = {"Lnet/corda/node/internal/artemis/MultiplexingReactiveArtemisConsumer;", "Lnet/corda/node/internal/artemis/ReactiveArtemisConsumer;", "queueNames", ShellSafetyConfigKt.SAFE_INTERNAL_SHELL_PERMISSION, ShellSafetyConfigKt.SAFE_INTERNAL_SHELL_PERMISSION, "createSession", "Lkotlin/Function0;", "Lorg/apache/activemq/artemis/api/core/client/ClientSession;", "filter", 
"(Ljava/util/Set;Lkotlin/jvm/functions/Function0;Ljava/lang/String;)V", "connected", ShellSafetyConfigKt.SAFE_INTERNAL_SHELL_PERMISSION, "getConnected", "()Z", "setConnected", "(Z)V", "consumers", ShellSafetyConfigKt.SAFE_INTERNAL_SHELL_PERMISSION, "Lorg/apache/activemq/artemis/api/core/client/ClientConsumer;", "messages", "Lrx/subjects/PublishSubject;", "Lorg/apache/activemq/artemis/api/core/client/ClientMessage;", "getMessages", "()Lrx/subjects/PublishSubject;", "sessions", "started", "getStarted", "startedFlag", "connect", ShellSafetyConfigKt.SAFE_INTERNAL_SHELL_PERMISSION, "disconnect", "start", "stop", "node"})
/* loaded from: input_file:net/corda/node/internal/artemis/MultiplexingReactiveArtemisConsumer.class */
/**
 * A {@link ReactiveArtemisConsumer} that listens on several queues at once and republishes every
 * received message on a single {@link PublishSubject}.
 *
 * <p>On {@link #connect()} one session is obtained from the {@code createSession} factory for each
 * queue name, started, and given one consumer for that queue (filtered when a filter was supplied).
 * Every consumer's message handler forwards to {@code messages.onNext(...)}.
 *
 * <p>{@link #start()}, {@link #stop()}, {@link #connect()} and {@link #disconnect()} synchronize on
 * this instance. {@link #getStarted()} and {@link #getConnected()} read their flags without taking
 * that lock.
 *
 * <p>NOTE(review): the Artemis client calls used here (session start, consumer creation, handler
 * registration, close) appear to declare a checked exception in Java, which this class neither
 * catches nor declares — confirm how the build is expected to handle that.
 */
final class MultiplexingReactiveArtemisConsumer implements ReactiveArtemisConsumer {
    /** True between a successful {@link #start()} and the next {@link #stop()}. */
    private boolean startedFlag;
    /** True while consumers and sessions are open, i.e. between connect() and disconnect(). */
    private boolean connected;

    /** Single stream onto which messages from every queue are published. */
    @NotNull
    private final PublishSubject<ClientMessage> messages;
    /** Consumers opened by the current connection; emptied on disconnect. */
    private final Set<ClientConsumer> consumers;
    /** Sessions opened by the current connection; emptied on disconnect. */
    private final Set<ClientSession> sessions;
    private final Set<String> queueNames;
    // Wildcard type matches the constructor parameter, so no unchecked assignment or cast is needed.
    private final Function0<? extends ClientSession> createSession;
    /** Optional filter passed to createConsumer; {@code null} means consume unfiltered. */
    @Nullable
    private final String filter;

    /**
     * Creates a consumer that is neither started nor connected.
     *
     * @param set       names of the queues to consume from ("queueNames")
     * @param function0 factory invoked once per queue on every connect ("createSession")
     * @param str       filter to apply to each consumer, or {@code null} for none ("filter")
     */
    public MultiplexingReactiveArtemisConsumer(@NotNull Set<String> set, @NotNull Function0<? extends ClientSession> function0, @Nullable String str) {
        Intrinsics.checkParameterIsNotNull(set, "queueNames");
        Intrinsics.checkParameterIsNotNull(function0, "createSession");
        this.queueNames = set;
        this.createSession = function0;
        this.filter = str;
        PublishSubject<ClientMessage> create = PublishSubject.create();
        Intrinsics.checkExpressionValueIsNotNull(create, "PublishSubject.create<ClientMessage>()");
        this.messages = create;
        this.consumers = new LinkedHashSet<>();
        this.sessions = new LinkedHashSet<>();
    }

    /** @return whether consumers and sessions are currently open */
    @Override // net.corda.node.internal.Connectable
    public boolean getConnected() {
        return this.connected;
    }

    public void setConnected(boolean z) {
        this.connected = z;
    }

    /** @return the subject on which all received messages are published */
    @Override // net.corda.node.internal.artemis.ReactiveArtemisConsumer
    @NotNull
    /* renamed from: getMessages, reason: merged with bridge method [inline-methods] */
    public PublishSubject<ClientMessage> mo115getMessages() {
        return this.messages;
    }

    /** @return whether {@link #start()} has completed without a subsequent {@link #stop()} */
    @Override // net.corda.node.internal.Startable
    public boolean getStarted() {
        return this.startedFlag;
    }

    /**
     * Connects to all queues and marks this consumer as started.
     *
     * @throws IllegalArgumentException if already started, or if already connected
     */
    @Override // net.corda.node.internal.Startable
    public void start() {
        synchronized (this) {
            if (this.startedFlag) {
                throw new IllegalArgumentException("Must not be started");
            }
            connect();
            // Set only after connect() succeeded, so a failed start can be retried.
            this.startedFlag = true;
        }
    }

    /**
     * Disconnects if started, then completes the message subject. The subject is completed even
     * when this consumer was never started.
     */
    @Override // net.corda.node.internal.Stoppable
    public void stop() {
        synchronized (this) {
            if (this.startedFlag) {
                disconnect();
                this.startedFlag = false;
            }
            mo115getMessages().onCompleted();
        }
    }

    /**
     * Opens one started session and one consumer per queue name, then installs a handler on every
     * consumer that forwards messages to the subject.
     *
     * @throws IllegalArgumentException if already connected
     */
    @Override // net.corda.node.internal.Connectable
    public void connect() {
        synchronized (this) {
            if (getConnected()) {
                throw new IllegalArgumentException("Must not be connected");
            }
            for (String queueName : this.queueNames) {
                ClientSession session = this.createSession.invoke();
                session.start();
                // Exactly one consumer per queue: previously the filtered branch fell through and
                // also registered an unfiltered consumer, bypassing the filter and duplicating
                // deliveries.
                this.consumers.add(openConsumer(session, queueName));
                this.sessions.add(session);
            }
            for (ClientConsumer consumer : this.consumers) {
                consumer.setMessageHandler(new MessageHandler() {
                    @Override
                    public void onMessage(ClientMessage clientMessage) {
                        MultiplexingReactiveArtemisConsumer.this.mo115getMessages().onNext(clientMessage);
                    }
                });
            }
            setConnected(true);
        }
    }

    /** Closes all consumers, then all sessions, and forgets them. Does nothing if not connected. */
    @Override // net.corda.node.internal.Connectable
    public void disconnect() {
        synchronized (this) {
            if (!getConnected()) {
                return;
            }
            for (ClientConsumer consumer : this.consumers) {
                consumer.close();
            }
            for (ClientSession session : this.sessions) {
                session.close();
            }
            this.consumers.clear();
            this.sessions.clear();
            setConnected(false);
        }
    }

    @Override // net.corda.node.internal.Stoppable, java.lang.AutoCloseable
    public void close() {
        ReactiveArtemisConsumer.DefaultImpls.close(this);
    }

    /**
     * Creates the single consumer for a queue: filtered when a filter is configured, otherwise
     * (or if the filtered call yields {@code null}) unfiltered.
     */
    private ClientConsumer openConsumer(ClientSession session, String queueName) {
        ClientConsumer consumer = null;
        if (this.filter != null) {
            consumer = session.createConsumer(queueName, this.filter);
        }
        if (consumer == null) {
            consumer = session.createConsumer(queueName);
        }
        return consumer;
    }
}
