package io.axoniq.axonhub.client.query;

import io.axoniq.axonhub.ErrorMessage;
import io.axoniq.axonhub.QueryRequest;
import io.axoniq.axonhub.QueryResponse;
import io.axoniq.axonhub.QuerySubscription;
import io.axoniq.axonhub.client.AxonHubConfiguration;
import io.axoniq.axonhub.client.DispatchInterceptors;
import io.axoniq.axonhub.client.ErrorCode;
import io.axoniq.axonhub.client.PlatformConnectionManager;
import io.axoniq.axonhub.client.command.AxonHubRegistration;
import io.axoniq.axonhub.client.util.ContextAddingInterceptor;
import io.axoniq.axonhub.client.util.ExceptionSerializer;
import io.axoniq.axonhub.client.util.FlowControllingStreamObserver;
import io.axoniq.axonhub.client.util.ProcessingInstructionHelper;
import io.axoniq.axonhub.client.util.TokenAddingInterceptor;
import io.axoniq.axonhub.grpc.QueryComplete;
import io.axoniq.axonhub.grpc.QueryProviderInbound;
import io.axoniq.axonhub.grpc.QueryProviderOutbound;
import io.axoniq.axonhub.grpc.QueryServiceGrpc;
import io.grpc.ClientInterceptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/axonhub/client/query/AxonHubQueryBus.class */
/**
 * {@link QueryBus} that dispatches queries over the gRPC query service reachable through the
 * {@link PlatformConnectionManager} and serves incoming queries from a local {@link QueryBus} segment.
 *
 * <p>Outgoing queries ({@link #query} and {@link #scatterGather}) pass through the registered dispatch
 * interceptors, are serialized and sent via {@link #queryServiceStub()}. Handler subscriptions are
 * registered on the local segment and announced over the provider stream managed by {@link QueryProvider}.
 */
public class AxonHubQueryBus implements QueryBus {
    private final AxonHubConfiguration configuration;
    private final QueryBus localSegment;
    private final QuerySerializer serializer;
    private final QueryPriorityCalculator priorityCalculator;
    private final PlatformConnectionManager platformConnectionManager;
    private final ClientInterceptor[] interceptors;
    private final Logger logger = LoggerFactory.getLogger(AxonHubQueryBus.class);
    private final DispatchInterceptors<QueryMessage<?, ?>> dispatchInterceptors = new DispatchInterceptors<>();
    /** Extra consumers per inbound request case, registered through {@link #on}. */
    private final Map<QueryProviderInbound.RequestCase, Collection<Consumer<QueryProviderInbound>>> queryHandlers = new EnumMap<>(QueryProviderInbound.RequestCase.class);
    /**
     * Assigned in the constructor, not in a field initializer: the {@link QueryProvider} constructor reads
     * {@link #configuration}, which is still {@code null} while field initializers run.
     */
    private final QueryProvider queryProvider;

    /**
     * Maintains the provider-side stream: announces subscriptions, queues incoming query requests by
     * priority and answers them from the local segment on a fixed pool of worker threads.
     */
    public class QueryProvider {
        private final ConcurrentMap<QueryDefinition, Set<MessageHandler<? super QueryMessage<?, ?>>>> subscribedQueries = new ConcurrentHashMap<>();
        // Highest priority first. reversed() is used instead of negating the priority so that
        // Long.MIN_VALUE cannot overflow.
        private final PriorityBlockingQueue<QueryRequest> queryQueue = new PriorityBlockingQueue<>(1000,
                Comparator.comparingLong((QueryRequest queryRequest) ->
                        ProcessingInstructionHelper.priority(queryRequest.getProcessingInstructionsList())).reversed());
        private final ExecutorService executor;
        // volatile: assigned under the monitor in getSubscriberObserver() but cleared and read without it
        // from stream callbacks, worker threads and callers of disconnect().
        private volatile StreamObserver<QueryProviderOutbound> outboundStreamObserver;
        private volatile boolean subscribing;

