package io.strimzi.kafka.quotas;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Gauge;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.strimzi.kafka.quotas.VolumeUsageResult;
import java.time.Instant;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/strimzi/kafka/quotas/VolumeSource.class */
public class VolumeSource implements Runnable {
    private final VolumeObserver volumeObserver;
    private final Admin admin;
    private final int timeout;
    private final TimeUnit timeoutUnit;
    private final LinkedHashMap<String, String> defaultTags;
    private final AtomicLong activeBrokerCount = new AtomicLong(0);
    private final AtomicLong activeLogDirsCount = new AtomicLong(0);
    private final Map<String, Map<String, AtomicLong>> consumedBytesGauges = new ConcurrentHashMap();
    private final Map<String, Map<String, AtomicLong>> availableBytesGauges = new ConcurrentHashMap();
    private static final Logger log = LoggerFactory.getLogger(VolumeSource.class);

    /* loaded from: input_file:io/strimzi/kafka/quotas/VolumeSource$AtomicLongGauge.class */
    private static class AtomicLongGauge extends Gauge<Long> {
        private final AtomicLong atomicLong;

        private AtomicLongGauge(AtomicLong atomicLong) {
            this.atomicLong = atomicLong;
        }

        /* renamed from: value, reason: merged with bridge method [inline-methods] */
        public Long m4value() {
            return Long.valueOf(this.atomicLong.get());
        }
    }

    /* loaded from: input_file:io/strimzi/kafka/quotas/VolumeSource$Result.class */
    public static class Result<T> {
        private final T value;
        private final Class<? extends Throwable> throwable;

        public Result(T t, Class<? extends Throwable> cls) {
            if (Objects.nonNull(t) && Objects.nonNull(cls)) {
                throw new IllegalArgumentException("An operation can have a result or an error but not both.");
            }
            this.value = t;
            this.throwable = cls;
        }

        public T getValue() {
            return this.value;
        }

        public Class<? extends Throwable> getThrowable() {
            return this.throwable;
        }

        public boolean isFailure() {
            return this.throwable != null;
        }
    }

