package io.streamthoughts.azkarra.api.query;

import io.streamthoughts.azkarra.api.monad.Either;
import io.streamthoughts.azkarra.api.query.internal.PreparedQuery;
import io.streamthoughts.azkarra.api.query.result.ErrorResultSet;
import io.streamthoughts.azkarra.api.query.result.QueryError;
import io.streamthoughts.azkarra.api.query.result.QueryResult;
import io.streamthoughts.azkarra.api.query.result.QueryResultBuilder;
import io.streamthoughts.azkarra.api.query.result.QueryStatus;
import io.streamthoughts.azkarra.api.query.result.SuccessResultSet;
import io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer;
import io.streamthoughts.azkarra.api.streams.StreamsServerInfo;
import io.streamthoughts.azkarra.api.time.Time;
import io.streamthoughts.azkarra.api.util.FutureCollectors;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/azkarra/api/query/DistributedQuery.class */
/**
 * Runs a {@link PreparedQuery} against the state store of a {@link KafkaStreamsContainer},
 * executing it on the local instance and/or forwarding it to remote instances through a
 * {@link RemoteQueryClient}, then merges the partial results into a single {@link QueryResult}.
 *
 * <p>Keyed queries (see {@link PreparedQuery#isKeyedQuery()}) are sent to the single server
 * returned by the container for the store and key; all other queries are sent to every server
 * returned by the container for the store.
 *
 * @param <K> the type of the queried keys.
 * @param <V> the type of the queried values.
 */
public class DistributedQuery<K, V> {

    private static final Logger LOG = LoggerFactory.getLogger(DistributedQuery.class);

    private final RemoteQueryClient remoteQueryClient;
    private final PreparedQuery<K, V> query;

    /**
     * Creates a new {@link DistributedQuery} instance.
     *
     * @param remoteQueryClient the client used to forward the query to remote servers.
     * @param preparedQuery     the query to execute.
     * @throws NullPointerException if one of the arguments is {@code null}.
     */
    public DistributedQuery(RemoteQueryClient remoteQueryClient, PreparedQuery<K, V> preparedQuery) {
        Objects.requireNonNull(remoteQueryClient, "remoteQueryClient cannot be null");
        Objects.requireNonNull(preparedQuery, "query cannot be null");
        this.remoteQueryClient = remoteQueryClient;
        this.query = preparedQuery;
    }

    /**
     * Executes the prepared query.
     *
     * @param kafkaStreamsContainer the container giving access to the local store and to the
     *                              servers metadata.
     * @param queried               the query options.
     * @return the aggregated result; its server name is the local {@code host:port}, or
     *         {@code "N/A"} when the container exposes no local server info.
     * @throws NullPointerException if one of the arguments is {@code null}.
     */
    public QueryResult<K, V> query(KafkaStreamsContainer kafkaStreamsContainer, Queried queried) {
        Objects.requireNonNull(kafkaStreamsContainer, "streams cannot be null");
        Objects.requireNonNull(queried, "options cannot be null");

        final long startedAt = Time.SYSTEM.milliseconds();
        final String localServer = kafkaStreamsContainer.getLocalServerInfo()
            .map(StreamsServerInfo::hostAndPort)
            .orElse("N/A");

        if (this.query.isKeyedQuery()) {
            return querySingleHostStateStore(kafkaStreamsContainer, localServer, queried, startedAt);
        }
        return queryMultiHostStateStore(kafkaStreamsContainer, localServer, queried, startedAt);
    }

    /**
     * Queries every server known for the store: remote servers asynchronously (only when remote
     * access is allowed), and the local one synchronously when it is part of the metadata.
     */
    private QueryResult<K, V> queryMultiHostStateStore(KafkaStreamsContainer container,
                                                       String localServer,
                                                       Queried options,
                                                       long startedAt) {
        final Collection<StreamsServerInfo> servers = container.getAllMetadataForStore(this.query.storeName());

        // Remote queries are started first so that they run while the local store is queried.
        List<CompletableFuture<QueryResult<K, V>>> remoteFutures = Collections.emptyList();
        if (options.remoteAccessAllowed()) {
            remoteFutures = servers.stream()
                .filter(Predicate.not(StreamsServerInfo::isLocal))
                .map(server -> {
                    final String remoteServer = server.hostAndPort();
                    // Remote access is disabled on the forwarded options so that the remote
                    // instance does not forward the query again.
                    return this.remoteQueryClient
                        .query(server, this.query, options.withRemoteAccessAllowed(false))
                        .exceptionally(error ->
                            buildInternalErrorResult(localServer, remoteServer, error, startedAt));
                })
                .collect(Collectors.toList());
        }

        final List<Either<SuccessResultSet<K, V>, ErrorResultSet>> results = new ArrayList<>();
        if (servers.stream().anyMatch(StreamsServerInfo::isLocal)) {
            results.add(executeLocallyAndGet(localServer, container, options));
        }
        if (!remoteFutures.isEmpty()) {
            results.addAll(waitRemoteThenGet(remoteFutures));
        }
        return buildQueryResult(localServer, results, startedAt);
    }

