package net.openhft.chronicle.engine.map.remote;

import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.engine.api.map.KeyValueStore;
import net.openhft.chronicle.engine.api.map.MapEvent;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.pubsub.TopicSubscriber;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.map.EventConsumer;
import net.openhft.chronicle.engine.map.ObjectKVSSubscription;
import net.openhft.chronicle.engine.query.Filter;
import net.openhft.chronicle.engine.server.internal.MapWireHandler;
import net.openhft.chronicle.engine.server.internal.ObjectKVSubscriptionHandler;
import net.openhft.chronicle.network.connection.AbstractAsyncSubscription;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.wire.ReadMarshallable;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.Wires;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/openhft/chronicle/engine/map/remote/RemoteKVSSubscription.class */
/**
 * Subscription view of a key/value store that is reached through a {@link TcpChannelHub}.
 *
 * <p>Topic subscribers are registered by sending a {@code registerTopicSubscriber} event over the
 * hub; replies are decoded into key/value pairs using the key and value types taken from the
 * {@link RequestContext} supplied at construction time and then handed to the subscriber.
 *
 * <p>Several operations ({@link #notifyEvent(MapEvent)}, {@link #hasSubscribers()}) are not
 * supported by this implementation and throw {@link UnsupportedOperationException}.
 *
 * @param <K> key type of the underlying store
 * @param <V> value type of the underlying store
 */
public class RemoteKVSSubscription<K, V> extends AbstractRemoteSubscription<MapEvent<K, V>> implements ObjectKVSSubscription<K, V>, Closeable {
    // Log under this class's own category (previously logged as MapWireHandler).
    private static final Logger LOG = LoggerFactory.getLogger(RemoteKVSSubscription.class);
    private final Class<K> kClass;
    private final Class<V> vClass;
    private final RequestContext rc;

    /**
     * Creates a subscription bound to the {@link TcpChannelHub} view of the given asset.
     *
     * @param requestContext supplies the key type, value type and full name used to build the
     *                       subscription URI; also retained for {@link #registerDownstream}
     * @param asset          asset whose {@link TcpChannelHub} view is used for all communication
     */
    public RemoteKVSSubscription(@NotNull RequestContext requestContext, @NotNull Asset asset) {
        super((TcpChannelHub) asset.findView(TcpChannelHub.class), 0L, toUri(requestContext));
        this.kClass = requestContext.keyType();
        this.vClass = requestContext.valueType();
        this.rc = requestContext;
    }

    /**
     * Builds the subscription URI, i.e. {@code "/" + fullName + "?view=subscription"}.
     */
    @NotNull
    private static String toUri(@NotNull RequestContext requestContext) {
        return "/" + requestContext.fullName() + "?view=subscription";
    }

    /**
     * Registers a topic subscriber by submitting an asynchronous subscription to the hub.
     *
     * <p>The {@code requestContext} argument is not read by this implementation; the key and
     * value types captured at construction time are sent instead.
     *
     * @param requestContext  unused here
     * @param topicSubscriber receives each decoded (topic, message) pair
     * @throws IllegalStateException if the current thread already holds the hub's out-bytes lock
     */
    @Override // net.openhft.chronicle.engine.map.KVSSubscription
    public void registerTopicSubscriber(@NotNull RequestContext requestContext, @NotNull final TopicSubscriber<K, V> topicSubscriber) {
        if (this.hub.outBytesLock().isHeldByCurrentThread()) {
            throw new IllegalStateException("Cannot view map while debugging");
        }
        // NOTE(review): "Remove" in the subscription name below is probably a typo for "Remote";
        // left untouched because the name is a runtime string that may be matched elsewhere.
        this.hub.subscribe(new AbstractAsyncSubscription(this.hub, this.csp, "Remove KV Subscription registerTopicSubscriber") {
            @Override
            public void onSubscribe(@NotNull WireOut wireOut) {
                // Remember the transaction id so the subscriber can be unregistered later.
                RemoteKVSSubscription.this.subscribersToTid.put(topicSubscriber, Long.valueOf(tid()));
                wireOut.writeEventName(ObjectKVSubscriptionHandler.EventId.registerTopicSubscriber).marshallable(typesOut -> {
                    typesOut.write(() -> "keyType").typeLiteral(RemoteKVSSubscription.this.kClass);
                    typesOut.write(() -> "valueType").typeLiteral(RemoteKVSSubscription.this.vClass);
                });
            }

            @Override
            public void onConsumer(@NotNull WireIn wireIn) {
                // The explicit cast selects the (meta-data, data) overload with no meta-data handler.
                wireIn.readDocument((ReadMarshallable) null, dataIn -> {
                    StringBuilder eventName = Wires.acquireStringBuilder();
                    ValueIn valueIn = dataIn.readEventName(eventName);
                    if (CoreFields.reply.contentEquals(eventName)) {
                        // Distinct parameter name: a nested lambda may not redeclare an enclosing
                        // lambda's parameter.
                        valueIn.marshallable(replyIn -> {
                            K topic = replyIn.read(() -> "topic").object(RemoteKVSSubscription.this.kClass);
                            V message = replyIn.read(() -> "message").object(RemoteKVSSubscription.this.vClass);
                            RemoteKVSSubscription.this.onEvent(topic, message, topicSubscriber);
                        });
                    } else if (ObjectKVSubscriptionHandler.EventId.onEndOfSubscription.contentEquals(eventName)) {
                        RemoteKVSSubscription.this.onEndOfSubscription();
                        RemoteKVSSubscription.this.hub.unsubscribe(tid());
                    }
                });
            }
        });
    }

