package io.axoniq.axonhub.client.query.subscription;

import io.axoniq.axonhub.SubscriptionQuery;
import io.axoniq.axonhub.client.AxonHubConfiguration;
import io.axoniq.axonhub.client.PlatformConnectionManager;
import io.axoniq.axonhub.client.query.AxonHubQueryBus;
import io.axoniq.axonhub.client.query.QueryPriorityCalculator;
import io.axoniq.axonhub.grpc.QueryProviderInbound;
import io.axoniq.axonhub.grpc.QueryServiceGrpc;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.QueryUpdateEmitter;
import org.axonframework.queryhandling.SubscriptionQueryBackpressure;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/axonhub/client/query/subscription/EnhancedAxonHubQueryBus.class */
public class EnhancedAxonHubQueryBus implements QueryBus, QueryUpdateEmitter {
    private final AxonHubConfiguration configuration;
    private final AxonHubQueryBus axonHubQueryBus;
    private final SubscriptionMessageSerializer serializer;
    private final QueryUpdateEmitter updateEmitter;
    private final Logger logger = LoggerFactory.getLogger(EnhancedAxonHubQueryBus.class);
    private final Collection<String> subscriptions = new CopyOnWriteArraySet();

    public EnhancedAxonHubQueryBus(PlatformConnectionManager platformConnectionManager, AxonHubConfiguration axonHubConfiguration, QueryBus queryBus, QueryUpdateEmitter queryUpdateEmitter, Serializer serializer, Serializer serializer2, QueryPriorityCalculator queryPriorityCalculator) {
        this.configuration = axonHubConfiguration;
        this.axonHubQueryBus = new AxonHubQueryBus(platformConnectionManager, axonHubConfiguration, queryBus, serializer, serializer2, queryPriorityCalculator);
        this.serializer = new SubscriptionMessageSerializer(axonHubConfiguration, serializer, serializer2);
        platformConnectionManager.addDisconnectListener(this::onApplicationDisconnected);
        platformConnectionManager.addReconnectInterceptor(this::interceptReconnectRequest);
        AxonHubQueryBus axonHubQueryBus = this.axonHubQueryBus;
        axonHubQueryBus.getClass();
        SubscriptionQueryRequestTarget subscriptionQueryRequestTarget = new SubscriptionQueryRequestTarget(queryBus, axonHubQueryBus::publish, this.serializer);
        AxonHubQueryBus axonHubQueryBus2 = this.axonHubQueryBus;
        QueryProviderInbound.RequestCase requestCase = QueryProviderInbound.RequestCase.SUBSCRIPTION_QUERY_REQUEST;
        subscriptionQueryRequestTarget.getClass();
        axonHubQueryBus2.on(requestCase, subscriptionQueryRequestTarget::onSubscriptionQueryRequest);
        subscriptionQueryRequestTarget.getClass();
        platformConnectionManager.addDisconnectListener(subscriptionQueryRequestTarget::onApplicationDisconnected);
        this.updateEmitter = queryUpdateEmitter;
    }

    public <R> Registration subscribe(String str, Type type, MessageHandler<? super QueryMessage<?, R>> messageHandler) {
        return this.axonHubQueryBus.subscribe(str, type, messageHandler);
    }

    public <Q, R> CompletableFuture<QueryResponseMessage<R>> query(QueryMessage<Q, R> queryMessage) {
        return this.axonHubQueryBus.query(queryMessage);
    }

    public <Q, R> Stream<QueryResponseMessage<R>> scatterGather(QueryMessage<Q, R> queryMessage, long j, TimeUnit timeUnit) {
        return this.axonHubQueryBus.scatterGather(queryMessage, j, timeUnit);
    }

    public <U> void emit(Predicate<SubscriptionQueryMessage<?, ?, U>> predicate, SubscriptionQueryUpdateMessage<U> subscriptionQueryUpdateMessage) {
        this.updateEmitter.emit(predicate, subscriptionQueryUpdateMessage);
    }

    public void complete(Predicate<SubscriptionQueryMessage<?, ?, ?>> predicate) {
        this.updateEmitter.complete(predicate);
    }

    public void completeExceptionally(Predicate<SubscriptionQueryMessage<?, ?, ?>> predicate, Throwable th) {
        this.updateEmitter.completeExceptionally(predicate, th);
    }

    public <Q, I, U> SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>> subscriptionQuery(SubscriptionQueryMessage<Q, I, U> subscriptionQueryMessage, SubscriptionQueryBackpressure subscriptionQueryBackpressure, int i) {
        String identifier = subscriptionQueryMessage.getIdentifier();
        if (this.subscriptions.contains(identifier)) {
            String str = "Already exists a subscription query with the same subscriptionId: " + identifier;
            this.logger.warn(str);
            throw new IllegalArgumentException(str);
        }
        this.logger.debug("Subscription Query requested with subscriptionId " + identifier);
        this.subscriptions.add(identifier);
        QueryServiceGrpc.QueryServiceStub queryServiceStub = this.axonHubQueryBus.queryServiceStub();
        SubscriptionQuery serialize = this.serializer.serialize(subscriptionQueryMessage);
        queryServiceStub.getClass();
        return new DeserializedResult(new AxonHubSubscriptionQueryResult(serialize, queryServiceStub::subscription, this.configuration, subscriptionQueryBackpressure, i, () -> {
            this.subscriptions.remove(identifier);
        }).get(), this.serializer);
    }

    private Runnable interceptReconnectRequest(Runnable runnable) {
        return this.subscriptions.isEmpty() ? runnable : () -> {
            this.logger.info("Reconnect refused because there are active subscription queries.");
        };
    }

    private void onApplicationDisconnected() {
        this.subscriptions.clear();
    }

    public Registration registerDispatchInterceptor(MessageDispatchInterceptor<? super QueryMessage<?, ?>> messageDispatchInterceptor) {
        return this.axonHubQueryBus.registerDispatchInterceptor(messageDispatchInterceptor);
    }

    public Registration registerHandlerInterceptor(MessageHandlerInterceptor<? super QueryMessage<?, ?>> messageHandlerInterceptor) {
        return this.axonHubQueryBus.registerHandlerInterceptor(messageHandlerInterceptor);
    }
}