        /**
         * Key identifying a subscription by query name, response type name and component name.
         */
        public class QueryDefinition {
            private final String queryName;
            private final String responseName;
            private final String componentName;

            QueryDefinition(String str, String str2, String str3) {
                this.queryName = str;
                this.responseName = str2;
                this.componentName = str3;
            }

            @Override
            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                if (obj == null || getClass() != obj.getClass()) {
                    return false;
                }
                QueryDefinition other = (QueryDefinition) obj;
                return Objects.equals(this.queryName, other.queryName)
                        && Objects.equals(this.responseName, other.responseName)
                        && Objects.equals(this.componentName, other.componentName);
            }

            @Override
            public int hashCode() {
                return Objects.hash(this.queryName, this.responseName, this.componentName);
            }
        }

        /**
         * Creates the worker pool sized by {@code configuration.getQueryThreads()} and starts one
         * {@link #queryExecutor()} loop per thread.
         */
        QueryProvider() {
            int threadCount = configuration.getQueryThreads();
            this.executor = Executors.newFixedThreadPool(threadCount);
            IntStream.range(0, threadCount).forEach(i -> this.executor.submit(this::queryExecutor));
        }

        /**
         * Worker loop: polls the priority queue and processes each request until the thread is interrupted.
         */
        private void queryExecutor() {
            logger.debug("Starting Query Executor");
            while (true) {
                try {
                    QueryRequest request = this.queryQueue.poll(10L, TimeUnit.SECONDS);
                    if (request != null) {
                        logger.debug("Received query: {}", request);
                        processQuery(request);
                    }
                } catch (InterruptedException e) {
                    logger.warn("Interrupted queryExecutor", e);
                    // Restore the flag so the executor can observe the interruption.
                    Thread.currentThread().interrupt();
                    return;
                } catch (RuntimeException e) {
                    // A failure while answering one query (e.g. a broken stream) must not silently
                    // terminate this worker; log it and keep serving the queue.
                    logger.warn("Unexpected error while processing query: {}", e.getMessage(), e);
                }
            }
        }