    /**
     * Delivers one (key, value) pair to the subscriber; if the subscriber signals that it is no
     * longer valid, it is unregistered.
     *
     * <p>JADX INFO: access modifier was changed from private by the decompiler.
     */
    public void onEvent(K k, @Nullable V v, @NotNull TopicSubscriber<K, V> topicSubscriber) {
        try {
            topicSubscriber.onMessage(k, v);
        } catch (InvalidSubscriberException e) {
            // The exception is the subscriber's way of asking to be removed.
            unregisterTopicSubscriber(topicSubscriber);
        }
    }

    /**
     * Unregisters a previously registered topic subscriber.
     *
     * <p>If no transaction id is recorded for the subscriber a warning is logged and nothing else
     * happens. Otherwise re-subscription on reconnect is disabled and, when the hub is open, an
     * {@code unregisterTopicSubscriber} event is written for that transaction id; when the hub is
     * not open the subscription is simply dropped from the hub.
     */
    @Override // net.openhft.chronicle.engine.map.KVSSubscription
    public void unregisterTopicSubscriber(@NotNull TopicSubscriber topicSubscriber) {
        final Long registeredTid = this.subscribersToTid.get(topicSubscriber);
        if (registeredTid == null) {
            LOG.warn("There is no subscription to unsubscribe, was {} other subscriptions.", this.subscribersToTid.size());
            return;
        }
        final long tid = registeredTid;
        this.hub.preventSubscribeUponReconnect(tid);
        if (!this.hub.isOpen()) {
            this.hub.unsubscribe(tid);
            return;
        }
        this.hub.lock(() -> {
            writeMetaDataForKnownTID(tid);
            this.hub.outWire().writeDocument(false, wireOut ->
                    wireOut.writeEventName(ObjectKVSubscriptionHandler.EventId.unregisterTopicSubscriber).text(""));
        });
    }

    /**
     * Registers a key subscriber by delegating to {@code registerSubscriber0}.
     */
    @Override // net.openhft.chronicle.engine.map.KVSSubscription
    public void registerKeySubscriber(@NotNull RequestContext requestContext, @NotNull Subscriber<K> subscriber, @NotNull Filter<K> filter) {
        registerSubscriber0(requestContext, subscriber, filter);
    }

    /**
     * @return always {@code true}
     */
    @Override // net.openhft.chronicle.engine.map.KVSSubscription
    public boolean needsPrevious() {
        return true;
    }

    /**
     * Intentionally a no-op in this implementation.
     */
    @Override // net.openhft.chronicle.engine.map.KVSSubscription
    public void setKvStore(KeyValueStore<K, V> keyValueStore) {
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // net.openhft.chronicle.engine.map.KVSSubscription, net.openhft.chronicle.engine.map.EventConsumer
    public void notifyEvent(MapEvent<K, V> mapEvent) {
        throw new UnsupportedOperationException("");
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // net.openhft.chronicle.engine.map.KVSSubscription
    public boolean hasSubscribers() {
        throw new UnsupportedOperationException("has subscribers, is only implemented on the server");
    }

    /**
     * Registers {@code eventConsumer} as a subscriber of {@link MapEvent}s, with an empty filter,
     * using a copy of the construction-time request context retargeted to {@code MapEvent}.
     *
     * @throws NullPointerException if {@code eventConsumer} is {@code null}
     */
    @Override // net.openhft.chronicle.engine.map.KVSSubscription
    public void registerDownstream(@NotNull EventConsumer<K, V> eventConsumer) {
        // NOTE(review): m12clone() looks like a decompiler-renamed RequestContext.clone() — confirm
        // against the RequestContext actually on the classpath before renaming.
        RequestContext eventContext = this.rc.m12clone().type(MapEvent.class).type2(null);
        // Bind the method reference to a parameterised type first: the registerSubscriber override
        // below takes a raw Subscriber, against which the reference would not type-check.
        // Evaluating a bound method reference already throws NullPointerException for a null
        // receiver, so no separate null check is needed.
        final Subscriber<MapEvent<K, V>> downstream = eventConsumer::notifyEvent;
        registerSubscriber(eventContext, downstream, Filter.empty());
    }

    // The overrides below were emitted by the decompiler from compiler-generated bridge methods;
    // each one only forwards to the superclass implementation.

    @Override // net.openhft.chronicle.engine.map.remote.AbstractRemoteSubscription, net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public /* bridge */ /* synthetic */ int entrySubscriberCount() {
        return super.entrySubscriberCount();
    }

    @Override // net.openhft.chronicle.engine.map.remote.AbstractRemoteSubscription, net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public /* bridge */ /* synthetic */ int keySubscriberCount() {
        return super.keySubscriberCount();
    }

    @Override // net.openhft.chronicle.engine.map.remote.AbstractRemoteSubscription, net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public /* bridge */ /* synthetic */ int topicSubscriberCount() {
        return super.topicSubscriberCount();
    }

    @Override // net.openhft.chronicle.engine.map.remote.AbstractRemoteSubscription, net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public /* bridge */ /* synthetic */ void unregisterSubscriber(@NotNull Subscriber subscriber) {
        super.unregisterSubscriber(subscriber);
    }

    @Override // net.openhft.chronicle.engine.map.remote.AbstractRemoteSubscription, net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public /* bridge */ /* synthetic */ void registerSubscriber(@NotNull RequestContext requestContext, @NotNull Subscriber subscriber, @NotNull Filter filter) {
        super.registerSubscriber(requestContext, subscriber, filter);
    }
}