    @SuppressFBWarnings(value = {"EI_EXPOSE_REP2"}, justification = "Injecting the dependency is the right move as it can be shared")
    public VolumeSource(Admin admin, VolumeObserver volumeObserver, int i, TimeUnit timeUnit, LinkedHashMap<String, String> linkedHashMap) {
        this.volumeObserver = volumeObserver;
        this.admin = admin;
        this.timeout = i;
        this.timeoutUnit = timeUnit;
        this.defaultTags = linkedHashMap;
        Metrics.newGauge(StaticQuotaCallback.metricName("ActiveBrokers", "VolumeSource", "io.strimzi.kafka.quotas", linkedHashMap), new AtomicLongGauge(this.activeBrokerCount));
        Metrics.newGauge(StaticQuotaCallback.metricName("ActiveLogDirs", "VolumeSource", "io.strimzi.kafka.quotas", linkedHashMap), new AtomicLongGauge(this.activeLogDirsCount));
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            log.info("Updating cluster volume usage.");
            notifyObserver((VolumeUsageResult) toResultStage(this.admin.describeCluster().nodes()).thenCompose(this::onDescribeClusterComplete).toCompletableFuture().get(this.timeout, this.timeoutUnit));
            log.info("Updated cluster volume usage.");
        } catch (InterruptedException e) {
            notifyObserver(VolumeUsageResult.failure(VolumeUsageResult.VolumeSourceObservationStatus.INTERRUPTED, e.getClass()));
            log.warn("Caught interrupt exception trying to describe cluster and logDirs: {}", e.getMessage(), e);
            Thread.currentThread().interrupt();
        } catch (RuntimeException e2) {
            notifyObserver(VolumeUsageResult.failure(VolumeUsageResult.VolumeSourceObservationStatus.EXCEPTION, e2.getClass()));
            log.warn("Caught runtime exception trying to describe cluster and logDirs: {}", e2.getMessage(), e2);
        } catch (ExecutionException e3) {
            notifyObserver(VolumeUsageResult.failure(VolumeUsageResult.VolumeSourceObservationStatus.EXECUTION_EXCEPTION, e3.getClass()));
            log.warn("Caught execution exception trying to describe cluster and logDirs: {}", e3.getMessage(), e3);
        } catch (TimeoutException e4) {
            notifyObserver(VolumeUsageResult.failure(VolumeUsageResult.VolumeSourceObservationStatus.SAFETY_TIMEOUT, e4.getClass()));
            log.warn("Caught timeout exception trying to describe cluster and logDirs: {}", e4.getMessage(), e4);
        }
    }

    private <T> CompletionStage<Result<T>> toResultStage(KafkaFuture<T> kafkaFuture) {
        return kafkaFuture.toCompletionStage().thenApply(obj -> {
            return new Result(obj, null);
        }).exceptionally(th -> {
            log.debug("Creating failed result for call.", th);
            return new Result(null, th.getClass());
        });
    }

    private void notifyObserver(VolumeUsageResult volumeUsageResult) {
        if (log.isDebugEnabled()) {
            log.debug("Notifying consumers of volumes usage result: {}", volumeUsageResult);
        }
        this.volumeObserver.observeVolumeUsage(volumeUsageResult);
    }

    private CompletionStage<VolumeUsageResult> onDescribeClusterComplete(Result<Collection<Node>> result) {
        return result.isFailure() ? CompletableFuture.completedFuture(VolumeUsageResult.failure(VolumeUsageResult.VolumeSourceObservationStatus.DESCRIBE_CLUSTER_ERROR, result.getThrowable())) : onDescribeClusterSuccess(result.getValue());
    }

    private CompletionStage<VolumeUsageResult> onDescribeClusterSuccess(Collection<Node> collection) {
        Set set = (Set) collection.stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toSet());
        this.activeBrokerCount.set(set.size());
        set.forEach(num -> {
            this.availableBytesGauges.computeIfAbsent(String.valueOf(num), str -> {
                return new ConcurrentHashMap();
            });
            this.consumedBytesGauges.computeIfAbsent(String.valueOf(num), str2 -> {
                return new ConcurrentHashMap();
            });
        });
        log.debug("Attempting to describe logDirs");
        return toResultStage(this.admin.describeLogDirs(set).allDescriptions()).thenApply(this::onDescribeLogDirComplete);
    }

    private VolumeUsageResult onDescribeLogDirComplete(Result<Map<Integer, Map<String, LogDirDescription>>> result) {
        return result.isFailure() ? VolumeUsageResult.failure(VolumeUsageResult.VolumeSourceObservationStatus.DESCRIBE_LOG_DIR_ERROR, result.getThrowable()) : onDescribeLogDirSuccess(((Result) result).value);
    }

    private VolumeUsageResult onDescribeLogDirSuccess(Map<Integer, Map<String, LogDirDescription>> map) {
        if (log.isDebugEnabled()) {
            log.debug("Successfully described logDirs: {}", map);
        }
        List list = (List) map.entrySet().stream().flatMap(VolumeSource::toVolumes).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toUnmodifiableList());
        this.activeLogDirsCount.set(list.size());
        list.forEach(volumeUsage -> {
            this.availableBytesGauges.get(volumeUsage.getBrokerId()).computeIfAbsent(volumeUsage.getLogDir(), buildCounter(volumeUsage, "AvailableBytes")).set(volumeUsage.getAvailableBytes());
            this.consumedBytesGauges.get(volumeUsage.getBrokerId()).computeIfAbsent(volumeUsage.getLogDir(), buildCounter(volumeUsage, "ConsumedBytes")).set(volumeUsage.getConsumedSpace());
        });
        return VolumeUsageResult.success(list);
    }

    private Function<String, AtomicLong> buildCounter(VolumeUsage volumeUsage, String str) {
        return str2 -> {
            AtomicLong atomicLong = new AtomicLong(0L);
            Metrics.newGauge(StaticQuotaCallback.metricName((Class<?>) VolumeSource.class, str, buildTagMap(volumeUsage)), new AtomicLongGauge(atomicLong));
            return atomicLong;
        };
    }

    private LinkedHashMap<String, String> buildTagMap(VolumeUsage volumeUsage) {
        LinkedHashMap<String, String> linkedHashMap = new LinkedHashMap<>(this.defaultTags);
        linkedHashMap.put(StaticQuotaCallback.REMOTE_BROKER_TAG, volumeUsage.getBrokerId());
        linkedHashMap.put(StaticQuotaCallback.LOG_DIR_TAG, volumeUsage.getLogDir());
        return linkedHashMap;
    }

    private static Stream<? extends VolumeUsage> toVolumes(Map.Entry<Integer, Map<String, LogDirDescription>> entry) {
        return entry.getValue().entrySet().stream().map(entry2 -> {
            LogDirDescription logDirDescription = (LogDirDescription) entry2.getValue();
            if (!logDirDescription.totalBytes().isPresent() || !logDirDescription.usableBytes().isPresent()) {
                return null;
            }
            return new VolumeUsage(String.valueOf(entry.getKey()), (String) entry2.getKey(), logDirDescription.totalBytes().getAsLong(), logDirDescription.usableBytes().getAsLong(), Instant.now());
        });
    }
}