        /**
         * Runs one query against the local segment and writes the response(s), followed by a completion
         * marker, to the outbound stream. If the local segment fails, a single error response is sent instead.
         *
         * @param queryRequest the request taken from the queue
         */
        private void processQuery(QueryRequest queryRequest) {
            String requestId = queryRequest.getMessageIdentifier();
            // Capture once: stream callbacks may null the field at any time.
            StreamObserver<QueryProviderOutbound> responseStream = this.outboundStreamObserver;
            if (responseStream == null) {
                logger.warn("No outbound query stream available, dropping query {}", requestId);
                return;
            }
            try {
                if (ProcessingInstructionHelper.numberOfResults(queryRequest.getProcessingInstructionsList()) == 1) {
                    responseStream.onNext(QueryProviderOutbound.newBuilder()
                            .setQueryResponse(serializer.serializeResponse(
                                    (QueryResponseMessage) localSegment.query(serializer.deserializeRequest(queryRequest)).get(),
                                    requestId))
                            .m1904build());
                } else {
                    localSegment.scatterGather(serializer.deserializeRequest(queryRequest), 0L, TimeUnit.SECONDS)
                            .forEach(queryResponseMessage -> responseStream.onNext(QueryProviderOutbound.newBuilder()
                                    .setQueryResponse(serializer.serializeResponse(queryResponseMessage, requestId))
                                    .m1904build()));
                }
                responseStream.onNext(QueryProviderOutbound.newBuilder()
                        .setQueryComplete(QueryComplete.newBuilder()
                                .setMessageId(UUID.randomUUID().toString())
                                .setRequestId(requestId))
                        .m1904build());
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interruption visible to the worker loop.
                    Thread.currentThread().interrupt();
                }
                logger.warn("Received error from localSegment: {}", e.getMessage(), e);
                responseStream.onNext(QueryProviderOutbound.newBuilder()
                        .setQueryResponse(QueryResponse.newBuilder()
                                .setMessageIdentifier(UUID.randomUUID().toString())
                                .setRequestIdentifier(requestId)
                                .setMessage(ExceptionSerializer.serialize(configuration.getClientName(), e))
                                .setErrorCode(ErrorCode.resolve(e).errorCode())
                                .m1188build())
                        .m1904build());
            }
        }

        /**
         * Records the handler for the given query definition, announces the subscription on the outbound
         * stream (a failure to announce is logged, not propagated) and subscribes the handler on the
         * local segment.
         *
         * @param str            the query name
         * @param type           the response type; its type name is part of the subscription key
         * @param str2           the component name
         * @param messageHandler the handler to subscribe
         * @return the registration returned by the local segment
         */
        public <R> Registration subscribe(String str, Type type, String str2, MessageHandler<? super QueryMessage<?, R>> messageHandler) {
            this.subscribing = true;
            try {
                QueryDefinition definition = new QueryDefinition(str, type.getTypeName(), str2);
                Set<MessageHandler<? super QueryMessage<?, ?>>> handlers =
                        this.subscribedQueries.computeIfAbsent(definition, key -> new CopyOnWriteArraySet<>());
                // Within this class only the size of the set is read; handlers are never invoked through it.
                @SuppressWarnings("unchecked")
                MessageHandler<? super QueryMessage<?, ?>> registered =
                        (MessageHandler<? super QueryMessage<?, ?>>) (MessageHandler<?>) messageHandler;
                handlers.add(registered);
                try {
                    getSubscriberObserver().onNext(QueryProviderOutbound.newBuilder()
                            .setSubscribe(subscriptionBuilder(definition, handlers.size()))
                            .m1904build());
                } catch (Exception e) {
                    logger.warn("Subscribe failed - {}", e.getMessage());
                }
            } finally {
                // Always reset, otherwise resubscribe() would stay disabled after a failure.
                this.subscribing = false;
            }
            return localSegment.subscribe(str, type, messageHandler);
        }

        /**
         * Returns the outbound provider stream, opening a new one through the platform connection
         * manager when none is currently held.
         *
         * @return the current outbound stream observer
         */
        public synchronized StreamObserver<QueryProviderOutbound> getSubscriberObserver() {
            if (this.outboundStreamObserver == null) {
                logger.info("Create new subscriber");
                StreamObserver<QueryProviderInbound> inbound = new StreamObserver<QueryProviderInbound>() {
                    @Override
                    public void onNext(QueryProviderInbound queryProviderInbound) {
                        QueryProviderInbound.RequestCase requestCase = queryProviderInbound.getRequestCase();
                        queryHandlers.getOrDefault(requestCase, Collections.emptySet())
                                .forEach(consumer -> consumer.accept(queryProviderInbound));
                        switch (requestCase) {
                            case QUERY:
                                QueryProvider.this.queryQueue.add(queryProviderInbound.getQuery());
                                return;
                            case CONFIRMATION:
                            default:
                                return;
                        }
                    }

                    @Override
                    public void onError(Throwable th) {
                        logger.warn("Received error from query stream - {}", th.getMessage());
                        QueryProvider.this.outboundStreamObserver = null;
                    }

                    @Override
                    public void onCompleted() {
                        QueryProvider.this.outboundStreamObserver = null;
                    }
                };
                this.outboundStreamObserver = new FlowControllingStreamObserver<>(
                        platformConnectionManager.getQueryStream(inbound, interceptors),
                        configuration,
                        flowControl -> QueryProviderOutbound.newBuilder().setFlowControl(flowControl).m1904build(),
                        queryProviderOutbound -> queryProviderOutbound.getRequestCase().equals(QueryProviderOutbound.RequestCase.QUERYRESPONSE))
                        .sendInitialPermits();
            }
            return this.outboundStreamObserver;
        }

        /**
         * Forgets the subscription for the given definition and sends an unsubscribe message for it.
         * Failure to send is logged at debug level only.
         *
         * @param str  the query name
         * @param type the response type
         * @param str2 the component name
         */
        public void unsubscribe(String str, Type type, String str2) {
            QueryDefinition definition = new QueryDefinition(str, type.getTypeName(), str2);
            this.subscribedQueries.remove(definition);
            try {
                getSubscriberObserver().onNext(QueryProviderOutbound.newBuilder()
                        .setUnsubscribe(subscriptionBuilder(definition, 1))
                        .m1904build());
            } catch (Exception e) {
                logger.debug("Unsubscribe failed - {}", e.getMessage());
            }
        }

        /**
         * Sends an unsubscribe message for every known subscription (best effort) and then drops the
         * outbound stream reference. The recorded subscriptions themselves are kept.
         */
        public void unsubscribeAll() {
            this.subscribedQueries.forEach((definition, handlers) -> {
                try {
                    getSubscriberObserver().onNext(QueryProviderOutbound.newBuilder()
                            .setUnsubscribe(subscriptionBuilder(definition, 1))
                            .m1904build());
                } catch (Exception e) {
                    logger.debug("Unsubscribe failed - {}", e.getMessage());
                }
            });
            this.outboundStreamObserver = null;
        }

        /**
         * Re-announces all recorded subscriptions on the outbound stream. Does nothing when there are
         * no subscriptions or while a {@link #subscribe} call is in progress.
         */
        public void resubscribe() {
            if (this.subscribedQueries.isEmpty() || this.subscribing) {
                return;
            }
            try {
                StreamObserver<QueryProviderOutbound> stream = getSubscriberObserver();
                this.subscribedQueries.forEach((definition, handlers) ->
                        stream.onNext(QueryProviderOutbound.newBuilder()
                                .setSubscribe(subscriptionBuilder(definition, handlers.size()))
                                .m1904build()));
            } catch (Exception e) {
                logger.warn("Error while resubscribing - {}", e.getMessage());
            }
        }

        /**
         * Completes the outbound stream if one is currently held.
         */
        public void disconnect() {
            // Local capture: a stream callback may null the field between the check and the call.
            StreamObserver<QueryProviderOutbound> stream = this.outboundStreamObserver;
            if (stream != null) {
                stream.onCompleted();
            }
        }

        /**
         * Builds a subscription message for the given definition with a fresh message id.
         *
         * @param queryDefinition the subscription key
         * @param i               the handler count to report
         * @return the populated builder
         */
        private QuerySubscription.Builder subscriptionBuilder(QueryDefinition queryDefinition, int i) {
            return QuerySubscription.newBuilder()
                    .setClientName(configuration.getClientName())
                    .setMessageId(UUID.randomUUID().toString())
                    .setComponentName(queryDefinition.componentName)
                    .setQuery(queryDefinition.queryName)
                    .setNrOfHandlers(i)
                    .setResultName(queryDefinition.responseName);
        }
    }

    /**
     * Creates the bus and registers reconnect/disconnect listeners on the platform connection manager
     * so subscriptions are re-announced after a reconnect and withdrawn on disconnect.
     *
     * @param platformConnectionManager provides the channel and the provider query stream
     * @param axonHubConfiguration      client configuration (client/component name, token, context, query threads)
     * @param queryBus                  the local segment that actually handles queries
     * @param serializer                first serializer handed to {@link QuerySerializer}
     * @param serializer2               second serializer handed to {@link QuerySerializer}
     * @param queryPriorityCalculator   determines the priority sent with each outgoing query
     */
    public AxonHubQueryBus(PlatformConnectionManager platformConnectionManager, AxonHubConfiguration axonHubConfiguration, QueryBus queryBus, Serializer serializer, Serializer serializer2, QueryPriorityCalculator queryPriorityCalculator) {
        this.configuration = axonHubConfiguration;
        this.localSegment = queryBus;
        this.serializer = new QuerySerializer(serializer, serializer2, axonHubConfiguration);
        this.priorityCalculator = queryPriorityCalculator;
        this.platformConnectionManager = platformConnectionManager;
        this.interceptors = new ClientInterceptor[]{
                new TokenAddingInterceptor(axonHubConfiguration.getToken()),
                new ContextAddingInterceptor(axonHubConfiguration.getContext())
        };
        // Must come after 'configuration' is set: the provider reads it while constructing its thread pool.
        this.queryProvider = new QueryProvider();
        this.platformConnectionManager.addReconnectListener(this.queryProvider::resubscribe);
        this.platformConnectionManager.addDisconnectListener(this.queryProvider::unsubscribeAll);
    }

    /**
     * Subscribes the handler under the configured component name. Cancelling the returned registration
     * also sends an unsubscribe for the query definition.
     *
     * @param str            the query name
     * @param type           the response type
     * @param messageHandler the handler to subscribe
     * @return a registration wrapping the local registration and the remote unsubscribe action
     */
    @Override
    public <R> Registration subscribe(String str, Type type, MessageHandler<? super QueryMessage<?, R>> messageHandler) {
        Registration localRegistration = this.queryProvider.subscribe(str, type, this.configuration.getComponentName(), messageHandler);
        return new AxonHubRegistration(localRegistration,
                () -> this.queryProvider.unsubscribe(str, type, this.configuration.getComponentName()));
    }

    /**
     * Sends the query through the query service stub and completes the returned future with the first
     * response. A response carrying a message, a stream error, or completion without any response
     * completes the future exceptionally.
     *
     * @param queryMessage the query to dispatch
     * @return a future for the response
     */
    @Override
    public <Q, R> CompletableFuture<QueryResponseMessage<R>> query(QueryMessage<Q, R> queryMessage) {
        QueryMessage<Q, R> intercepted = this.dispatchInterceptors.intercept(queryMessage);
        final CompletableFuture<QueryResponseMessage<R>> result = new CompletableFuture<>();
        // The literal 1 and the one-hour value are passed straight to the serializer; presumably the
        // requested number of results and the timeout in milliseconds - confirm against QuerySerializer.
        queryServiceStub().query(
                this.serializer.serializeRequest(intercepted, 1, TimeUnit.HOURS.toMillis(1L), this.priorityCalculator.determinePriority(intercepted)),
                new StreamObserver<QueryResponse>() {
                    @Override
                    public void onNext(QueryResponse queryResponse) {
                        logger.debug("Received response: {}", queryResponse);
                        if (queryResponse.hasMessage()) {
                            result.completeExceptionally(new RemoteQueryException(queryResponse.getErrorCode(), queryResponse.getMessage()));
                        } else {
                            result.complete(serializer.deserializeResponse(queryResponse));
                        }
                    }

                    @Override
                    public void onError(Throwable th) {
                        logger.warn("Received error while waiting for first response: {}", th.getMessage(), th);
                        result.completeExceptionally(th);
                    }

                    @Override
                    public void onCompleted() {
                        if (result.isDone()) {
                            return;
                        }
                        result.completeExceptionally(new RemoteQueryException(ErrorCode.OTHER.errorCode(),
                                ErrorMessage.newBuilder().setMessage("No result from query executor").m1043build()));
                    }
                });
        return result;
    }

    /**
     * Creates a query service stub on the current platform channel with this bus's client interceptors.
     *
     * @return a new stub
     */
    public QueryServiceGrpc.QueryServiceStub queryServiceStub() {
        return (QueryServiceGrpc.QueryServiceStub) QueryServiceGrpc.newStub(this.platformConnectionManager.getChannel()).withInterceptors(this.interceptors);
    }

    /**
     * Sends the query with the given deadline and streams back every response received. Responses
     * carrying a message are logged and skipped; stream error or completion ends the result stream.
     *
     * @param queryMessage the query to dispatch
     * @param j            the timeout value
     * @param timeUnit     the unit of {@code j}
     * @return a sequential stream of the responses
     */
    @Override
    public <Q, R> Stream<QueryResponseMessage<R>> scatterGather(QueryMessage<Q, R> queryMessage, long j, TimeUnit timeUnit) {
        QueryMessage<Q, R> intercepted = this.dispatchInterceptors.intercept(queryMessage);
        final QueueBackedSpliterator responses = new QueueBackedSpliterator(j, timeUnit);
        ((QueryServiceGrpc.QueryServiceStub) queryServiceStub().withDeadlineAfter(j, timeUnit)).query(
                this.serializer.serializeRequest(intercepted, -1, timeUnit.toMillis(j), this.priorityCalculator.determinePriority(intercepted)),
                new StreamObserver<QueryResponse>() {
                    @Override
                    public void onNext(QueryResponse queryResponse) {
                        logger.debug("Received response: {}", queryResponse);
                        if (queryResponse.hasMessage()) {
                            logger.warn("Received exception: {}", queryResponse.getMessage());
                        } else {
                            responses.put(serializer.deserializeResponse(queryResponse));
                        }
                    }

                    @Override
                    public void onError(Throwable th) {
                        // Hitting the deadline is the normal way a scatter-gather ends; don't log it as a warning.
                        if (!isDeadlineExceeded(th)) {
                            logger.warn("Received error while waiting for responses: {}", th.getMessage(), th);
                        }
                        responses.cancel(th);
                    }

                    @Override
                    public void onCompleted() {
                        responses.cancel(null);
                    }
                });
        return StreamSupport.stream(responses, false);
    }

    /**
     * Tells whether the throwable is a gRPC {@link StatusRuntimeException} with code {@code DEADLINE_EXCEEDED}.
     *
     * @param th the throwable to inspect
     * @return {@code true} for a deadline-exceeded status
     */
    public boolean isDeadlineExceeded(Throwable th) {
        return (th instanceof StatusRuntimeException)
                && ((StatusRuntimeException) th).getStatus().getCode().equals(Status.Code.DEADLINE_EXCEEDED);
    }

    /**
     * Registers the handler interceptor on the local segment.
     *
     * @param messageHandlerInterceptor the interceptor to register
     * @return the registration returned by the local segment
     */
    public Registration registerHandlerInterceptor(MessageHandlerInterceptor<? super QueryMessage<?, ?>> messageHandlerInterceptor) {
        return this.localSegment.registerHandlerInterceptor(messageHandlerInterceptor);
    }

    /**
     * Registers an interceptor applied to outgoing queries before they are serialized.
     *
     * @param messageDispatchInterceptor the interceptor to register
     * @return the registration for the interceptor
     */
    public Registration registerDispatchInterceptor(MessageDispatchInterceptor<? super QueryMessage<?, ?>> messageDispatchInterceptor) {
        return this.dispatchInterceptors.registerDispatchInterceptor(messageDispatchInterceptor);
    }

    /**
     * Completes the provider's outbound stream, if any.
     */
    public void disconnect() {
        this.queryProvider.disconnect();
    }

    /**
     * Sends a message on the provider's outbound stream, opening the stream first if necessary.
     *
     * @param queryProviderOutbound the message to send
     */
    public void publish(QueryProviderOutbound queryProviderOutbound) {
        this.queryProvider.getSubscriberObserver().onNext(queryProviderOutbound);
    }

    /**
     * Registers a consumer that is invoked for every inbound provider message of the given request case.
     *
     * @param requestCase the request case to listen for
     * @param consumer    the callback receiving the inbound message
     */
    public void on(QueryProviderInbound.RequestCase requestCase, Consumer<QueryProviderInbound> consumer) {
        this.queryHandlers.computeIfAbsent(requestCase, key -> new CopyOnWriteArraySet<>()).add(consumer);
    }
}
