package net.openhft.chronicle.engine.map.remote;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.engine.api.map.MapEvent;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.query.Filter;
import net.openhft.chronicle.engine.server.internal.MapWireHandler;
import net.openhft.chronicle.engine.server.internal.PublisherHandler;
import net.openhft.chronicle.engine.server.internal.SubscriptionHandler;
import net.openhft.chronicle.engine.tree.TopologicalEvent;
import net.openhft.chronicle.network.connection.AbstractAsyncSubscription;
import net.openhft.chronicle.network.connection.AbstractStatelessClient;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.wire.ReadMarshallable;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.Wires;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/openhft/chronicle/engine/map/remote/AbstractRemoteSubscription.class */
abstract class AbstractRemoteSubscription<E> extends AbstractStatelessClient implements SubscriptionCollection<E> {
    private static final Logger LOG = LoggerFactory.getLogger(MapWireHandler.class);
    final Map<Object, Long> subscribersToTid;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractRemoteSubscription(@NotNull TcpChannelHub tcpChannelHub, long j, @NotNull String str) {
        super(tcpChannelHub, j, str);
        this.subscribersToTid = new ConcurrentHashMap();
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public void registerSubscriber(@NotNull RequestContext requestContext, @NotNull Subscriber<E> subscriber, @NotNull Filter<E> filter) {
        registerSubscriber0(requestContext, subscriber, filter);
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public void unregisterSubscriber(@NotNull Subscriber subscriber) {
        unregisterSubscriber0(subscriber);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerSubscriber0(@NotNull final RequestContext requestContext, @NotNull final Subscriber subscriber, @NotNull final Filter filter) {
        if (this.hub.outBytesLock().isHeldByCurrentThread()) {
            throw new IllegalStateException("Cannot view map while debugging");
        }
        Boolean bootstrap = requestContext.bootstrap();
        Boolean endSubscriptionAfterBootstrap = requestContext.endSubscriptionAfterBootstrap();
        String str = this.csp;
        if (bootstrap != null) {
            str = str + "&bootstrap=" + bootstrap;
        }
        if (endSubscriptionAfterBootstrap != null) {
            str = str + "&endSubscriptionAfterBootstrap=" + endSubscriptionAfterBootstrap;
        }
        if (requestContext.throttlePeriodMs() > 0) {
            str = str + "&throttlePeriodMs=" + requestContext.throttlePeriodMs();
        }
        this.hub.subscribe(new AbstractAsyncSubscription(this.hub, str, getClass().getSimpleName()) { // from class: net.openhft.chronicle.engine.map.remote.AbstractRemoteSubscription.1
            {
                AbstractRemoteSubscription.this.subscribersToTid.put(subscriber, Long.valueOf(tid()));
            }

            public void onSubscribe(@NotNull WireOut wireOut) {
                wireOut.writeEventName(SubscriptionHandler.SubscriptionEventID.registerSubscriber).typeLiteral(ClassAliasPool.CLASS_ALIASES.nameFor(requestContext.elementType()));
                if (filter.isEmpty()) {
                    return;
                }
                wireOut.writeEventName(() -> {
                    return "filter";
                }).object(filter);
            }

            public void onConsumer(@NotNull WireIn wireIn) {
                Subscriber subscriber2 = subscriber;
                RequestContext requestContext2 = requestContext;
                wireIn.readDocument((ReadMarshallable) null, wireIn2 -> {
                    StringBuilder acquireStringBuilder = Wires.acquireStringBuilder();
                    ValueIn readEventName = wireIn2.readEventName(acquireStringBuilder);
                    if (PublisherHandler.EventId.onEndOfSubscription.contentEquals(acquireStringBuilder)) {
                        subscriber2.onEndOfSubscription();
                        AbstractRemoteSubscription.this.subscribersToTid.remove(this);
                        AbstractRemoteSubscription.this.hub.unsubscribe(tid());
                    } else if (CoreFields.reply.contentEquals(acquireStringBuilder)) {
                        Class elementType = requestContext2.elementType();
                        AbstractRemoteSubscription.this.onEvent((MapEvent.class.isAssignableFrom(elementType) || TopologicalEvent.class.isAssignableFrom(elementType)) ? readEventName.typedMarshallable() : readEventName.object(requestContext2.elementType()), subscriber2);
                    }
                });
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onEvent(@Nullable Object obj, @NotNull Subscriber subscriber) {
        if (obj != null) {
            try {
                subscriber.onMessage(obj);
            } catch (InvalidSubscriberException e) {
                unregisterSubscriber(subscriber);
            }
        }
    }

    void unregisterSubscriber0(@NotNull Subscriber subscriber) {
        Long l = this.subscribersToTid.get(subscriber);
        if (l == null) {
            LOG.warn("There is no subscription to unsubscribe");
            return;
        }
        this.hub.preventSubscribeUponReconnect(l.longValue());
        if (!this.hub.isOpen()) {
            this.hub.unsubscribe(l.longValue());
        } else {
            if (this.hub.lock(() -> {
                writeMetaDataForKnownTID(l.longValue());
                this.hub.outWire().writeDocument(false, wireOut -> {
                    wireOut.writeEventName(SubscriptionHandler.SubscriptionEventID.unregisterSubscriber).text("");
                });
            })) {
                return;
            }
            this.hub.unsubscribe(l.longValue());
        }
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public int topicSubscriberCount() {
        return proxyReturnInt(SubscriptionHandler.SubscriptionEventID.topicSubscriberCount);
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public int keySubscriberCount() {
        return proxyReturnInt(SubscriptionHandler.SubscriptionEventID.keySubscriberCount);
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public int entrySubscriberCount() {
        return proxyReturnInt(SubscriptionHandler.SubscriptionEventID.entrySubscriberCount);
    }
}