    /**
     * Queries the single server returned by the container for the queried store and key.
     *
     * <p>Returns a result built from an empty list when that server is remote and remote access
     * is not allowed.
     */
    private QueryResult<K, V> querySingleHostStateStore(KafkaStreamsContainer container,
                                                        String localServer,
                                                        Queried options,
                                                        long startedAt) {
        Serializer<K> keySerializer = this.query.keySerializer();
        if (keySerializer == null) {
            // Fall back to the container's default key serde, or to the String serde.
            keySerializer = container.getDefaultKeySerde().orElse(Serdes.String()).serializer();
        }

        final StreamsServerInfo target =
            container.getMetadataForStoreAndKey(this.query.storeName(), this.query.key(), keySerializer);

        if (target.isLocal()) {
            return buildQueryResult(
                localServer,
                Collections.singletonList(executeLocallyAndGet(localServer, container, options)),
                startedAt);
        }

        if (!options.remoteAccessAllowed()) {
            return buildQueryResult(localServer, Collections.emptyList(), startedAt);
        }

        final String remoteServer = target.hostAndPort();
        try {
            return this.remoteQueryClient
                .query(target, this.query, options)
                .exceptionally(error -> buildInternalErrorResult(localServer, remoteServer, error, startedAt))
                .get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return buildInternalErrorResult(localServer, remoteServer, e, startedAt);
        } catch (ExecutionException e) {
            // Report the failure to the caller instead of silently returning an empty result.
            LOG.error("Unexpected error happens while waiting for remote query result from '{}'", remoteServer, e);
            final Throwable cause = e.getCause() != null ? e.getCause() : e;
            return buildInternalErrorResult(localServer, remoteServer, cause, startedAt);
        }
    }

    /**
     * Splits the given results into failed and successful result sets and builds the final
     * {@link QueryResult}, with the elapsed time computed from {@code startedAt}.
     */
    private QueryResult<K, V> buildQueryResult(String localServer,
                                               List<Either<SuccessResultSet<K, V>, ErrorResultSet>> results,
                                               long startedAt) {
        final List<ErrorResultSet> failed = results.stream()
            .filter(Either::isRight)
            .map(result -> result.right().get())
            .collect(Collectors.toList());

        final List<SuccessResultSet<K, V>> succeeded = results.stream()
            .filter(Either::isLeft)
            .map(result -> result.left().get())
            .collect(Collectors.toList());

        return QueryResultBuilder.<K, V>newBuilder()
            .setServer(localServer)
            .setTook(Time.SYSTEM.milliseconds() - startedAt)
            .setStatus(computeStatus(failed, succeeded))
            .setFailedResultSet(failed)
            .setSuccessResultSet(succeeded)
            .build();
    }

    /**
     * Builds a {@link QueryStatus#ERROR} result holding a single error result set, flagged as
     * remote, for the given failed server.
     */
    private QueryResult<K, V> buildInternalErrorResult(String localServer,
                                                       String failedServer,
                                                       Throwable error,
                                                       long startedAt) {
        return QueryResultBuilder.<K, V>newBuilder()
            .setServer(localServer)
            .setTook(Time.SYSTEM.milliseconds() - startedAt)
            .setStatus(QueryStatus.ERROR)
            .setFailedResultSet(new ErrorResultSet(failedServer, true, QueryError.of(error)))
            .build();
    }

    /**
     * Computes the global status: {@code PARTIAL} when there are both failures and successes,
     * {@code ERROR} when there are only failures, {@code SUCCESS} when there are only successes,
     * and {@code NO_RESULT} when both lists are empty.
     */
    private QueryStatus computeStatus(List<ErrorResultSet> failed, List<SuccessResultSet<K, V>> succeeded) {
        final boolean hasFailures = !failed.isEmpty();
        final boolean hasSuccesses = !succeeded.isEmpty();
        if (hasFailures && hasSuccesses) {
            return QueryStatus.PARTIAL;
        }
        if (hasFailures) {
            return QueryStatus.ERROR;
        }
        if (hasSuccesses) {
            return QueryStatus.SUCCESS;
        }
        return QueryStatus.NO_RESULT;
    }

    /**
     * Blocks until all the given remote queries complete and flattens their result sets.
     *
     * @return the unwrapped result sets, or an empty list when waiting fails or is interrupted.
     */
    private static <K, V> List<Either<SuccessResultSet<K, V>, ErrorResultSet>> waitRemoteThenGet(
            List<CompletableFuture<QueryResult<K, V>>> futures) {
        final CompletableFuture<List<QueryResult<K, V>>> all = futures.stream().collect(FutureCollectors.allOf());
        try {
            return all.get().stream()
                .map(result -> result.getResult())
                .flatMap(resultSet -> resultSet.unwrap().stream())
                .collect(Collectors.toList());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers can observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error("Unexpected error happens while waiting for remote query results", e);
            return Collections.emptyList();
        } catch (ExecutionException e) {
            LOG.error("Unexpected error happens while waiting for remote query results", e);
            return Collections.emptyList();
        }
    }

    /**
     * Executes the query on the given container and wraps the outcome into a success or an
     * error result set, attributed to the local server and flagged as non-remote.
     */
    private Either<SuccessResultSet<K, V>, ErrorResultSet> executeLocallyAndGet(String localServer,
                                                                                KafkaStreamsContainer container,
                                                                                Queried options) {
        return this.query.execute(container, options)
            .left()
            .map(records -> new SuccessResultSet<>(localServer, false, records))
            .right()
            .map(errors -> new ErrorResultSet(localServer, false, QueryError.allOf(errors)));
    }
}
