package io.streamthoughts.azkarra.api.query;

import io.streamthoughts.azkarra.api.errors.AzkarraException;
import io.streamthoughts.azkarra.api.errors.AzkarraRetriableException;
import io.streamthoughts.azkarra.api.errors.Error;
import io.streamthoughts.azkarra.api.errors.InvalidStreamsStateException;
import io.streamthoughts.azkarra.api.model.KV;
import io.streamthoughts.azkarra.api.monad.Either;
import io.streamthoughts.azkarra.api.monad.Retry;
import io.streamthoughts.azkarra.api.monad.Try;
import io.streamthoughts.azkarra.api.query.internal.PreparedQuery;
import io.streamthoughts.azkarra.api.query.result.ErrorResultSet;
import io.streamthoughts.azkarra.api.query.result.QueryError;
import io.streamthoughts.azkarra.api.query.result.QueryResult;
import io.streamthoughts.azkarra.api.query.result.QueryResultBuilder;
import io.streamthoughts.azkarra.api.query.result.QueryStatus;
import io.streamthoughts.azkarra.api.query.result.SuccessResultSet;
import io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer;
import io.streamthoughts.azkarra.api.streams.ServerHostInfo;
import io.streamthoughts.azkarra.api.time.Time;
import io.streamthoughts.azkarra.api.util.FutureCollectors;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/azkarra/api/query/DistributedQuery.class */
public class DistributedQuery<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedQuery.class);
    private final RemoteQueryClient remoteQueryClient;
    private final PreparedQuery<K, V> query;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamthoughts/azkarra/api/query/DistributedQuery$LocalQueryContext.class */
    public class LocalQueryContext implements QueryContext<K, V> {
        private final KafkaStreamsContainer streams;
        private final Queried queried;

        LocalQueryContext(KafkaStreamsContainer kafkaStreamsContainer, Queried queried) {
            this.streams = kafkaStreamsContainer;
            this.queried = queried;
        }

        @Override // io.streamthoughts.azkarra.api.query.DistributedQuery.QueryContext
        public QueryResult<K, V> execute(ServerHostInfo serverHostInfo, boolean z) {
            Try<List<KV<K, V>>> execute = DistributedQuery.this.query.execute(this.streams, this.queried.limit().longValue());
            if (z && execute.isFailure()) {
                Throwable throwable = execute.getThrowable();
                if (throwable instanceof InvalidStateStoreException) {
                    throw new AzkarraRetriableException(throwable);
                }
            }
            Try<U> transform = execute.transform(list -> {
                return Try.success(Either.left(list));
            }, th -> {
                return Try.success(Either.right(Collections.singletonList(new Error(th))));
            });
            String applicationServer = this.streams.applicationServer();
            return DistributedQuery.this.buildQueryResult(this.streams.applicationServer(), Collections.singletonList(((Either) transform.get()).left().map(list2 -> {
                return new SuccessResultSet(applicationServer, false, list2);
            }).right().map(list3 -> {
                return new ErrorResultSet(applicationServer, false, QueryError.allOf(list3));
            })));
        }
    }

    /* loaded from: input_file:io/streamthoughts/azkarra/api/query/DistributedQuery$QueryContext.class */
    /**
     * Strategy used to run the query against one server.
     *
     * @param <K> the record-key type of the query result.
     * @param <V> the record-value type of the query result.
     */
    private interface QueryContext<K, V> {

        /**
         * Runs the query for the given server.
         *
         * @param serverHostInfo the server the query is addressed to.
         * @param z              implementation-specific switch controlling whether some
         *                       failures are thrown rather than reported in the result.
         * @return the query result.
         * @throws AzkarraException if the implementation decides to propagate a failure.
         */
        QueryResult<K, V> execute(ServerHostInfo serverHostInfo, boolean z) throws AzkarraException;

        /**
         * Convenience overload equivalent to {@code execute(serverHostInfo, false)}.
         *
         * @param serverHostInfo the server the query is addressed to.
         * @return the query result.
         * @throws AzkarraException if the implementation decides to propagate a failure.
         */
        default QueryResult<K, V> execute(ServerHostInfo serverHostInfo) throws AzkarraException {
            final boolean disabled = false;
            return this.execute(serverHostInfo, disabled);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamthoughts/azkarra/api/query/DistributedQuery$RemoteQueryContext.class */
    public class RemoteQueryContext implements QueryContext<K, V> {
        private final Queried options;
        private final String localServerName;

        RemoteQueryContext(String str, Queried queried) {
            this.localServerName = str;
            this.options = queried;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public CompletableFuture<QueryResult<K, V>> executeAsyncQueryRemotely(ServerHostInfo serverHostInfo, boolean z) {
            CompletableFuture<QueryResult<K, V>> query = DistributedQuery.this.remoteQueryClient.query(serverHostInfo, DistributedQuery.this.query, this.options);
            if (!z) {
                query = query.exceptionally(th -> {
                    return DistributedQuery.this.buildInternalErrorResult(this.localServerName, serverHostInfo.hostAndPort(), th);
                });
            }
            return (CompletableFuture<QueryResult<K, V>>) query.thenApply(queryResult -> {
                return queryResult.server(this.localServerName);
            });
        }

        @Override // io.streamthoughts.azkarra.api.query.DistributedQuery.QueryContext
        public QueryResult<K, V> execute(ServerHostInfo serverHostInfo, boolean z) {
            QueryResult<K, V> buildInternalErrorResult;
            String hostAndPort = serverHostInfo.hostAndPort();
            try {
                buildInternalErrorResult = executeAsyncQueryRemotely(serverHostInfo, z).get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                buildInternalErrorResult = DistributedQuery.this.buildInternalErrorResult(this.localServerName, hostAndPort, e);
            } catch (ExecutionException e2) {
                DistributedQuery.LOG.error("Cannot query remote state store. {}", e2.getCause().getMessage());
                throw ((RuntimeException) e2.getCause());
            }
            return buildInternalErrorResult;
        }
    }

    /**
     * Creates a new {@link DistributedQuery} instance.
     *
     * @param remoteQueryClient the client used to query other servers; must not be {@code null}.
     * @param preparedQuery     the query to execute; must not be {@code null}.
     * @throws NullPointerException if either argument is {@code null}.
     */
    public DistributedQuery(RemoteQueryClient remoteQueryClient, PreparedQuery<K, V> preparedQuery) {
        this.remoteQueryClient = Objects.requireNonNull(remoteQueryClient, "remoteQueryClient cannot be null");
        this.query = Objects.requireNonNull(preparedQuery, "query cannot be null");
    }

    /**
     * Executes the prepared query and records how long it took.
     *
     * <p>A keyed query is routed to the single-host path; any other query goes
     * through the multi-host path.
     *
     * @param kafkaStreamsContainer the local streams container; must not be {@code null}.
     * @param queried               the query options; must not be {@code null}.
     * @return the query result, with its elapsed time set in milliseconds.
     * @throws NullPointerException         if either argument is {@code null}.
     * @throws InvalidStreamsStateException if the streams instance is not running.
     */
    public QueryResult<K, V> query(KafkaStreamsContainer kafkaStreamsContainer, Queried queried) {
        Objects.requireNonNull(kafkaStreamsContainer, "streams cannot be null");
        Objects.requireNonNull(queried, "options cannot be null");

        final long startedAt = Time.SYSTEM.milliseconds();

        if (!kafkaStreamsContainer.isRunning()) {
            throw new InvalidStreamsStateException(
                "streams instance for id '" + kafkaStreamsContainer.applicationId()
                    + "' is not running (" + kafkaStreamsContainer.state().value() + ")");
        }

        final QueryResult<K, V> result;
        if (this.query.isKeyedQuery()) {
            result = querySingleHostStateStore(kafkaStreamsContainer, queried);
        } else {
            result = queryMultiHostStateStore(kafkaStreamsContainer, queried);
        }

        final long elapsedMs = Time.SYSTEM.milliseconds() - startedAt;
        return result.took(elapsedMs);
    }

    /**
     * Runs the query on every server that hosts the queried store and merges the outcomes.
     *
     * <p>Remote queries are started first (only if remote access is allowed), the
     * local store is queried next, and the remote results are awaited last.
     *
     * @param kafkaStreamsContainer the local streams container.
     * @param queried               the query options.
     * @return the merged result, or a {@code NOT_AVAILABLE} result when no metadata
     *         is known for the store.
     */
    private QueryResult<K, V> queryMultiHostStateStore(KafkaStreamsContainer kafkaStreamsContainer, Queried queried) {
        final List<ServerHostInfo> hosts = kafkaStreamsContainer.allMetadataForStore(this.query.storeName())
            .stream()
            .map(metadata -> metadata.hostInfo())
            .collect(Collectors.toList());

        if (hosts.isEmpty()) {
            final String message = "no metadata available for store '" + this.query.storeName() + "'";
            LOG.warn(message);
            return buildNotAvailableResult(kafkaStreamsContainer.applicationServer(), message);
        }

        // Fire the remote queries before running the local one so that they overlap.
        List<CompletableFuture<QueryResult<K, V>>> remoteFutures = Collections.emptyList();
        if (queried.remoteAccessAllowed()) {
            // Remote access is disabled on the forwarded options.
            final RemoteQueryContext remoteContext =
                new RemoteQueryContext(kafkaStreamsContainer.applicationServer(), queried.withRemoteAccessAllowed(false));
            remoteFutures = hosts.stream()
                .filter(Predicate.not(ServerHostInfo::isLocal))
                .map(host -> remoteContext.executeAsyncQueryRemotely(host, false))
                .collect(Collectors.toList());
        }

        final List<Either<SuccessResultSet<K, V>, ErrorResultSet>> results = new LinkedList<>();

        final LocalQueryContext localContext = new LocalQueryContext(kafkaStreamsContainer, queried);
        final Optional<Either<SuccessResultSet<K, V>, ErrorResultSet>> localResult = hosts.stream()
            .filter(ServerHostInfo::isLocal)
            .findFirst()
            .map(host -> localContext.execute(host, false).getResult().unwrap().get(0));
        localResult.ifPresent(results::add);

        if (!remoteFutures.isEmpty()) {
            results.addAll(waitRemoteThenGet(remoteFutures));
        }
        return buildQueryResult(kafkaStreamsContainer.applicationServer(), results);
    }

    /**
     * Runs a keyed query with retries.
     *
     * <p>The key serializer is taken from the query when it provides one, otherwise
     * from the container's default key serde, falling back to the String serde.
     * Attempts are retried on {@link AzkarraRetriableException}, bounded by the
     * retries, back-off and timeout read from the options. If the query still
     * fails, a {@code NOT_AVAILABLE} result flagged as timed out is returned.
     *
     * @param kafkaStreamsContainer the local streams container.
     * @param queried               the query options.
     * @return the query result.
     */
    private QueryResult<K, V> querySingleHostStateStore(KafkaStreamsContainer kafkaStreamsContainer, Queried queried) {
        Serializer<K> resolved = this.query.keySerializer();
        if (resolved == null) {
            resolved = kafkaStreamsContainer.defaultKeySerde().orElse(Serdes.String()).serializer();
        }
        // Lambdas below may only capture effectively-final locals.
        final Serializer<K> keySerializer = resolved;

        final Retry retryPolicy = Retry
            .withMaxAttempts(queried.retries())
            .withFixedWaitDuration(queried.retryBackoff())
            .stopAfterDuration(queried.queryTimeout())
            .ifExceptionOfType(AzkarraRetriableException.class);

        final String localServer = kafkaStreamsContainer.applicationServer();

        final Try<QueryResult<K, V>> attempt = Try.retriable(
            () -> querySingleHostStateStore(kafkaStreamsContainer, keySerializer, queried),
            retryPolicy);

        return attempt.recover(failure -> {
            final String reason = failure.getCause() != null
                ? failure.getCause().getMessage()
                : failure.getMessage();
            final String message =
                "Retries exhausted for querying state store " + this.query.storeName() + ". " + reason;
            return Try.success(buildNotAvailableResult(localServer, message).timeout(true));
        }).get();
    }

    /**
     * Runs a keyed query once, on the server that is the active host for the key.
     *
     * <ul>
     *   <li>Active host is local: the local store is queried.</li>
     *   <li>Active host is remote and remote access is allowed: the query is forwarded.</li>
     *   <li>Active host is remote and remote access is not allowed: a result built
     *       from an empty list of result-sets is returned.</li>
     * </ul>
     *
     * @param kafkaStreamsContainer the local streams container.
     * @param serializer            the serializer used to locate the host owning the key.
     * @param queried               the query options.
     * @return the query result, or a {@code NOT_AVAILABLE} result when no metadata is
     *         found for the store and key.
     * @throws AzkarraException if the selected context propagates a failure (contexts are
     *         invoked with their failure-propagation flag set to {@code true}).
     */
    private QueryResult<K, V> querySingleHostStateStore(KafkaStreamsContainer kafkaStreamsContainer, Serializer<K> serializer, Queried queried) throws AzkarraException {
        final String applicationServer = kafkaStreamsContainer.applicationServer();
        return kafkaStreamsContainer
            .findMetadataForStoreAndKey(this.query.storeName(), this.query.key(), serializer)
            .map(keyQueryMetadata -> {
                final HostInfo activeHost = keyQueryMetadata.getActiveHost();
                final ServerHostInfo target = new ServerHostInfo(
                    kafkaStreamsContainer.applicationId(),
                    activeHost.host(),
                    activeHost.port(),
                    kafkaStreamsContainer.isSameHost(activeHost));

                // An explicitly typed local gives the lambda branch a target type.
                final QueryContext<K, V> context;
                if (target.isLocal()) {
                    context = new LocalQueryContext(kafkaStreamsContainer, queried);
                } else if (queried.remoteAccessAllowed()) {
                    context = new RemoteQueryContext(applicationServer, queried);
                } else {
                    context = (ignoredHost, ignoredFlag) ->
                        buildQueryResult(applicationServer, Collections.emptyList());
                }
                return context.execute(target, true);
            })
            .orElseGet(() -> buildNotAvailableResult(
                applicationServer,
                "no metadata available for store '" + this.query.storeName() + "', key '" + this.query.key() + "'"));
    }

    /**
     * Builds a result with status {@link QueryStatus#NOT_AVAILABLE}.
     *
     * @param str  the server name set on the result.
     * @param str2 the error message set on the result.
     * @return the built result.
     */
    private QueryResult<K, V> buildNotAvailableResult(String str, String str2) {
        final QueryResultBuilder<K, V> builder = QueryResultBuilder.newBuilder();
        return builder
            .setServer(str)
            .setStatus(QueryStatus.NOT_AVAILABLE)
            .setError(str2)
            .build();
    }

    /**
     * Builds a result from a list of per-server outcomes.
     *
     * <p>Right values are collected as failed result-sets and left values as
     * successful ones; the overall status is derived from both lists by
     * {@code computeStatus}.
     *
     * @param str  the server name set on the result.
     * @param list the per-server outcomes; may be empty.
     * @return the built result.
     */
    private QueryResult<K, V> buildQueryResult(String str, List<Either<SuccessResultSet<K, V>, ErrorResultSet>> list) {
        final List<ErrorResultSet> failures = list.stream()
            .filter(Either::isRight)
            .map(outcome -> outcome.right().get())
            .collect(Collectors.toList());

        final List<SuccessResultSet<K, V>> successes = list.stream()
            .filter(Either::isLeft)
            .map(outcome -> outcome.left().get())
            .collect(Collectors.toList());

        final QueryResultBuilder<K, V> builder = QueryResultBuilder.newBuilder();
        return builder
            .setServer(str)
            .setStatus(computeStatus(failures, successes))
            .setFailedResultSet(failures)
            .setSuccessResultSet(successes)
            .build();
    }

    /**
     * Builds a result with status {@link QueryStatus#ERROR} carrying a single
     * failed result-set created from the given throwable.
     *
     * @param str  the server name set on the result.
     * @param str2 the server name attached to the failed result-set.
     * @param th   the failure to report.
     * @return the built result.
     */
    private QueryResult<K, V> buildInternalErrorResult(String str, String str2, Throwable th) {
        final ErrorResultSet failure = new ErrorResultSet(str2, true, QueryError.of(th));
        final QueryResultBuilder<K, V> builder = QueryResultBuilder.newBuilder();
        return builder
            .setServer(str)
            .setStatus(QueryStatus.ERROR)
            .setFailedResultSet(failure)
            .build();
    }

    /**
     * Derives the overall status from the failed and successful result-sets.
     *
     * @param list  the failed result-sets.
     * @param list2 the successful result-sets.
     * @return {@code PARTIAL} when both lists are non-empty, {@code ERROR} when only
     *         failures exist, {@code SUCCESS} when only successes exist, and
     *         {@code NO_RESULT} when both lists are empty.
     */
    private QueryStatus computeStatus(List<ErrorResultSet> list, List<SuccessResultSet<K, V>> list2) {
        final boolean hasFailures = !list.isEmpty();
        final boolean hasSuccesses = !list2.isEmpty();

        if (hasFailures && hasSuccesses) {
            return QueryStatus.PARTIAL;
        }
        if (hasFailures) {
            return QueryStatus.ERROR;
        }
        if (hasSuccesses) {
            return QueryStatus.SUCCESS;
        }
        return QueryStatus.NO_RESULT;
    }

    /**
     * Waits for all given remote queries and flattens their result-sets into one list.
     *
     * <p>This method never throws: if waiting fails, the failure is logged and an
     * empty list is returned. When the calling thread is interrupted while waiting,
     * its interrupt flag is restored before returning.
     *
     * @param list the pending remote query results.
     * @param <K>  the record-key type.
     * @param <V>  the record-value type.
     * @return the unwrapped result-sets of every remote result, or an empty list on failure.
     */
    private static <K, V> List<Either<SuccessResultSet<K, V>, ErrorResultSet>> waitRemoteThenGet(List<CompletableFuture<QueryResult<K, V>>> list) {
        final CompletableFuture<List<QueryResult<K, V>>> all = list.stream().collect(FutureCollectors.allOf());

        final CompletableFuture<List<Either<SuccessResultSet<K, V>, ErrorResultSet>>> flattened =
            all.handle((results, error) -> {
                if (results != null) {
                    return results.stream()
                        .map(result -> result.getResult())
                        .flatMap(resultSet -> resultSet.unwrap().stream())
                        .collect(Collectors.toList());
                }
                LOG.error("This exception should not have happened", error);
                return Collections.<Either<SuccessResultSet<K, V>, ErrorResultSet>>emptyList();
            });

        try {
            return flattened.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers can still react to the interruption.
            Thread.currentThread().interrupt();
            LOG.error("Unexpected error happens while waiting for remote query results", e);
            return Collections.emptyList();
        } catch (ExecutionException e) {
            LOG.error("Unexpected error happens while waiting for remote query results", e);
            return Collections.emptyList();
        }
    }
}
